[Drain] Support extensible element metadata propagation in ReduceFnRunner#38230
[Drain] Support extensible element metadata propagation in ReduceFnRunner#38230stankiewicz wants to merge 3 commits intoapache:masterfrom
Conversation
|
R: @kennknowles |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
… PipelineMetadata
b1dfea2 to
036803e
Compare
|
@kennknowles this is generally finished, the task :runners:core-java:analyzeClassesDependencies is failing because of an undeclared dependency on beam.model:fn-execution. |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a CombinedMetadata class and associated logic to support stateful propagation of pipeline metadata within ReduceFnRunner. The changes include new combiner and coder implementations, as well as updates to state handling and tests. Feedback identifies a performance regression due to per-element state access in processElement, a bug in onTrigger where metadata is lost when windows remain open, and an optimization opportunity for the CombinedMetadata coder to reduce serialization overhead.
| ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG); | ||
| CombinedMetadata currentMetadata = metadataState.read(); | ||
| if (currentMetadata == null) { | ||
| currentMetadata = CombinedMetadata.createDefault(); | ||
| } | ||
| CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain()); | ||
| CombinedMetadata newMetadata = | ||
| CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata); | ||
| metadataState.write(newMetadata); |
There was a problem hiding this comment.
This block introduces a state read and write for every single element processed by ReduceFnRunner. This is a significant performance regression, especially for high-throughput pipelines where causedByDrain is typically NORMAL. Additionally, this logic is currently executed before checking if the window is closed (line 609), meaning state is updated even for elements that will be dropped.
Consider optimizing this to avoid state access in the common case and moving it after the isClosed check.
| ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG); | |
| CombinedMetadata currentMetadata = metadataState.read(); | |
| if (currentMetadata == null) { | |
| currentMetadata = CombinedMetadata.createDefault(); | |
| } | |
| CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain()); | |
| CombinedMetadata newMetadata = | |
| CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata); | |
| metadataState.write(newMetadata); | |
| if (value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) { | |
| ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG); | |
| CombinedMetadata currentMetadata = metadataState.read(); | |
| if (currentMetadata == null || currentMetadata.causedByDrain() != CausedByDrain.CAUSED_BY_DRAIN) { | |
| metadataState.write(CombinedMetadata.create(CausedByDrain.CAUSED_BY_DRAIN)); | |
| } | |
| } |
| ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG); | ||
| CombinedMetadata aggregatedMetadata = metadataState.read(); | ||
| if (aggregatedMetadata == null) { | ||
| aggregatedMetadata = CombinedMetadata.createDefault(); | ||
| } | ||
| CombinedMetadata fullyAggregatedMetadata = | ||
| CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata); | ||
| final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain(); | ||
| if (isFinished) { | ||
| metadataState.clear(); | ||
| } |
There was a problem hiding this comment.
In onTrigger, the metadata from the trigger/timer is combined with the persisted state, but the result is not written back to state if the window remains open (isFinished == false). This means that any metadata associated with the trigger (e.g., a drain signal from a timer) will be lost for subsequent panes of the same window. To ensure "stateful tracking" as intended, the aggregated metadata should be persisted if the window remains open.
| ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG); | |
| CombinedMetadata aggregatedMetadata = metadataState.read(); | |
| if (aggregatedMetadata == null) { | |
| aggregatedMetadata = CombinedMetadata.createDefault(); | |
| } | |
| CombinedMetadata fullyAggregatedMetadata = | |
| CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata); | |
| final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain(); | |
| if (isFinished) { | |
| metadataState.clear(); | |
| } | |
| ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG); | |
| CombinedMetadata aggregatedMetadata = metadataState.read(); | |
| CombinedMetadata fullyAggregatedMetadata = | |
| CombinedMetadataCombiner.of() | |
| .addInput( | |
| aggregatedMetadata == null ? CombinedMetadata.createDefault() : aggregatedMetadata, | |
| metadata); | |
| final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain(); | |
| if (isFinished) { | |
| metadataState.clear(); | |
| } else if (!fullyAggregatedMetadata.equals(aggregatedMetadata)) { | |
| metadataState.write(fullyAggregatedMetadata); | |
| } |
| NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream); | ||
| return; | ||
| } | ||
| BeamFnApi.Elements.ElementMetadata.Builder builder = | ||
| BeamFnApi.Elements.ElementMetadata.newBuilder(); | ||
| builder.setDrain( | ||
| value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN | ||
| ? BeamFnApi.Elements.DrainMode.Enum.DRAINING | ||
| : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING); | ||
|
|
||
| NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream); |
There was a problem hiding this comment.
The Coder implementation uses NullableCoder.of(ByteArrayCoder.of()) to wrap the serialized proto bytes. This adds unnecessary overhead (extra bytes for nullability and length prefixing) since AtomicCoder can handle the serialization directly. Furthermore, ByteArrayCoder is redundant if you are already managing the byte array from the proto. Consider simplifying the coder to write the proto bytes directly to the OutputStream with a simple length prefix if necessary.
Description
This PR refactors the metadata propagation logic in
ReduceFnRunnerto support extensible metadata.Previously, metadata tracking (specifically
causedByDrain) was not stored in state at all during execution inReduceFnRunner, which caused metatada loss failures when firing timers or merging panes. This PR fixes that by introducing a unified state map for element metadata.To make it easier to add future payloads (such as OpenTelemetry context maps or CDC insert/update markers) without modifying method signatures, this change groups targeted fields into a unified container and offloads combination rules to a clean aggregator class.
Key Changes
METADATA_TAG(persistingCombinedMetadata) inReduceFnRunner, ensuring metadata is no longer lost during grouping.CombinedMetadata(guided by@AutoValue) that groups element metadata together.CombinedMetadataCombiner.Open question
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.