Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,7 @@ public boolean isBounded() {
leaseContext);

if (!streaming) {
// return a bounded source provide to make planner happy,
// but this should throw exception when used to create source
// return a bounded source provider for batch execution mode
return new SourceProvider() {
@Override
public boolean isBounded() {
Expand All @@ -386,10 +385,6 @@ public boolean isBounded() {
+ modificationScanType
+ " statement with conditions on primary key.");
}
if (!isDataLakeEnabled) {
throw new UnsupportedOperationException(
"Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
}
return source;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever;
import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.UnsupportedVersionException;
import org.apache.fluss.flink.lake.LakeSplitGenerator;
Expand Down Expand Up @@ -369,11 +368,142 @@ private void startInBatchMode() {
},
this::handleSplitsAdd);
} else {
throw new UnsupportedOperationException(
String.format(
"Batch only supports when table option '%s' is set to true.",
ConfigOptions.TABLE_DATALAKE_ENABLED));
if (isPartitioned) {
context.callAsync(this::initBoundedPartitionedSplits, this::handleSplitsAdd);
} else {
context.callAsync(this::initBoundedNonPartitionedSplits, this::handleSplitsAdd);
}
}
}

/**
 * Builds the bounded splits for a non-partitioned table in batch execution mode.
 *
 * <p>A primary-key table whose starting offsets come from a snapshot reads the snapshot
 * first and then the log; every other configuration is served purely from the log.
 */
private List<SourceSplitBase> initBoundedNonPartitionedSplits() {
    boolean readFromSnapshot =
            hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer;
    if (readFromSnapshot) {
        return getBoundedSnapshotAndLogSplits(getLatestKvSnapshotsAndRegister(null), null);
    }
    return getBoundedLogSplits(null, null);
}

/**
 * Builds the bounded splits for a partitioned table in batch execution mode by listing
 * all partitions (after applying the partition filter) and generating splits for each.
 *
 * @return the splits covering every selected partition
 * @throws FlinkRuntimeException if listing the partitions of the table fails
 */
private List<SourceSplitBase> initBoundedPartitionedSplits() {
    List<PartitionInfo> partitionInfos;
    try {
        partitionInfos = applyPartitionFilter(flussAdmin.listPartitionInfos(tablePath).get());
    } catch (Exception e) {
        throw new FlinkRuntimeException(
                String.format("Failed to list partitions for %s", tablePath),
                ExceptionUtils.stripCompletionException(e));
    }

    // The snapshot-vs-log decision does not depend on the partition, so evaluate the
    // loop-invariant condition once instead of once per partition.
    boolean readFromSnapshot =
            hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer;

    List<SourceSplitBase> splits = new ArrayList<>();
    for (PartitionInfo partitionInfo : partitionInfos) {
        String partitionName = partitionInfo.getPartitionName();
        if (readFromSnapshot) {
            splits.addAll(
                    getBoundedSnapshotAndLogSplits(
                            getLatestKvSnapshotsAndRegister(partitionName), partitionName));
        } else {
            // The partition id is only needed on the pure-log path.
            splits.addAll(
                    getBoundedLogSplits(partitionInfo.getPartitionId(), partitionName));
        }
    }
    return splits;
}

/**
 * Creates bounded log splits for every non-ignored bucket of the given partition
 * ({@code null} partition id/name for a non-partitioned table). Starting and stopping
 * offsets are resolved in bulk through the configured offsets initializers; buckets
 * missing a stopping offset fall back to {@link LogSplit#NO_STOPPING_OFFSET}.
 */
private List<SourceSplitBase> getBoundedLogSplits(
        @Nullable Long partitionId, @Nullable String partitionName) {
    long tableId = tableInfo.getTableId();

    // Collect the buckets whose offsets still need to be initialized.
    List<Integer> pendingBuckets = new ArrayList<>();
    for (int bucketId = 0; bucketId < tableInfo.getNumBuckets(); bucketId++) {
        if (!ignoreTableBucket(new TableBucket(tableId, partitionId, bucketId))) {
            pendingBuckets.add(bucketId);
        }
    }

    List<SourceSplitBase> splits = new ArrayList<>();
    if (pendingBuckets.isEmpty()) {
        return splits;
    }

    Map<Integer, Long> startOffsets =
            startingOffsetsInitializer.getBucketOffsets(
                    partitionName, pendingBuckets, bucketOffsetsRetriever);
    Map<Integer, Long> stopOffsets =
            stoppingOffsetsInitializer.getBucketOffsets(
                    partitionName, pendingBuckets, bucketOffsetsRetriever);
    for (Map.Entry<Integer, Long> startEntry : startOffsets.entrySet()) {
        int bucketId = startEntry.getKey();
        splits.add(
                new LogSplit(
                        new TableBucket(tableId, partitionId, bucketId),
                        partitionName,
                        startEntry.getValue(),
                        stopOffsets.getOrDefault(bucketId, LogSplit.NO_STOPPING_OFFSET)));
    }
    return splits;
}

/**
 * Creates bounded splits for a primary-key table starting from a KV snapshot.
 *
 * <p>Buckets that have a snapshot become {@code HybridSnapshotLogSplit}s (snapshot first,
 * then log up to the stopping offset); buckets without a snapshot fall back to plain
 * {@code LogSplit}s whose starting offsets are resolved via the starting-offsets
 * initializer. Stopping offsets are fetched once, in bulk, for all non-ignored buckets.
 *
 * @param snapshots the latest KV snapshots for the table/partition (carries table id,
 *     partition id, bucket ids, and per-bucket snapshot/log offsets)
 * @param partitionName the partition name, or {@code null} for a non-partitioned table
 */
private List<SourceSplitBase> getBoundedSnapshotAndLogSplits(
        KvSnapshots snapshots, @Nullable String partitionName) {
    long tableId = snapshots.getTableId();
    Long partitionId = snapshots.getPartitionId();
    List<SourceSplitBase> splits = new ArrayList<>();
    List<Integer> bucketsNeedInitOffset = new ArrayList<>();

    // Collect all bucket IDs for stopping offset retrieval
    List<Integer> allBucketIds = new ArrayList<>();
    for (Integer bucketId : snapshots.getBucketIds()) {
        TableBucket tb = new TableBucket(tableId, partitionId, bucketId);
        if (!ignoreTableBucket(tb)) {
            allBucketIds.add(bucketId);
        }
    }

    // Single bulk lookup of stopping offsets; skipped entirely when every bucket is ignored.
    Map<Integer, Long> stoppingOffsets =
            allBucketIds.isEmpty()
                    ? Collections.emptyMap()
                    : stoppingOffsetsInitializer.getBucketOffsets(
                            partitionName, allBucketIds, bucketOffsetsRetriever);

    for (Integer bucketId : allBucketIds) {
        TableBucket tb = new TableBucket(tableId, partitionId, bucketId);
        OptionalLong snapshotId = snapshots.getSnapshotId(bucketId);
        // Missing stopping offset means the split reads the log unbounded.
        long stoppingOffset =
                stoppingOffsets.getOrDefault(
                        bucketId, HybridSnapshotLogSplit.NO_STOPPING_OFFSET);
        if (snapshotId.isPresent()) {
            // A snapshot exists: read the snapshot, then continue from its log offset.
            OptionalLong logOffset = snapshots.getLogOffset(bucketId);
            checkState(
                    logOffset.isPresent(),
                    "Log offset should be present if snapshot id is present.");
            splits.add(
                    new HybridSnapshotLogSplit(
                            tb,
                            partitionName,
                            snapshotId.getAsLong(),
                            logOffset.getAsLong(),
                            stoppingOffset));
        } else {
            // No snapshot for this bucket: defer to the starting-offsets initializer below.
            bucketsNeedInitOffset.add(bucketId);
        }
    }

    // Resolve starting offsets in bulk for the snapshot-less buckets and emit log splits.
    if (!bucketsNeedInitOffset.isEmpty()) {
        startingOffsetsInitializer
                .getBucketOffsets(partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever)
                .forEach(
                        (bucketId, startingOffset) ->
                                splits.add(
                                        new LogSplit(
                                                new TableBucket(tableId, partitionId, bucketId),
                                                partitionName,
                                                startingOffset,
                                                stoppingOffsets.getOrDefault(
                                                        bucketId,
                                                        LogSplit.NO_STOPPING_OFFSET))));
    }

    return splits;
}

private void startInStreamModeForNonPartitionedTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,24 +250,29 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) {
// assign bucket offset dynamically
TableBucket tableBucket = split.getTableBucket();
boolean isEmptyLogSplit = false;

// Extract stopping offset from LogSplit or HybridSnapshotLogSplit
Optional<Long> stoppingOffsetOpt = Optional.empty();
if (split instanceof LogSplit) {
LogSplit logSplit = split.asLogSplit();
Optional<Long> stoppingOffsetOpt = logSplit.getStoppingOffset();
if (stoppingOffsetOpt.isPresent()) {
Long stoppingOffset = stoppingOffsetOpt.get();
if (startingOffset >= stoppingOffset) {
// is empty log splits as no log record can be fetched
emptyLogSplits.add(split.splitId());
isEmptyLogSplit = true;
} else if (stoppingOffset >= 0) {
stoppingOffsets.put(tableBucket, stoppingOffset);
} else {
// This should not happen.
throw new FlinkRuntimeException(
String.format(
"Invalid stopping offset %d for bucket %s",
stoppingOffset, tableBucket));
}
stoppingOffsetOpt = split.asLogSplit().getStoppingOffset();
} else if (split instanceof HybridSnapshotLogSplit) {
stoppingOffsetOpt = split.asHybridSnapshotLogSplit().getLogStoppingOffset();
}

if (stoppingOffsetOpt.isPresent()) {
Long stoppingOffset = stoppingOffsetOpt.get();
if (startingOffset >= stoppingOffset) {
// is empty log splits as no log record can be fetched
emptyLogSplits.add(split.splitId());
isEmptyLogSplit = true;
} else if (stoppingOffset >= 0) {
stoppingOffsets.put(tableBucket, stoppingOffset);
} else {
// This should not happen.
throw new FlinkRuntimeException(
String.format(
"Invalid stopping offset %d for bucket %s",
stoppingOffset, tableBucket));
}
}

Expand All @@ -276,7 +281,7 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) {
"Skip to read log for split {} since the split is empty with starting offset {}, stopping offset {}.",
split.splitId(),
startingOffset,
split.asLogSplit().getStoppingOffset().get());
stoppingOffsetOpt.get());
} else {
Long partitionId = tableBucket.getPartitionId();
int bucket = tableBucket.getBucket();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.fluss.flink.source.split;

import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.metadata.TableBucket;

import javax.annotation.Nullable;

import java.util.Objects;
import java.util.Optional;

/**
* The hybrid split for first reading the snapshot files and then switch to read the cdc log from a
Expand All @@ -32,16 +34,42 @@
*/
public class HybridSnapshotLogSplit extends SnapshotSplit {

public static final long NO_STOPPING_OFFSET = LogScanner.NO_STOPPING_OFFSET;

private static final String HYBRID_SPLIT_PREFIX = "hybrid-snapshot-log-";
private final boolean isSnapshotFinished;
private final long logStartingOffset;
private final long logStoppingOffset;

/**
 * Creates a hybrid split that reads the snapshot from the beginning (no records skipped,
 * snapshot not yet finished) and then tails the log without any stopping offset.
 */
public HybridSnapshotLogSplit(
        TableBucket tableBucket,
        @Nullable String partitionName,
        long snapshotId,
        long logStartingOffset) {
    // Delegate to the canonical constructor with unbounded log reading.
    this(
            tableBucket,
            partitionName,
            snapshotId,
            0,
            false,
            logStartingOffset,
            NO_STOPPING_OFFSET);
}

/**
 * Creates a bounded hybrid split that reads the snapshot from the beginning and then the
 * log up to {@code logStoppingOffset}.
 */
public HybridSnapshotLogSplit(
        TableBucket tableBucket,
        @Nullable String partitionName,
        long snapshotId,
        long logStartingOffset,
        long logStoppingOffset) {
    // Delegate to the canonical constructor: zero records skipped, snapshot unfinished.
    this(
            tableBucket,
            partitionName,
            snapshotId,
            0,
            false,
            logStartingOffset,
            logStoppingOffset);
}

public HybridSnapshotLogSplit(
Expand All @@ -51,15 +79,42 @@ public HybridSnapshotLogSplit(
long recordsToSkip,
boolean isSnapshotFinished,
long logStartingOffset) {
this(
tableBucket,
partitionName,
snapshotId,
recordsToSkip,
isSnapshotFinished,
logStartingOffset,
NO_STOPPING_OFFSET);
}

/**
 * Canonical constructor capturing the full split state, including resume information
 * ({@code recordsToSkip}, {@code isSnapshotFinished}) and the log read range.
 */
public HybridSnapshotLogSplit(
        TableBucket tableBucket,
        @Nullable String partitionName,
        long snapshotId,
        long recordsToSkip,
        boolean isSnapshotFinished,
        long logStartingOffset,
        long logStoppingOffset) {
    super(tableBucket, partitionName, snapshotId, recordsToSkip);
    this.logStartingOffset = logStartingOffset;
    this.logStoppingOffset = logStoppingOffset;
    this.isSnapshotFinished = isSnapshotFinished;
}

/** Returns the log offset at which reading starts once the snapshot phase is done. */
public long getLogStartingOffset() {
    return logStartingOffset;
}

/**
 * Returns the bounded log stopping offset, or {@link Optional#empty()} when the split
 * reads the log without an upper bound (negative sentinel value).
 */
public Optional<Long> getLogStoppingOffset() {
    if (logStoppingOffset < 0) {
        return Optional.empty();
    }
    return Optional.of(logStoppingOffset);
}

/**
 * Returns the raw stopping offset, which may be the negative "no stopping offset"
 * sentinel; used when serializing or copying the split verbatim.
 */
public long getLogStoppingOffsetRaw() {
    return logStoppingOffset;
}

/** Returns whether the snapshot phase of this hybrid split has already completed. */
public boolean isSnapshotFinished() {
    return isSnapshotFinished;
}
Expand All @@ -82,12 +137,14 @@ public boolean equals(Object o) {
}
HybridSnapshotLogSplit that = (HybridSnapshotLogSplit) o;
return isSnapshotFinished == that.isSnapshotFinished
&& logStartingOffset == that.logStartingOffset;
&& logStartingOffset == that.logStartingOffset
&& logStoppingOffset == that.logStoppingOffset;
}

@Override
public int hashCode() {
    // Fold the superclass hash together with the fields this subclass adds,
    // mirroring the fields compared in equals().
    int parentHash = super.hashCode();
    return Objects.hash(parentHash, isSnapshotFinished, logStartingOffset, logStoppingOffset);
}

@Override
Expand All @@ -103,6 +160,8 @@ public String toString() {
+ isSnapshotFinished
+ ", logStartingOffset="
+ logStartingOffset
+ ", logStoppingOffset="
+ logStoppingOffset
+ ", recordsToSkip="
+ recordsToSkip
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public HybridSnapshotLogSplit toSourceSplit() {
hybridSnapshotLogSplit.getSnapshotId(),
recordsToSkip,
snapshotFinished,
nextOffset);
nextOffset,
hybridSnapshotLogSplit.getLogStoppingOffsetRaw());
}

public void setRecordsToSkip(long recordsToSkip) {
Expand Down
Loading
Loading