From 96d7021b4ff7d6c2f8722e74b869e689f1dcb4d0 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Thu, 23 Apr 2026 14:41:26 +0200 Subject: [PATCH] fix(celestia-node-fiber): report original payload size in BlobEvent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit listen.go previously set BlobEvent.DataSize to b.DataLen(), which for a share-version-2 Fibre blob is always the size of the fixed share-data layout (fibre_blob_version + commitment = 36 bytes) — not the original payload length. That diverges from ev-node's fibermock contract and misleads any consumer that uses DataSize to allocate buffers or report progress. The v2 share genuinely doesn't carry the original size, and x/fibre v8 has no chain query to derive it from the commitment. The only accurate path is to Download the blob and measure. Listen now does exactly that before forwarding each event. The cost is one FSP round-trip per v2 blob; this can be made opt-out later if it hurts throughput-sensitive use cases. Tests: - Showcase restores the strict DataSize == len(payload) assertion across all 10 blobs. - Unit test TestListen_FiltersFibreOnlyAndEmitsEvent now stubs fakeFibre.Download to return a deterministic payload and asserts DataSize matches its length. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- tools/celestia-node-fiber/adapter_test.go | 16 +++++- tools/celestia-node-fiber/listen.go | 53 +++++++++++++------ .../testing/showcase_test.go | 16 +++--- 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/tools/celestia-node-fiber/adapter_test.go b/tools/celestia-node-fiber/adapter_test.go index af3e42161..7b5de1915 100644 --- a/tools/celestia-node-fiber/adapter_test.go +++ b/tools/celestia-node-fiber/adapter_test.go @@ -211,7 +211,18 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) { return ch, nil }, } - a := cnfiber.FromModules(&fakeFibre{}, blob, 0) + + // Listen now issues a Download per v2 blob to recover the original + // payload size for BlobEvent.DataSize (see Listen's doc comment). + // Feed a deterministic payload back; the test asserts DataSize + // matches its length. + originalPayload := []byte("this is the original blob payload") + fibre := &fakeFibre{ + downloadFn: func(_ context.Context, _ appfibre.BlobID) (*fibreapi.GetBlobResult, error) { + return &fibreapi.GetBlobResult{Data: originalPayload}, nil + }, + } + a := cnfiber.FromModules(fibre, blob, 0) events, err := a.Listen(context.Background(), namespaceBytes()) require.NoError(t, err) require.Equal(t, ns, seenNs) @@ -224,7 +235,8 @@ func TestListen_FiltersFibreOnlyAndEmitsEvent(t *testing.T) { expectedID := appfibre.NewBlobID(0, expectedCommit) require.Equal(t, block.FiberBlobID(expectedID), ev.BlobID) require.Equal(t, uint64(42), ev.Height) - require.Equal(t, uint64(v2Lib.DataLen()), ev.DataSize) + require.Equal(t, uint64(len(originalPayload)), ev.DataSize, + "DataSize must match the original payload length resolved via Download") case <-time.After(time.Second): t.Fatal("timed out waiting for blob event") } diff --git a/tools/celestia-node-fiber/listen.go b/tools/celestia-node-fiber/listen.go index 6ba5c87f1..7c666622a 100644 --- a/tools/celestia-node-fiber/listen.go +++ b/tools/celestia-node-fiber/listen.go @@ -2,6 
+2,7 @@ package celestianodefiber import ( "context" + "errors" "fmt" appfibre "github.com/celestiaorg/celestia-app/v8/fibre" @@ -16,6 +17,14 @@ import ( // bridge node for the given namespace and forwards only share-version-2 // (Fibre) blobs as BlobEvents. PFB blobs (v0/v1) sharing the namespace are // dropped so consumers see a pure Fibre event stream. +// +// DataSize on emitted events is the original payload byte length — matching +// the fibermock contract ev-node consumers code against. The v2 share only +// carries (fibre_blob_version + commitment), so the real size isn't derivable +// from the subscription alone; Listen therefore performs a Download per event +// to recover the size before forwarding. This adds one FSP round-trip per +// blob. If that cost becomes material we can expose an opt-out mode, but for +// now correctness over latency. func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.FiberBlobEvent, error) { ns, err := toV0Namespace(namespace) if err != nil { @@ -26,14 +35,14 @@ func (a *Adapter) Listen(ctx context.Context, namespace []byte) (<-chan block.Fi return nil, fmt.Errorf("subscribing to blob stream: %w", err) } out := make(chan block.FiberBlobEvent, a.listenChannelSz) - go forwardFibreBlobs(ctx, sub, out) + go a.forwardFibreBlobs(ctx, sub, out) return out, nil } // forwardFibreBlobs drains a blob.SubscriptionResponse stream and emits a // BlobEvent per share-version-2 blob. The output channel is closed when the // subscription closes or ctx is cancelled. -func forwardFibreBlobs( +func (a *Adapter) forwardFibreBlobs( ctx context.Context, sub <-chan *blob.SubscriptionResponse, out chan<- block.FiberBlobEvent, @@ -53,10 +62,13 @@ func forwardFibreBlobs( if b == nil || !b.IsFibreBlob() { continue } - event, err := fibreBlobToEvent(b.Blob, height) + event, err := a.fibreBlobToEvent(ctx, b.Blob, height) if err != nil { - // Skip a malformed v2 blob rather than kill the - // subscription. 
The server should not produce these. + // Skip a malformed or un-fetchable v2 blob rather than + // kill the subscription. Most likely causes: the v2 + // payload was garbage-collected from FSPs, or the + // download was cancelled. Either way the consumer has + // no actionable signal for this single blob. continue } select { @@ -82,17 +94,17 @@ func resolveHeight(resp *blob.SubscriptionResponse) uint64 { } // fibreBlobToEvent reconstructs the Fibre BlobID (version byte + 32-byte -// commitment) from a share-version-2 libshare.Blob and wraps it as a -// BlobEvent. +// commitment) from a share-version-2 libshare.Blob, downloads the blob to +// determine the original payload size, and wraps everything as a BlobEvent. // -// DataSize caveat: a v2 share carries only (fibre_blob_version + commitment), -// not the original blob payload, so b.DataLen() is the on-chain share size -// (a fixed constant), not the user-facing "how big is this blob" number -// that ev-node's fibermock and its consumers typically expect. Reporting -// the true payload size requires an on-chain query against x/fibre's -// PaymentPromise keyed by commitment. Tracked as a follow-up; for now we -// report the share size so the field is non-zero. -func fibreBlobToEvent(b *libshare.Blob, height uint64) (block.FiberBlobEvent, error) { +// The Download is what makes DataSize accurate. Without it we would have to +// either report the v2 share size (wrong — misleads consumers) or zero +// (lossy). See the Listen doc for the cost / correctness rationale. 
+func (a *Adapter) fibreBlobToEvent( + ctx context.Context, + b *libshare.Blob, + height uint64, +) (block.FiberBlobEvent, error) { version, err := b.FibreBlobVersion() if err != nil { return block.FiberBlobEvent{}, err @@ -110,9 +122,18 @@ func fibreBlobToEvent(b *libshare.Blob, height uint64) (block.FiberBlobEvent, er var c appfibre.Commitment copy(c[:], commit) id := appfibre.NewBlobID(uint8(version), c) + + res, err := a.fibre.Download(ctx, id) + if err != nil { + return block.FiberBlobEvent{}, fmt.Errorf("resolving payload size via Download: %w", err) + } + if res == nil { + return block.FiberBlobEvent{}, errors.New("fibre.Download returned nil result while resolving payload size") + } + return block.FiberBlobEvent{ BlobID: block.FiberBlobID(id), Height: height, - DataSize: uint64(b.DataLen()), + DataSize: uint64(len(res.Data)), }, nil } diff --git a/tools/celestia-node-fiber/testing/showcase_test.go b/tools/celestia-node-fiber/testing/showcase_test.go index 5c7ec1962..929e540fb 100644 --- a/tools/celestia-node-fiber/testing/showcase_test.go +++ b/tools/celestia-node-fiber/testing/showcase_test.go @@ -131,18 +131,16 @@ func TestShowcase(t *testing.T) { } } - // Every event must carry a real block height. DataSize is - // intentionally not asserted against the payload length: the v2 share - // contains only (fibre_blob_version + commitment), not the original - // payload bytes, so b.DataLen() — what the adapter reports today — is - // always the fixed share-data size, not len(payload). Fixing that - // needs a PaymentPromise chain lookup; tracked as a TODO on the - // adapter's Listen path. + // Every event must carry the right DataSize and a non-zero block + // height. DataSize matches the original payload length because the + // adapter's Listen issues a Download per event to recover it (see + // listen.go). A silent byte truncation anywhere upstream would + // surface here before we even get to the Download round-trip. 
for key, ev := range seen { require.Greater(t, ev.Height, uint64(0), "BlobEvent %s must carry a real block height", key) - require.Greater(t, ev.DataSize, uint64(0), - "BlobEvent %s must report a non-zero DataSize", key) + require.Equal(t, uint64(len(expected[key])), ev.DataSize, + "BlobEvent %s DataSize must match original payload length", key) } // Round-trip every blob through Download and diff bytes. Walking