
[server] Optimize RemoteLogFetcher with async prefetch for recovery#3132

Open
Kaixuan-Duan wants to merge 4 commits into apache:main from Kaixuan-Duan:remote-log-fetcher-prefetch

Conversation


Kaixuan-Duan commented Apr 19, 2026

Purpose

Linked issue: close #3091
This PR improves KV recovery performance by reducing wait time between remote log segments in RemoteLogFetcher.

Brief change log

  • Async prefetch window in RemoteLogFetcher: added a sliding window that downloads the next N remote log segments in the background while the consumer reads the current one, replacing the previous strictly-serial download → open → read → next loop.
  • Two-layer concurrency control (matches Java client / Rust client behavior):
    Prefetch limit (default 4) — caps the number of downloaded-but-not-consumed segments via a Semaphore, bounding local-disk usage.
    Download-thread limit (default 3) — caps simultaneous downloads via a dedicated bounded ExecutorService.
  • Exponential backoff retry: failed downloads retry up to 5 times with backoff 100ms → 5s and 0.25 jitter (ExponentialBackoff), preventing thrash on transient remote-storage failures while still surfacing persistent failures to the recovery driver.
  • Defensive sync fallback: when an in-flight prefetch future fails or is cancelled, fetchSegmentFile() transparently falls back to a synchronous downloadSegmentWithRetry, so a single bad segment cannot stall recovery as long as it is recoverable; if it is not, the failure is propagated and the prefetch window is drained to release permits.
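The retry behavior described above can be sketched as follows. This is an illustrative standalone helper, not the actual `ExponentialBackoff` utility the PR uses; the constants mirror the stated 100ms → 5s range with 0.25 jitter.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the retry backoff: the base delay doubles each
// attempt, is capped at MAX_MS, and a +/-25% jitter is applied on top.
// Class and constant names are hypothetical.
public class BackoffSketch {
    static final long INITIAL_MS = 100;
    static final long MAX_MS = 5_000;
    static final double JITTER = 0.25;

    // attempt is 0-based; the shift is clamped so the base cannot overflow.
    static long backoffMs(int attempt) {
        long base = Math.min(MAX_MS, INITIAL_MS << Math.min(attempt, 10));
        double factor = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * JITTER;
        return (long) (base * factor);
    }
}
```

The jitter spreads retries out so that many recovering replicas do not hammer remote storage in lockstep after a shared transient failure.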

Tests

  • ./mvnw -pl fluss-server spotless:apply -Dtest='RemoteLogFetcherTest' -DfailIfNoTests=false test

API and Format

  • kv.recover.remote-log.prefetch-num int (default 4) : Max remote log segments downloaded but not yet consumed during KV recovery. 1 preserves the legacy one-step-prefetch behavior.
  • kv.recover.remote-log.download-threads (default 3) : Threads used to download remote log segments during KV recovery. Should be ≤ prefetch-num.

No breaking changes to user-facing public APIs.

Documentation
No user-facing feature. No documentation update required.

Contributor

fresh-borzoni left a comment


@Kaixuan-Duan Thanks for the contribution. I will help to review this PR

One process point: this issue was already assigned and I was actively working on it. In that situation, please coordinate on the issue before opening an overlapping PR. Assignment is not exclusive ownership, but it is an important coordination signal, and skipping it usually leads to duplicated effort and fragmented review.

We can evaluate this PR on its merits, but for future cases please check on the issue first.

Contributor

fresh-borzoni left a comment


Thanks, the direction is right. I left some comments, PTAL.


private void cancelPrefetch() {
    if (nextDownloadedSegmentFuture != null) {
        nextDownloadedSegmentFuture.cancel(true);
Contributor


cancel(true) on an already-completed future is a no-op and drops the reference to the downloaded File, which then lives in tempDir until fetcher-level close()
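One way to address this is sketched below: check whether `cancel(true)` actually cancelled anything, and delete the already-downloaded file otherwise. The class is a hypothetical standalone reduction of the fetcher; only the field name mirrors the excerpt.

```java
import java.io.File;
import java.nio.file.Files;
import java.util.concurrent.CompletableFuture;

// Sketch: cancel(true) on a done future is a no-op and returns false, so
// in that case delete the downloaded file explicitly instead of leaving
// it in tempDir until fetcher-level close(). Names are illustrative.
public class CancelSketch {
    CompletableFuture<File> nextDownloadedSegmentFuture;

    void cancelPrefetch() {
        CompletableFuture<File> future = nextDownloadedSegmentFuture;
        nextDownloadedSegmentFuture = null;
        if (future == null) {
            return;
        }
        // cancel returns false when the future already completed
        if (!future.cancel(true) && !future.isCompletedExceptionally()) {
            try {
                File downloaded = future.getNow(null);
                if (downloaded != null) {
                    Files.deleteIfExists(downloaded.toPath());
                }
            } catch (Exception ignored) {
                // best-effort cleanup; close() still sweeps tempDir
            }
        }
    }
}
```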

Author


Thanks — I have updated the logic to handle this explicitly.

        activeIterator = null;
    }
} finally {
    downloadExecutor.shutdownNow();
Contributor


shutdownNow() doesn't wait - if a prefetch is mid-flush, it can write to tempDir after deleteDirectoryQuietly runs. Either downloadExecutor.awaitTermination() with a short timeout before deletion, or make downloadSegment interruption-aware (most S3 SDKs don't honor Thread.isInterrupted() during socket reads, so the interrupt from shutdownNow is effectively decorative)
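The suggested close() ordering can be sketched like this; `ShutdownSketch` and `shutdownAndAwait` are hypothetical names for illustration, not the PR's actual method.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the review suggestion: interrupt pending downloads, then wait
// briefly for the pool to terminate before touching tempDir. Returns
// whether it is safe to delete the temp directory afterwards.
public class ShutdownSketch {
    static boolean shutdownAndAwait(ExecutorService downloadExecutor, long timeoutMs)
            throws InterruptedException {
        downloadExecutor.shutdownNow();
        // If termination times out, a download may still be writing into
        // tempDir, so the caller should skip the directory deletion.
        return downloadExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```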

Author


Thanks for pointing this out. I updated close() to call awaitTermination() after shutdownNow() before deleting the temp directory. If the download executor does not terminate within the timeout, the cleanup is skipped to avoid racing with an in-progress download that may still write into tempDir.

}

@Override
public boolean hasNext() {
Contributor


If fetch() is called twice, the first Iterable still wraps the now-closed iterator and iterating it re-enters advance() on a closed instance, downloading into the shared tempDir, racing with the new iterator

Author


I updated the closed iterator to stop immediately by marking it finished, clearing the pending batch, and making hasNext() return false after close.

}

private File fetchSegmentFile(RemoteLogSegment segment) throws IOException {
    if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) {
Contributor


This depends on RemoteLogSegment having value-based equals(), or on both references coming from the same segments list (reference equality). Works today, but safer to compare by segment id tbh.

Author


I changed the prefetch match to compare remoteLogSegmentId() instead of relying on RemoteLogSegment.equals().

if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) {
    try {
        return nextDownloadedSegmentFuture.get();
    } catch (InterruptedException e) {
Contributor


Also catch CancellationException - it's unchecked (extends RuntimeException) and CompletableFuture.get() throws it on a cancelled future. Not a live bug in the current state machine (every cancelPrefetch nulls the field) but cheap defense-in-depth, especially given closed is volatile.

Author


I added this as a defensive guard: if the prefetched future is cancelled and get() throws CancellationException, we now fall back to the synchronous download path.
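The resulting guard can be sketched as a standalone method; the class, method, and parameter names here are hypothetical, not the PR's exact signatures.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Sketch of the defensive fallback: prefer the prefetched result, but fall
// back to a synchronous download when the future failed or was cancelled.
public class FallbackSketch {
    static String fetch(CompletableFuture<String> prefetched, Supplier<String> syncDownload) {
        if (prefetched != null) {
            try {
                return prefetched.get();
            } catch (CancellationException | ExecutionException e) {
                // fall through to the synchronous path
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return syncDownload.get();
    }
}
```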

@@ -28,10 +28,13 @@
import org.junit.jupiter.api.Test;
Contributor


non-blocking: Two of the three new tests inject state via reflection (setPrivateField) instead of exercising a real async prefetch - they cover the branches in fetchSegmentFile, but not close-during-real-in-flight-download or the orphan-file cleanup.

Consider one integration-style test with a real slow/failing download source.

Author


Thanks, addressed the comments. PTAL.

        "Prefetched segment {} failed, fallback to sync download.",
        segment.remoteLogSegmentId(),
        e.getCause());
return downloadSegment(segment);
Contributor


Non-blocking: No retry on transient S3 failure - one flaky segment fails the entire recovery. In fluss-rust we added exponential backoff (100ms -> 5s with jitter) for this.

Author


Thanks. The retry design in fluss-rust is solid, so I followed the same idea here and added retries around synchronous segment downloads with exponential backoff and jitter, so transient remote storage failures are retried before failing the recovery.

    return downloadSegment(segment);
}

private void prefetchNextSegment() {
Contributor


Prefetch depth hardcoded to 1. If S3 p99 download time > consume time for a segment, the downloader sits idle and the optimization is half-realized. On the Rust side (fluss-rust #187) we landed on configurable depth with a default of 4 for exactly this reason. Since this is KV recovery, depth = 1 might be fine, but it's still better to make it configurable and reason about it properly.
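The design the PR converged on can be sketched as a semaphore-bounded window over a fixed download pool. All names below are illustrative, not the actual fetcher's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Sketch of the two-layer control: a Semaphore caps downloaded-but-not-yet-
// consumed segments (prefetch depth, default 4) while a fixed pool caps
// concurrent downloads (default 3).
public class PrefetchWindowSketch {
    private final Semaphore prefetchPermits;
    private final ExecutorService downloadExecutor;

    PrefetchWindowSketch(int prefetchNum, int downloadThreads) {
        this.prefetchPermits = new Semaphore(prefetchNum);
        this.downloadExecutor = Executors.newFixedThreadPool(downloadThreads);
    }

    // Blocks once prefetchNum segments are downloaded but not yet consumed,
    // which bounds local-disk usage during recovery.
    CompletableFuture<String> prefetch(String segmentId) throws InterruptedException {
        prefetchPermits.acquire();
        return CompletableFuture.supplyAsync(() -> "downloaded:" + segmentId, downloadExecutor);
    }

    // Called after the consumer finishes reading a segment.
    void segmentConsumed() {
        prefetchPermits.release();
    }

    void close() {
        downloadExecutor.shutdownNow();
    }
}
```

Separating the two limits matters: permits bound how much sits on disk, while the pool size bounds how much bandwidth is used at once.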

Author


You're right, I made the prefetch depth configurable, PTAL.

Author

Kaixuan-Duan commented Apr 24, 2026

@fresh-borzoni Thanks for the review, and sorry for not coordinating on the issue beforehand. I didn’t realize it was already being actively worked on.
The testing code and prefetch depth optimization are still progressing, and I will learn the implementation approach of fluss-rust #187.

Author

@fresh-borzoni Thank you for the review. Addressed, PTAL.



Development

Successfully merging this pull request may close these issues.

[server] RemoteLogFetcher optimize to async downloading
