[FLINK-38450][iceberg] Fix duplicate records when schema change splits writes within a checkpoint #4360
Conversation
Pull request overview
This PR addresses duplicate rows in the Iceberg pipeline sink when a schema change causes writes to be split into multiple batches within the same Flink checkpoint, by introducing per-table batch indexing on the writer side and committing batches with ordered Iceberg snapshots on the committer side.
Changes:
- Scope writer flush behavior to the affected table on schema-change events and introduce a per-table `batchIndex` propagated via `WriteResultWrapper`.
- Commit per-checkpoint batches as separate Iceberg snapshots with `flink.batch-index`/`flink.checkpoint-id` snapshot properties to enforce sequence-number ordering and support idempotent retry after partial commits.
- Add unit tests covering intra-checkpoint flush splitting, multiple schema changes, retry after partial commit, and cross-table flush isolation.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| .../IcebergWriterTest.java | Adds regression tests for duplicate prevention across schema-change flush splits and retry scenarios. |
| .../WriteResultWrapper.java | Adds a `batchIndex` field to carry per-table batch ordering information to the committer. |
| .../IcebergWriter.java | Replaces global flush rotation with `flushTableWriter(tableId)` on schema change; tracks per-table batch indices. |
| .../IcebergCommitter.java | Commits batches as sequential snapshots and adds snapshot properties for batch/checkpoint tracking and retry skipping. |
The current fix uses a per-writer batchIndex counter to track flush batches within a checkpoint. However, when the IcebergWriter runs … Here is my implementation (it is not merged upstream; only this part of the logic is meant for reference).
Thanks for the detailed review, @fcfangcc, this was a very helpful catch. You're right that there were two issues here.
Retry safety: the changes are in this commit for reference. I also looked at your implementation. Using explicit per-snapshot markers to track batch boundaries makes sense. One concern is retry behavior: if the committer crashes after batch 0 but before batch 1, the retry would see the checkpoint-id property on the batch 0 snapshot and skip the entire checkpoint, leaving batch 1's data uncommitted. In this approach, only the final non-empty batch sets that property, so retries resume from the next uncommitted batch instead of skipping the checkpoint entirely. Happy to discuss further, and I would appreciate your feedback on this approach.
Force-pushed from 4eba0f0 to a618352
Hi @fcfangcc, just wanted to follow up on this thread when you get a chance. Happy to clarify or adjust anything based on your feedback.
😂 I am not a maintainer, but I also ran into the same problem that this PR fixes for the main branch.
Thanks @fcfangcc for the clarification. Hi @lvyanquan, I would appreciate your review on this PR when you get a chance. Happy to make any adjustments based on your feedback.
…in a checkpoint

When a schema-change event arrives mid-checkpoint, the writer flushes the affected table before applying the new schema, producing two batches for the same table. Previously these were merged into one RowDelta and committed as a single Iceberg snapshot. Because Iceberg equality-delete files only suppress data with a strictly lower sequence number, same-snapshot deletes were ineffective and both versions of a row appeared on read.

- flush(boolean) is now a no-op to prevent unrelated tables from being split into multiple batches on non-schema-change flushes
- Schema-change events call flushTableWriter(tableId) to flush only the affected table; a per-table batchIndex increments on each flush
- Each batch is committed as a separate Iceberg snapshot so equality-deletes in batch N have a strictly higher sequence number than data in batch M (M < N)
- flink.batch-index and flink.checkpoint-id snapshot properties enable retry-safe idempotency: on failure, the committer resumes from the last uncommitted batch without re-committing already-persisted files

Tests added for: same-PK dedup across batches, schema-change split correctness, retry after partial batch commit, multiple schema changes in one checkpoint, and multi-table isolation.
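The per-table `batchIndex` bookkeeping described in this commit can be sketched in a few lines. This is a minimal model with hypothetical names, not the actual `IcebergWriter` code: the point is that the index advances only for the table hit by a schema-change flush, leaving other tables alone.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch (hypothetical class, not the real IcebergWriter): each table
// tracks its own batch index, which advances only when a schema-change flush
// hits that table; other tables keep their current index.
final class PerTableBatchIndex {
    private final Map<String, Integer> tableBatchIndexMap = new HashMap<>();

    // Current batch index for a table; 0 before any schema-change flush.
    int current(String tableId) {
        return tableBatchIndexMap.getOrDefault(tableId, 0);
    }

    // Called on a schema-change flush for tableId; returns the new index.
    int advance(String tableId) {
        return tableBatchIndexMap.merge(tableId, 1, Integer::sum);
    }
}
```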
…g sink

Address parallelism issues identified during review:

- Writer: Advance tableBatchIndexMap before the writer == null guard so all subtasks stay in sync when a subtask has no data for the table at schema-change time
- Writer: Skip flushTableWriter on initial CreateTableEvent since no data has been written yet and there is nothing to split
- Committer: Group WriteResultWrappers by batchIndex using a TreeMap, so wrappers from different subtasks with the same batchIndex are merged into a single Iceberg snapshot instead of being committed separately

Tests added:

- testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange
- testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData
- testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot
- testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges
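The TreeMap grouping described in this commit can be sketched as follows. The types are hypothetical (`Result` stands in for `WriteResultWrapper`); what matters is that a `TreeMap` keyed by `batchIndex` merges results from all subtasks per batch and iterates the batches in ascending order, so each bucket can be committed as its own snapshot, batch 0 first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the committer-side grouping (hypothetical types; Result stands in
// for WriteResultWrapper): results from all subtasks are bucketed by
// batchIndex, and TreeMap iteration yields the buckets in ascending key order.
final class BatchGrouping {
    static final class Result {
        final int batchIndex;
        final String file; // stand-in for the data/delete files in a result
        Result(int batchIndex, String file) {
            this.batchIndex = batchIndex;
            this.file = file;
        }
    }

    static Map<Integer, List<Result>> groupByBatchIndex(List<Result> results) {
        Map<Integer, List<Result>> byBatch = new TreeMap<>();
        for (Result r : results) {
            byBatch.computeIfAbsent(r.batchIndex, k -> new ArrayList<>()).add(r);
        }
        return byBatch;
    }
}
```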
…s parameter

After rebasing onto upstream master (which merged FLINK-39342), IcebergWriter and IcebergCommitter constructors now require an additional hadoopConfOptions parameter. Updated all new test methods to pass `new HashMap<>()` for this param.
Force-pushed from b1c46f7 to 2f0ed03
TL;DR: Fix duplicate rows caused by same-snapshot equality-deletes by committing per-batch snapshots with increasing sequence numbers.
Root Cause
When a schema-change event arrives mid-checkpoint, the writer flushes the affected table before applying the new schema.
This produces two batches within one checkpoint: the rows flushed before the schema change is applied, and the rows written afterward under the new schema.
Previously, all batches for a table were merged into a single Iceberg `RowDelta` and committed as one snapshot. Iceberg equality-delete files only suppress data with strictly lower sequence numbers. When data files and equality-deletes are committed in the same snapshot, they share the same sequence number, so the deletes are ineffective. As a result, both versions of a row remain visible, causing duplicates.
A secondary issue was that `flush(boolean)` rotated all table writers globally, unnecessarily splitting unrelated tables into multiple batches.

Fix
Writer-side (scope reduction)
- `flush(boolean)` no longer rotates task writers globally and becomes a no-op for non-schema-change paths
- Schema-change events call `flushTableWriter(tableId)`, flushing only the affected table
- A per-table `batchIndex` increments on each flush and is propagated via `WriteResultWrapper`

Committer-side (primary correctness fix)
- Each per-checkpoint batch is committed as a separate Iceberg snapshot, in `batchIndex` order, so later batches get strictly higher sequence numbers
- Each batch snapshot records `flink.batch-index` and `flink.checkpoint-id` properties for retry-safe idempotency
- `MAX_COMMITTED_CHECKPOINT_ID` is written only on the final non-empty batch, preserving compatibility with Flink checkpoint semantics

Why `getLastCommittedBatchIndex()` is safe

- On retry, snapshots are scanned until one where `MAX_COMMITTED_CHECKPOINT_ID < checkpointId` is encountered, which marks the boundary of the previously completed checkpoint
- The highest `flink.batch-index` seen for the current checkpoint therefore identifies exactly which batches are already persisted, so the committer resumes from the next one
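The retry scan described above can be sketched roughly as follows. This is a hypothetical model, not the real `getLastCommittedBatchIndex()`: the property keys come from the PR description, and the snapshot traversal is simplified to a newest-first list of property maps.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the retry scan: walk snapshot property maps from
// newest to oldest; a snapshot carrying a checkpoint id below the current one
// marks the previous checkpoint's boundary, and the highest flink.batch-index
// seen before that boundary tells the committer which batches of the current
// checkpoint are already persisted.
final class RetryScan {
    static int lastCommittedBatchIndex(
            List<Map<String, String>> snapshotPropsNewestFirst, long checkpointId) {
        int last = -1;
        for (Map<String, String> props : snapshotPropsNewestFirst) {
            String ckp = props.get("flink.checkpoint-id");
            if (ckp != null && Long.parseLong(ckp) < checkpointId) {
                break; // boundary of the previously completed checkpoint
            }
            String batch = props.get("flink.batch-index");
            if (batch != null) {
                last = Math.max(last, Integer.parseInt(batch));
            }
        }
        return last; // -1: no batch of this checkpoint committed yet
    }
}
```

On a retry of checkpoint 5 that crashed after committing batches 0 and 1, the scan returns 1 and the committer resumes at batch 2 instead of skipping or re-committing the checkpoint.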
Tests Added
- `testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint`: verifies `flush(false)` is a no-op and same-PK updates produce a single correct row
- `testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates`: verifies a schema-change split produces correct dedup via batch ordering
- `testRetryAfterPartialBatchCommit`: verifies idempotent recovery when batch 0 is already committed and a retry occurs
- `testNoDuplicateWithMultipleSchemaChangesInOneCheckpoint`: verifies correctness across multiple sequential batches
- `testSchemaChangeFlushDoesNotAffectOtherTable`: verifies the schema-change flush is scoped to the affected table only
Notes
- The default `batchIndex` is `0`, preserving compatibility with current usage