Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions tools/celestia-node-fiber/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,18 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) {
return ch, nil
},
}
a := cnfiber.FromModules(&fakeFibre{}, blob, 0)

// Listen now issues a Download per v2 blob to recover the original
// payload size for BlobEvent.DataSize (see Listen's doc comment).
// Feed a deterministic payload back; the test asserts DataSize
// matches its length.
originalPayload := []byte("this is the original blob payload")
fibre := &fakeFibre{
downloadFn: func(_ context.Context, _ appfibre.BlobID) (*fibreapi.GetBlobResult, error) {
return &fibreapi.GetBlobResult{Data: originalPayload}, nil
},
}
a := cnfiber.FromModules(fibre, blob, 0)
events, err := a.Listen(context.Background(), namespaceBytes())
require.NoError(t, err)
require.Equal(t, ns, seenNs)
Expand All @@ -224,7 +235,8 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) {
expectedID := appfibre.NewBlobID(0, expectedCommit)
require.Equal(t, block.FiberBlobID(expectedID), ev.BlobID)
require.Equal(t, uint64(42), ev.Height)
require.Equal(t, uint64(v2Lib.DataLen()), ev.DataSize)
require.Equal(t, uint64(len(originalPayload)), ev.DataSize,
"DataSize must match the original payload length resolved via Download")
case <-time.After(time.Second):
t.Fatal("timed out waiting for blob event")
}
Expand Down
53 changes: 37 additions & 16 deletions tools/celestia-node-fiber/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package celestianodefiber

import (
"context"
"errors"
"fmt"

appfibre "github.com/celestiaorg/celestia-app/v8/fibre"
Expand All @@ -16,6 +17,14 @@ import (
// bridge node for the given namespace and forwards only share-version-2
// (Fibre) blobs as BlobEvents. PFB blobs (v0/v1) sharing the namespace are
// dropped so consumers see a pure Fibre event stream.
//
// DataSize on emitted events is the original payload byte length — matching
// the fibermock contract ev-node consumers code against. The v2 share only
// carries (fibre_blob_version + commitment), so the real size isn't derivable
// from the subscription alone; Listen therefore performs a Download per event
// to recover the size before forwarding. This adds one FSP round-trip per
// blob. If that cost becomes material we can expose an opt-out mode, but for
// now correctness over latency.
func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.FiberBlobEvent, error) {
ns, err := toV0Namespace(namespace)
if err != nil {
Expand All @@ -26,14 +35,14 @@ func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.Fi
return nil, fmt.Errorf("subscribing to blob stream: %w", err)
}
out := make(chan block.FiberBlobEvent, a.listenChannelSz)
go forwardFibreBlobs(ctx, sub, out)
go a.forwardFibreBlobs(ctx, sub, out)
return out, nil
}

// forwardFibreBlobs drains a blob.SubscriptionResponse stream and emits a
// BlobEvent per share-version-2 blob. The output channel is closed when the
// subscription closes or ctx is cancelled.
func forwardFibreBlobs(
func (a *Adapter) forwardFibreBlobs(
ctx context.Context,
sub <-chan *blob.SubscriptionResponse,
out chan<- block.FiberBlobEvent,
Expand All @@ -53,10 +62,13 @@ func forwardFibreBlobs(
if b == nil || !b.IsFibreBlob() {
continue
}
event, err := fibreBlobToEvent(b.Blob, height)
event, err := a.fibreBlobToEvent(ctx, b.Blob, height)
if err != nil {
// Skip a malformed v2 blob rather than kill the
// subscription. The server should not produce these.
// Skip a malformed or un-fetchable v2 blob rather than
// kill the subscription. Most likely causes: the v2
// payload was garbage-collected from FSPs, or the
// download was cancelled. Either way the consumer has
// no actionable signal for this single blob.
continue
}
select {
Expand All @@ -82,17 +94,17 @@ func resolveHeight(resp *blob.SubscriptionResponse) uint64 {
}

// fibreBlobToEvent reconstructs the Fibre BlobID (version byte + 32-byte
// commitment) from a share-version-2 libshare.Blob and wraps it as a
// BlobEvent.
// commitment) from a share-version-2 libshare.Blob, downloads the blob to
// determine the original payload size, and wraps everything as a BlobEvent.
//
// DataSize caveat: a v2 share carries only (fibre_blob_version + commitment),
// not the original blob payload, so b.DataLen() is the on-chain share size
// (a fixed constant), not the user-facing "how big is this blob" number
// that ev-node's fibermock and its consumers typically expect. Reporting
// the true payload size requires an on-chain query against x/fibre's
// PaymentPromise keyed by commitment. Tracked as a follow-up; for now we
// report the share size so the field is non-zero.
func fibreBlobToEvent(b *libshare.Blob, height uint64) (block.FiberBlobEvent, error) {
// The Download is what makes DataSize accurate. Without it we would have to
// either report the v2 share size (wrong — misleads consumers) or zero
// (lossy). See the Listen doc for the cost / correctness rationale.
func (a *Adapter) fibreBlobToEvent(
ctx context.Context,
b *libshare.Blob,
height uint64,
) (block.FiberBlobEvent, error) {
version, err := b.FibreBlobVersion()
if err != nil {
return block.FiberBlobEvent{}, err
Expand All @@ -110,9 +122,18 @@ func fibreBlobToEvent(b *libshare.Blob, height uint64) (block.FiberBlobEvent, er
var c appfibre.Commitment
copy(c[:], commit)
id := appfibre.NewBlobID(uint8(version), c)

res, err := a.fibre.Download(ctx, id)
if err != nil {
return block.FiberBlobEvent{}, fmt.Errorf("resolving payload size via Download: %w", err)
}
if res == nil {
return block.FiberBlobEvent{}, errors.New("fibre.Download returned nil result while resolving payload size")
}

return block.FiberBlobEvent{
BlobID: block.FiberBlobID(id),
Height: height,
DataSize: uint64(b.DataLen()),
DataSize: uint64(len(res.Data)),
}, nil
}
16 changes: 7 additions & 9 deletions tools/celestia-node-fiber/testing/showcase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,16 @@ func TestShowcase(t *testing.T) {
}
}

// Every event must carry a real block height. DataSize is
// intentionally not asserted against the payload length: the v2 share
// contains only (fibre_blob_version + commitment), not the original
// payload bytes, so b.DataLen() — what the adapter reports today — is
// always the fixed share-data size, not len(payload). Fixing that
// needs a PaymentPromise chain lookup; tracked as a TODO on the
// adapter's Listen path.
// Every event must carry the right DataSize and a non-zero block
// height. DataSize matches the original payload length because the
// adapter's Listen issues a Download per event to recover it (see
// listen.go). A silent byte truncation anywhere upstream would
// surface here before we even get to the Download round-trip.
for key, ev := range seen {
require.Greater(t, ev.Height, uint64(0),
"BlobEvent %s must carry a real block height", key)
require.Greater(t, ev.DataSize, uint64(0),
"BlobEvent %s must report a non-zero DataSize", key)
require.Equal(t, uint64(len(expected[key])), ev.DataSize,
"BlobEvent %s DataSize must match original payload length", key)
}

// Round-trip every blob through Download and diff bytes. Walking
Expand Down
Loading