diff --git a/tools/celestia-node-fiber/adapter_test.go b/tools/celestia-node-fiber/adapter_test.go index af3e42161..7b5de1915 100644 --- a/tools/celestia-node-fiber/adapter_test.go +++ b/tools/celestia-node-fiber/adapter_test.go @@ -211,7 +211,18 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) { return ch, nil }, } - a := cnfiber.FromModules(&fakeFibre{}, blob, 0) + + // Listen now issues a Download per v2 blob to recover the original + // payload size for BlobEvent.DataSize (see Listen's doc comment). + // Feed a deterministic payload back; the test asserts DataSize + // matches its length. + originalPayload := []byte("this is the original blob payload") + fibre := &fakeFibre{ + downloadFn: func(_ context.Context, _ appfibre.BlobID) (*fibreapi.GetBlobResult, error) { + return &fibreapi.GetBlobResult{Data: originalPayload}, nil + }, + } + a := cnfiber.FromModules(fibre, blob, 0) events, err := a.Listen(context.Background(), namespaceBytes()) require.NoError(t, err) require.Equal(t, ns, seenNs) @@ -224,7 +235,8 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) { expectedID := appfibre.NewBlobID(0, expectedCommit) require.Equal(t, block.FiberBlobID(expectedID), ev.BlobID) require.Equal(t, uint64(42), ev.Height) - require.Equal(t, uint64(v2Lib.DataLen()), ev.DataSize) + require.Equal(t, uint64(len(originalPayload)), ev.DataSize, + "DataSize must match the original payload length resolved via Download") case <-time.After(time.Second): t.Fatal("timed out waiting for blob event") } diff --git a/tools/celestia-node-fiber/listen.go b/tools/celestia-node-fiber/listen.go index 6ba5c87f1..7c666622a 100644 --- a/tools/celestia-node-fiber/listen.go +++ b/tools/celestia-node-fiber/listen.go @@ -2,6 +2,7 @@ package celestianodefiber import ( "context" + "errors" "fmt" appfibre "github.com/celestiaorg/celestia-app/v8/fibre" @@ -16,6 +17,14 @@ import ( // bridge node for the given namespace and forwards only share-version-2 // (Fibre) blobs as BlobEvents. PFB blobs (v0/v1) sharing the namespace are // dropped so consumers see a pure Fibre event stream. +// +// DataSize on emitted events is the original payload byte length — matching +// the fibermock contract ev-node consumers code against. The v2 share only +// carries (fibre_blob_version + commitment), so the real size isn't derivable +// from the subscription alone; Listen therefore performs a Download per event +// to recover the size before forwarding. This adds one FSP round-trip per +// blob. If that cost becomes material we can expose an opt-out mode, but for +// now correctness over latency. func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.FiberBlobEvent, error) { ns, err := toV0Namespace(namespace) if err != nil { @@ -26,14 +35,14 @@ func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.Fi return nil, fmt.Errorf("subscribing to blob stream: %w", err) } out := make(chan block.FiberBlobEvent, a.listenChannelSz) - go forwardFibreBlobs(ctx, sub, out) + go a.forwardFibreBlobs(ctx, sub, out) return out, nil } // forwardFibreBlobs drains a blob.SubscriptionResponse stream and emits a // BlobEvent per share-version-2 blob. The output channel is closed when the // subscription closes or ctx is cancelled. -func forwardFibreBlobs( +func (a *Adapter) forwardFibreBlobs( ctx context.Context, sub <-chan *blob.SubscriptionResponse, out chan<- block.FiberBlobEvent, @@ -53,10 +62,13 @@ func forwardFibreBlobs( if b == nil || !b.IsFibreBlob() { continue } - event, err := fibreBlobToEvent(b.Blob, height) + event, err := a.fibreBlobToEvent(ctx, b.Blob, height) if err != nil { - // Skip a malformed v2 blob rather than kill the - // subscription. The server should not produce these. + // Skip a malformed or un-fetchable v2 blob rather than + // kill the subscription. Most likely causes: the v2 + // payload was garbage-collected from FSPs, or the + // download was cancelled. Either way the consumer has + // no actionable signal for this single blob. continue } select { @@ -82,17 +94,17 @@ func resolveHeight(resp *blob.SubscriptionResponse) uint64 { } // fibreBlobToEvent reconstructs the Fibre BlobID (version byte + 32-byte -// commitment) from a share-version-2 libshare.Blob and wraps it as a -// BlobEvent. +// commitment) from a share-version-2 libshare.Blob, downloads the blob to +// determine the original payload size, and wraps everything as a BlobEvent. // -// DataSize caveat: a v2 share carries only (fibre_blob_version + commitment), -// not the original blob payload, so b.DataLen() is the on-chain share size -// (a fixed constant), not the user-facing "how big is this blob" number -// that ev-node's fibermock and its consumers typically expect. Reporting -// the true payload size requires an on-chain query against x/fibre's -// PaymentPromise keyed by commitment. Tracked as a follow-up; for now we -// report the share size so the field is non-zero. -func fibreBlobToEvent(b *libshare.Blob, height uint64) (block.FiberBlobEvent, error) { +// The Download is what makes DataSize accurate. Without it we would have to +// either report the v2 share size (wrong — misleads consumers) or zero +// (lossy). See the Listen doc for the cost / correctness rationale. +func (a *Adapter) fibreBlobToEvent( + ctx context.Context, + b *libshare.Blob, + height uint64, +) (block.FiberBlobEvent, error) { version, err := b.FibreBlobVersion() if err != nil { return block.FiberBlobEvent{}, err @@ -110,9 +122,18 @@ func fibreBlobToEvent(b *libshare.Blob, height uint64) (block.FiberBlobEvent, er var c appfibre.Commitment copy(c[:], commit) id := appfibre.NewBlobID(uint8(version), c) + + res, err := a.fibre.Download(ctx, id) + if err != nil { + return block.FiberBlobEvent{}, fmt.Errorf("resolving payload size via Download: %w", err) + } + if res == nil { + return block.FiberBlobEvent{}, errors.New("fibre.Download returned nil result while resolving payload size") + } + return block.FiberBlobEvent{ BlobID: block.FiberBlobID(id), Height: height, - DataSize: uint64(b.DataLen()), + DataSize: uint64(len(res.Data)), }, nil } diff --git a/tools/celestia-node-fiber/testing/showcase_test.go b/tools/celestia-node-fiber/testing/showcase_test.go index 5c7ec1962..929e540fb 100644 --- a/tools/celestia-node-fiber/testing/showcase_test.go +++ b/tools/celestia-node-fiber/testing/showcase_test.go @@ -131,18 +131,16 @@ func TestShowcase(t *testing.T) { } } - // Every event must carry a real block height. DataSize is - // intentionally not asserted against the payload length: the v2 share - // contains only (fibre_blob_version + commitment), not the original - // payload bytes, so b.DataLen() — what the adapter reports today — is - // always the fixed share-data size, not len(payload). Fixing that - // needs a PaymentPromise chain lookup; tracked as a TODO on the - // adapter's Listen path. + // Every event must carry the right DataSize and a non-zero block + // height. DataSize matches the original payload length because the + // adapter's Listen issues a Download per event to recover it (see + // listen.go). A silent byte truncation anywhere upstream would + // surface here before we even get to the Download round-trip. for key, ev := range seen { require.Greater(t, ev.Height, uint64(0), "BlobEvent %s must carry a real block height", key) - require.Greater(t, ev.DataSize, uint64(0), - "BlobEvent %s must report a non-zero DataSize", key) + require.Equal(t, uint64(len(expected[key])), ev.DataSize, + "BlobEvent %s DataSize must match original payload length", key) } // Round-trip every blob through Download and diff bytes. Walking