From fab1de7900a5500ff40f77efebfc0d47b00e6cf9 Mon Sep 17 00:00:00 2001 From: Evan Date: Sun, 19 Apr 2026 23:25:57 +0200 Subject: [PATCH] [flink] Support batch read for tables without datalake enabled Enable Flink batch execution mode for Fluss tables that do not have table.datalake.enabled set to true. Previously, batch queries on non-lake tables threw UnsupportedOperationException. Now, log-only tables produce LogSplits with stopping offsets, and primary-key tables produce HybridSnapshotLogSplits with bounded log tails. Closes #40 Co-authored-by: binary-signal Co-authored-by: Claude Signed-off-by: Evan --- .../fluss/flink/source/FlinkTableSource.java | 7 +- .../enumerator/FlinkSourceEnumerator.java | 140 +++++++++++++++++- .../source/reader/FlinkSourceSplitReader.java | 41 ++--- .../source/split/HybridSnapshotLogSplit.java | 65 +++++++- .../split/HybridSnapshotLogSplitState.java | 3 +- .../source/split/SourceSplitSerializer.java | 15 +- .../source/FlinkTableSourceBatchITCase.java | 50 +++---- .../enumerator/FlinkSourceEnumeratorTest.java | 117 +++++++++++++++ .../split/SourceSplitSerializerTest.java | 60 ++++++++ .../source/state/SourceSplitStateTest.java | 28 ++++ 10 files changed, 462 insertions(+), 64 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 01cca4ee7c..a7f3398c0f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -369,8 +369,7 @@ public boolean isBounded() { leaseContext); if (!streaming) { - // return a bounded source provide to make planner happy, - // but this should throw exception when used to create source + // return a bounded source provider for batch execution mode return new SourceProvider() { @Override public boolean isBounded() { @@ -385,10 +384,6 @@ public boolean isBounded() { + modificationScanType + " statement with conditions on primary key."); } - if (!isDataLakeEnabled) { - throw new UnsupportedOperationException( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); - } return source; } }; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index b537a81224..ca39761260 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -26,7 +26,6 @@ import org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever; import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer; import org.apache.fluss.client.metadata.KvSnapshots; -import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.UnsupportedVersionException; import org.apache.fluss.flink.lake.LakeSplitGenerator; @@ -365,11 +364,142 @@ private void startInBatchMode() { }, this::handleSplitsAdd); } else { - throw new UnsupportedOperationException( - String.format( - "Batch only supports when table option '%s' is set to 
true.", - ConfigOptions.TABLE_DATALAKE_ENABLED)); + if (isPartitioned) { + context.callAsync(this::initBoundedPartitionedSplits, this::handleSplitsAdd); + } else { + context.callAsync(this::initBoundedNonPartitionedSplits, this::handleSplitsAdd); + } + } + } + + private List initBoundedNonPartitionedSplits() { + if (hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) { + return getBoundedSnapshotAndLogSplits(getLatestKvSnapshotsAndRegister(null), null); + } else { + return getBoundedLogSplits(null, null); + } + } + + private List initBoundedPartitionedSplits() { + List partitionInfos; + try { + partitionInfos = applyPartitionFilter(flussAdmin.listPartitionInfos(tablePath).get()); + } catch (Exception e) { + throw new FlinkRuntimeException( + String.format("Failed to list partitions for %s", tablePath), + ExceptionUtils.stripCompletionException(e)); + } + + List splits = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + long partitionId = partitionInfo.getPartitionId(); + String partitionName = partitionInfo.getPartitionName(); + if (hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) { + splits.addAll( + getBoundedSnapshotAndLogSplits( + getLatestKvSnapshotsAndRegister(partitionName), partitionName)); + } else { + splits.addAll(getBoundedLogSplits(partitionId, partitionName)); + } + } + return splits; + } + + private List getBoundedLogSplits( + @Nullable Long partitionId, @Nullable String partitionName) { + List splits = new ArrayList<>(); + List bucketsNeedInitOffset = new ArrayList<>(); + for (int bucketId = 0; bucketId < tableInfo.getNumBuckets(); bucketId++) { + TableBucket tableBucket = + new TableBucket(tableInfo.getTableId(), partitionId, bucketId); + if (ignoreTableBucket(tableBucket)) { + continue; + } + bucketsNeedInitOffset.add(bucketId); + } + + if (!bucketsNeedInitOffset.isEmpty()) { + Map startingOffsets = + startingOffsetsInitializer.getBucketOffsets( + partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever); + Map stoppingOffsets = + stoppingOffsetsInitializer.getBucketOffsets( + partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever); + startingOffsets.forEach( + (bucketId, startingOffset) -> + splits.add( + new LogSplit( + new TableBucket( + tableInfo.getTableId(), partitionId, bucketId), + partitionName, + startingOffset, + stoppingOffsets.getOrDefault( + bucketId, LogSplit.NO_STOPPING_OFFSET)))); + } + return splits; + } + + private List getBoundedSnapshotAndLogSplits( + KvSnapshots snapshots, @Nullable String partitionName) { + long tableId = snapshots.getTableId(); + Long partitionId = snapshots.getPartitionId(); + List splits = new ArrayList<>(); + List bucketsNeedInitOffset = new ArrayList<>(); + + // Collect all bucket IDs for stopping offset retrieval + List allBucketIds = new ArrayList<>(); + for (Integer bucketId : snapshots.getBucketIds()) { + TableBucket tb = new TableBucket(tableId, partitionId, bucketId); + if (!ignoreTableBucket(tb)) { + allBucketIds.add(bucketId); + } } + + Map stoppingOffsets = + allBucketIds.isEmpty() + ? 
Collections.emptyMap() + : stoppingOffsetsInitializer.getBucketOffsets( + partitionName, allBucketIds, bucketOffsetsRetriever); + + for (Integer bucketId : allBucketIds) { + TableBucket tb = new TableBucket(tableId, partitionId, bucketId); + OptionalLong snapshotId = snapshots.getSnapshotId(bucketId); + long stoppingOffset = + stoppingOffsets.getOrDefault( + bucketId, HybridSnapshotLogSplit.NO_STOPPING_OFFSET); + if (snapshotId.isPresent()) { + OptionalLong logOffset = snapshots.getLogOffset(bucketId); + checkState( + logOffset.isPresent(), + "Log offset should be present if snapshot id is present."); + splits.add( + new HybridSnapshotLogSplit( + tb, + partitionName, + snapshotId.getAsLong(), + logOffset.getAsLong(), + stoppingOffset)); + } else { + bucketsNeedInitOffset.add(bucketId); + } + } + + if (!bucketsNeedInitOffset.isEmpty()) { + startingOffsetsInitializer + .getBucketOffsets(partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever) + .forEach( + (bucketId, startingOffset) -> + splits.add( + new LogSplit( + new TableBucket(tableId, partitionId, bucketId), + partitionName, + startingOffset, + stoppingOffsets.getOrDefault( + bucketId, + LogSplit.NO_STOPPING_OFFSET)))); + } + + return splits; } private void startInStreamModeForNonPartitionedTable() { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index 90f2b6e9a6..1b4ef1b55d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -250,24 +250,29 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) { // assign bucket offset dynamically TableBucket tableBucket = split.getTableBucket(); boolean isEmptyLogSplit = false; + + // Extract stopping offset from LogSplit or HybridSnapshotLogSplit + Optional stoppingOffsetOpt = Optional.empty(); if (split instanceof LogSplit) { - LogSplit logSplit = split.asLogSplit(); - Optional stoppingOffsetOpt = logSplit.getStoppingOffset(); - if (stoppingOffsetOpt.isPresent()) { - Long stoppingOffset = stoppingOffsetOpt.get(); - if (startingOffset >= stoppingOffset) { - // is empty log splits as no log record can be fetched - emptyLogSplits.add(split.splitId()); - isEmptyLogSplit = true; - } else if (stoppingOffset >= 0) { - stoppingOffsets.put(tableBucket, stoppingOffset); - } else { - // This should not happen. - throw new FlinkRuntimeException( - String.format( - "Invalid stopping offset %d for bucket %s", - stoppingOffset, tableBucket)); - } + stoppingOffsetOpt = split.asLogSplit().getStoppingOffset(); + } else if (split instanceof HybridSnapshotLogSplit) { + stoppingOffsetOpt = split.asHybridSnapshotLogSplit().getLogStoppingOffset(); + } + + if (stoppingOffsetOpt.isPresent()) { + Long stoppingOffset = stoppingOffsetOpt.get(); + if (startingOffset >= stoppingOffset) { + // is empty log splits as no log record can be fetched + emptyLogSplits.add(split.splitId()); + isEmptyLogSplit = true; + } else if (stoppingOffset >= 0) { + stoppingOffsets.put(tableBucket, stoppingOffset); + } else { + // This should not happen. 
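+                // A stopping offset that is present is expected to be non-negative,
+                // so reaching this branch means the split carried an explicitly
+                // negative stopping offset.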
+ throw new FlinkRuntimeException( + String.format( + "Invalid stopping offset %d for bucket %s", + stoppingOffset, tableBucket)); } } @@ -276,7 +281,7 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) { "Skip to read log for split {} since the split is empty with starting offset {}, stopping offset {}.", split.splitId(), startingOffset, - split.asLogSplit().getStoppingOffset().get()); + stoppingOffsetOpt.get()); } else { Long partitionId = tableBucket.getPartitionId(); int bucket = tableBucket.getBucket(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplit.java index 5c8f30e282..1a47f764d1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplit.java @@ -17,11 +17,13 @@ package org.apache.fluss.flink.source.split; +import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.metadata.TableBucket; import javax.annotation.Nullable; import java.util.Objects; +import java.util.Optional; /** * The hybrid split for first reading the snapshot files and then switch to read the cdc log from a @@ -32,16 +34,42 @@ */ public class HybridSnapshotLogSplit extends SnapshotSplit { + public static final long NO_STOPPING_OFFSET = LogScanner.NO_STOPPING_OFFSET; + private static final String HYBRID_SPLIT_PREFIX = "hybrid-snapshot-log-"; private final boolean isSnapshotFinished; private final long logStartingOffset; + private final long logStoppingOffset; public HybridSnapshotLogSplit( TableBucket tableBucket, @Nullable String partitionName, long snapshotId, long logStartingOffset) { - this(tableBucket, partitionName, snapshotId, 0, false, logStartingOffset); + this( + tableBucket, + partitionName, + snapshotId, + 0, + false, + logStartingOffset, + NO_STOPPING_OFFSET); + } + + public HybridSnapshotLogSplit( + TableBucket tableBucket, + @Nullable String partitionName, + long snapshotId, + long logStartingOffset, + long logStoppingOffset) { + this( + tableBucket, + partitionName, + snapshotId, + 0, + false, + logStartingOffset, + logStoppingOffset); } public HybridSnapshotLogSplit( @@ -51,15 +79,42 @@ public HybridSnapshotLogSplit( long recordsToSkip, boolean isSnapshotFinished, long logStartingOffset) { + this( + tableBucket, + partitionName, + snapshotId, + recordsToSkip, + isSnapshotFinished, + logStartingOffset, + NO_STOPPING_OFFSET); + } + + public HybridSnapshotLogSplit( + TableBucket tableBucket, + @Nullable String partitionName, + long snapshotId, + long recordsToSkip, + boolean isSnapshotFinished, + long logStartingOffset, + long logStoppingOffset) { super(tableBucket, partitionName, snapshotId, recordsToSkip); this.isSnapshotFinished = isSnapshotFinished; this.logStartingOffset = logStartingOffset; + this.logStoppingOffset = logStoppingOffset; } public long getLogStartingOffset() { return logStartingOffset; } + public Optional getLogStoppingOffset() { + return logStoppingOffset >= 0 ? 
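+                // non-negative means a real bound; negative is the NO_STOPPING_OFFSET sentinel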
Optional.of(logStoppingOffset) : Optional.empty(); + } + + public long getLogStoppingOffsetRaw() { + return logStoppingOffset; + } + public boolean isSnapshotFinished() { return isSnapshotFinished; } @@ -82,12 +137,14 @@ public boolean equals(Object o) { } HybridSnapshotLogSplit that = (HybridSnapshotLogSplit) o; return isSnapshotFinished == that.isSnapshotFinished - && logStartingOffset == that.logStartingOffset; + && logStartingOffset == that.logStartingOffset + && logStoppingOffset == that.logStoppingOffset; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), isSnapshotFinished, logStartingOffset); + return Objects.hash( + super.hashCode(), isSnapshotFinished, logStartingOffset, logStoppingOffset); } @Override @@ -103,6 +160,8 @@ public String toString() { + isSnapshotFinished + ", logStartingOffset=" + logStartingOffset + + ", logStoppingOffset=" + + logStoppingOffset + ", recordsToSkip=" + recordsToSkip + '}'; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplitState.java index 6958518b53..2908091e53 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplitState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/HybridSnapshotLogSplitState.java @@ -44,7 +44,8 @@ public HybridSnapshotLogSplit toSourceSplit() { hybridSnapshotLogSplit.getSnapshotId(), recordsToSkip, snapshotFinished, - nextOffset); + nextOffset, + hybridSnapshotLogSplit.getLogStoppingOffsetRaw()); } public void setRecordsToSkip(long recordsToSkip) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java index 7ee92cef9e..aa6f420506 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/split/SourceSplitSerializer.java @@ -36,6 +36,8 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer { private static final int VERSION_0 = 0; + /** Version 1 adds logStoppingOffset to HybridSnapshotLogSplit. 
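+     * Older VERSION_0 payloads omit the field; deserialization defaults it to NO_STOPPING_OFFSET.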
*/ + private static final int VERSION_1 = 1; private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); @@ -43,7 +45,7 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer lakeSource; @@ -75,6 +77,8 @@ public byte[] serialize(SourceSplitBase split) throws IOException { out.writeBoolean(hybridSnapshotLogSplit.isSnapshotFinished()); // write log starting offset out.writeLong(hybridSnapshotLogSplit.getLogStartingOffset()); + // write log stopping offset (added in VERSION_1) + out.writeLong(hybridSnapshotLogSplit.getLogStoppingOffsetRaw()); } else { LogSplit logSplit = split.asLogSplit(); // write starting offset @@ -111,7 +115,7 @@ private void serializeSourceSplitBase(DataOutputSerializer out, SourceSplitBase @Override public SourceSplitBase deserialize(int version, byte[] serialized) throws IOException { - if (version != VERSION_0) { + if (version != VERSION_0 && version != VERSION_1) { throw new IOException("Unknown version " + version); } final DataInputDeserializer in = new DataInputDeserializer(serialized); @@ -133,13 +137,18 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce long recordsToSkip = in.readLong(); boolean isSnapshotFinished = in.readBoolean(); long logStartingOffset = in.readLong(); + long logStoppingOffset = + version >= VERSION_1 + ? in.readLong() + : HybridSnapshotLogSplit.NO_STOPPING_OFFSET; return new HybridSnapshotLogSplit( tableBucket, partitionName, snapshotId, recordsToSkip, isSnapshotFinished, - logStartingOffset); + logStartingOffset, + logStoppingOffset); } else if (splitKind == LOG_SPLIT_FLAG) { long startingOffset = in.readLong(); long stoppingOffset = in.readLong(); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java index f449477df6..2ebbefcac4 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java @@ -239,16 +239,14 @@ void testFilterOnLookupSource() throws Exception { } @Test - void testScanSingleRowFilterException() throws Exception { + void testScanWithPartialPrimaryKeyFilter() throws Exception { String tableName = prepareSourceTable(new String[] {"id", "name"}, null); + // query with partial primary key filter (id only, PK is (id, name)) + // should work as a batch scan with filter String query = String.format("SELECT * FROM %s WHERE id = 1", tableName); - - // doesn't have all condition for primary key, doesn't support to execute - assertThatThrownBy(() -> tEnv.explainSql(query)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage( - "Currently, Fluss only support queries on table with datalake enabled" - + " or point queries on primary key when it's in batch execution mode."); + CloseableIterator collected = tEnv.executeSql(query).collect(); + List expected = Collections.singletonList("+I[1, address1, name1]"); + assertResultsIgnoreOrder(collected, expected, true); } @Test @@ -368,16 +366,13 @@ void testCountPushDownForPkTable(boolean partitionTable) throws Exception { List expected = Collections.singletonList("+I[5]"); assertThat(collected).isEqualTo(expected); - // test not push down grouping count. 
- assertThatThrownBy( - () -> - tEnv.explainSql( - String.format( - "SELECT COUNT(*) FROM %s group by id", - tableName)) - .wait()) - .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + // test grouped count is not pushed down but still works via batch scan + String groupQuery = String.format("SELECT COUNT(*) FROM %s group by id", tableName); + assertThat(tEnv.explainSql(groupQuery)).doesNotContain("Count1AggFunction"); + CloseableIterator groupedRows = tEnv.executeSql(groupQuery).collect(); + List groupedCollected = collectRowsWithTimeout(groupedRows, 5); + // each id has exactly 1 row, so each group count is 1 + assertThat(groupedCollected).allMatch(row -> row.equals("+I[1]")); } @Test @@ -416,16 +411,15 @@ void testCountPushDownForLogTable(boolean partitionTable) throws Exception { List expected = Collections.singletonList(String.format("+I[%s]", expectedRows)); assertThat(collected).isEqualTo(expected); - // test not push down grouping count. - assertThatThrownBy( - () -> - tEnv.explainSql( - String.format( - "SELECT COUNT(*) FROM %s group by id", - tableName)) - .wait()) - .hasMessageContaining( - "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); + // test grouped count is not pushed down but still works via batch scan + String groupQuery = String.format("SELECT COUNT(*) FROM %s group by id", tableName); + assertThat(tEnv.explainSql(groupQuery)).doesNotContain("Count1AggFunction"); + CloseableIterator groupedRows = tEnv.executeSql(groupQuery).collect(); + int expectedGroupCount = partitionTable ? 2 : 1; + List groupedCollected = + collectRowsWithTimeout(groupedRows, expectedRows / expectedGroupCount); + assertThat(groupedCollected) + .allMatch(row -> row.equals(String.format("+I[%s]", expectedGroupCount))); } private String prepareSourceTable(String[] keys, String partitionedKey) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index ca4722c9e6..2ad38eba1a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -910,6 +910,123 @@ private void runPeriodicPartitionDiscovery(MockWorkExecutor workExecutor) throws } } + @Test + void testBatchModeNonLakeLogTable() throws Throwable { + int numSubtasks = 3; + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .build(); + + TableDescriptor nonPkTableDescriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(DEFAULT_BUCKET_NUM, "id") + .build(); + + TablePath path = TablePath.of(DEFAULT_DB, "test-batch-log-table"); + admin.createTable(path, nonPkTableDescriptor, true).get(); + long tableId = admin.getTableInfo(path).get().getTableId(); + + // write some data so log segments exist + List rows = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + rows.add(row(i, "v" + i)); + } + writeRows(conn, path, rows, true); + + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks)) { + FlinkSourceEnumerator enumerator = + new 
FlinkSourceEnumerator( + path, + flussConf, + false, + false, + context, + OffsetsInitializer.earliest(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, // batch mode + null, + null, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + + context.runNextOneTimeCallable(); + + Map> actualAssignment = + getLastReadersAssignments(context); + + // verify all splits are LogSplit with stopping offsets + for (List splits : actualAssignment.values()) { + for (SourceSplitBase split : splits) { + assertThat(split).isInstanceOf(LogSplit.class); + LogSplit logSplit = (LogSplit) split; + // stopping offset should be present (>= 0) + assertThat(logSplit.getStoppingOffset()).isPresent(); + } + } + + // verify no more splits is signaled + assertThat(context.getSplitsAssignmentSequence()).isNotEmpty(); + } + } + + @Test + void testBatchModeNonLakePkTable() throws Throwable { + long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + int numSubtasks = 3; + // write data and trigger snapshot + putRows(DEFAULT_TABLE_PATH, 10); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(DEFAULT_TABLE_PATH); + + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + false, // batch mode + null, + null, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + + for (int i = 0; i < numSubtasks; i++) { + registerReader(context, enumerator, i); + } + + context.runNextOneTimeCallable(); + + Map> actualAssignment = + getLastReadersAssignments(context); + + // verify splits are HybridSnapshotLogSplit with stopping offsets + for (List splits : actualAssignment.values()) { + for (SourceSplitBase split : splits) { + assertThat(split).isInstanceOf(HybridSnapshotLogSplit.class); + HybridSnapshotLogSplit hybridSplit = (HybridSnapshotLogSplit) split; + // stopping offset should be present for batch mode + assertThat(hybridSplit.getLogStoppingOffset()).isPresent(); + } + } + } + } + private LogSplit genLogSplit(long tableId, int bucketId) { return new LogSplit(new TableBucket(tableId, bucketId), null, -2L); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java index 54a3207807..509ddcdea1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/split/SourceSplitSerializerTest.java @@ -19,6 +19,8 @@ import org.apache.fluss.metadata.TableBucket; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -59,6 +61,64 @@ void testHybridSnapshotLogSplitSerde(boolean isPartitioned) throws Exception { assertThat(deserializedSplit).isEqualTo(split); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHybridSnapshotLogSplitSerdeWithStoppingOffset(boolean isPartitioned) throws Exception { + TableBucket bucket = isPartitioned ? 
partitionedTableBucket : tableBucket; + String partitionName = isPartitioned ? "2024" : null; + + // test with explicit stopping offset + HybridSnapshotLogSplit split = + new HybridSnapshotLogSplit(bucket, partitionName, 100, 50L, 500L); + byte[] serialized = serializer.serialize(split); + SourceSplitBase deserializedSplit = + serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSplit).isEqualTo(split); + + // test with NO_STOPPING_OFFSET (default) + split = new HybridSnapshotLogSplit(bucket, partitionName, 100, 50L); + serialized = serializer.serialize(split); + deserializedSplit = serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSplit).isEqualTo(split); + assertThat(((HybridSnapshotLogSplit) deserializedSplit).getLogStoppingOffset()).isEmpty(); + + // test with snapshot finished and stopping offset + split = new HybridSnapshotLogSplit(bucket, partitionName, 100, 3, true, 50L, 500L); + serialized = serializer.serialize(split); + deserializedSplit = serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSplit).isEqualTo(split); + assertThat(((HybridSnapshotLogSplit) deserializedSplit).getLogStoppingOffset()) + .hasValue(500L); + } + + @Test + void testHybridSnapshotLogSplitV0BackwardCompat() throws Exception { + // Manually serialize in VERSION_0 format (without logStoppingOffset) + DataOutputSerializer out = new DataOutputSerializer(64); + out.writeByte(1); // HYBRID_SNAPSHOT_SPLIT_FLAG + // table bucket (tableId=1, no partition, bucketId=2) + out.writeLong(1L); + out.writeBoolean(false); + out.writeInt(2); + // snapshot fields + out.writeLong(100L); // snapshotId + out.writeLong(3L); // recordsToSkip + out.writeBoolean(true); // isSnapshotFinished + out.writeLong(50L); // logStartingOffset + byte[] v0Bytes = out.getCopyOfBuffer(); + + // Deserialize with VERSION_0 + SourceSplitBase deserialized = serializer.deserialize(0, v0Bytes); + assertThat(deserialized).isInstanceOf(HybridSnapshotLogSplit.class); + HybridSnapshotLogSplit split = (HybridSnapshotLogSplit) deserialized; + assertThat(split.getSnapshotId()).isEqualTo(100L); + assertThat(split.recordsToSkip()).isEqualTo(3L); + assertThat(split.isSnapshotFinished()).isTrue(); + assertThat(split.getLogStartingOffset()).isEqualTo(50L); + // logStoppingOffset should default to NO_STOPPING_OFFSET + assertThat(split.getLogStoppingOffset()).isEmpty(); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testLogSplitSerde(boolean isPartitioned) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceSplitStateTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceSplitStateTest.java index e122a13ee0..17aa24086f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceSplitStateTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceSplitStateTest.java @@ -94,4 +94,32 @@ void testHybridSnapshotLogSplitState() { assertThat(hybridSnapshotLogSplitState.toSourceSplit()) .isEqualTo(expectedHybridSnapshotLogSplit); } + + @Test + void testHybridSnapshotLogSplitStateWithStoppingOffset() { + TableBucket tableBucket = new TableBucket(0, 0L, 0); + + // verify stopping offset is preserved through state transitions + HybridSnapshotLogSplit hybridSnapshotLogSplit = + new HybridSnapshotLogSplit(tableBucket, "partition1", 1L, 100L, 1000L); + 
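+        // Wrapping in split state; toSourceSplit() should round-trip every field,
+        // including the stopping offset.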
HybridSnapshotLogSplitState hybridSnapshotLogSplitState =
+                new HybridSnapshotLogSplitState(hybridSnapshotLogSplit);
+        assertThat(hybridSnapshotLogSplitState.toSourceSplit()).isEqualTo(hybridSnapshotLogSplit);
+
+        // set records to skip; stopping offset should be preserved
+        hybridSnapshotLogSplitState.setRecordsToSkip(200L);
+        HybridSnapshotLogSplit expectedSplit =
+                new HybridSnapshotLogSplit(tableBucket, "partition1", 1L, 200L, false, 100L, 1000L);
+        assertThat(hybridSnapshotLogSplitState.toSourceSplit()).isEqualTo(expectedSplit);
+
+        // set next offset (transitions to log phase); stopping offset should be preserved
+        hybridSnapshotLogSplitState.setNextOffset(500L);
+        expectedSplit =
+                new HybridSnapshotLogSplit(tableBucket, "partition1", 1L, 200L, true, 500L, 1000L);
+        assertThat(hybridSnapshotLogSplitState.toSourceSplit()).isEqualTo(expectedSplit);
+        assertThat(
+                        ((HybridSnapshotLogSplit) hybridSnapshotLogSplitState.toSourceSplit())
+                                .getLogStoppingOffset())
+                .hasValue(1000L);
+    }
 }
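
Usage sketch (illustrative; the catalog, database, and table names below are
hypothetical and the Fluss catalog is assumed to be registered ahead of time):
with this change, a bounded batch query over a non-lake Fluss table should plan
and run instead of failing with UnsupportedOperationException.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class NonLakeBatchReadExample {
        public static void main(String[] args) {
            // Batch execution mode; before this patch, scanning a Fluss table
            // without 'table.datalake.enabled' = 'true' failed at this point.
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Hypothetical names, not taken from this patch.
            tEnv.useCatalog("fluss_catalog");
            tEnv.useDatabase("my_db");

            // A log table is read as LogSplits with stopping offsets; a primary-key
            // table as HybridSnapshotLogSplits with a bounded log tail.
            tEnv.executeSql("SELECT COUNT(*) FROM my_log_table").print();
        }
    }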