feat: wire ParquetMergePipeline into IndexingService #6367
Merged
mattmkim approved these changes (May 1, 2026)
…se 3f) Integration test that exercises the full merge actor chain:

1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series, sort schema KV metadata, and window metadata)
2. Uploads to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts publish_metrics_splits is called with the correct replaced_split_ids

Also fixes a TempDirectory lifetime bug: adds _scratch_directory_opt to ParquetSplitBatch so the merge executor's scratch directory stays alive until the uploader finishes reading the merged files. Without this, the temp directory was cleaned up between the executor handler returning and the uploader's async upload task reading the files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
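The scratch-directory lifetime fix above comes down to ownership: whoever holds the batch keeps the temp directory alive. A minimal sketch of the idea, with a hypothetical ScratchDirectory stand-in for the real TempDirectory type (the struct and field names mirror the commit but the implementation is assumed):

```rust
use std::fs;
use std::path::PathBuf;

// Hypothetical stand-in for the pipeline's TempDirectory: deletes its
// directory when dropped, like tempfile::TempDir.
struct ScratchDirectory {
    path: PathBuf,
}

impl ScratchDirectory {
    fn new(name: &str) -> std::io::Result<Self> {
        let path = std::env::temp_dir().join(name);
        fs::create_dir_all(&path)?;
        Ok(ScratchDirectory { path })
    }
}

impl Drop for ScratchDirectory {
    fn drop(&mut self) {
        let _ = fs::remove_dir_all(&self.path);
    }
}

// The batch handed from the merge executor to the uploader. Holding the
// scratch directory here keeps the merged files on disk until the batch
// itself is dropped, i.e. after the uploader has read them.
struct ParquetSplitBatch {
    merged_files: Vec<PathBuf>,
    _scratch_directory_opt: Option<ScratchDirectory>,
}

fn main() -> std::io::Result<()> {
    let scratch = ScratchDirectory::new("merge-scratch-demo")?;
    let merged = scratch.path.join("merged.parquet");
    fs::write(&merged, b"parquet bytes")?;

    // Executor handler returns; the batch now owns the scratch dir.
    let batch = ParquetSplitBatch {
        merged_files: vec![merged.clone()],
        _scratch_directory_opt: Some(scratch),
    };

    // The "uploader" still sees the file because the batch is alive.
    assert!(batch.merged_files[0].exists());
    drop(batch); // scratch dir is cleaned up only now
    assert!(!merged.exists());
    println!("ok");
    Ok(())
}
```

Without the `_scratch_directory_opt` field, the directory would be dropped (and deleted) as soon as the executor handler returned, racing the uploader's async read.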
…y comment

Review findings:

1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest uses custom compression, merge output would differ. Now threaded from ParquetMergePipelineParams through to the executor.
2. Fixed misleading comment claiming "planner will eventually re-plan" on merge failure. In reality, input splits are drained by operations() and won't be re-planned until the pipeline restarts with metastore re-seeding (not yet implemented — TODO added).
3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn, matching the Tantivy MergePipeline pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…riter_config

Review findings addressed:

1. fetch_immature_splits(): on pipeline respawn after crash, queries the metastore for published Parquet splits so the planner can re-plan merges that were in-flight during the crash. On first spawn, uses the initial splits from the IndexingService (same as Tantivy pattern).
2. ParquetWriterConfig threaded from pipeline params to executor so merge output uses the same compression as ingest.
3. Fixed misleading "planner will eventually re-plan" comment on merge failure — honest about the limitation that failed splits wait for respawn re-seeding.
4. Added index_uid to ParquetMergePipelineParams for metastore queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Matches the Tantivy MergePipeline pattern:

- ObservableState is now MergeStatistics (was unit type)
- perform_observe() collects counters from uploader + publisher handles
- Tracks generation, num_spawn_attempts, num_ongoing_merges, num_uploaded_splits, num_published_splits
- previous_generations_statistics preserved across respawns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both Tantivy and Parquet merge scheduling used identical score logic (prefer merges that reduce more splits for less total bytes). Extracted the core arithmetic into score_merge(num_splits, total_bytes) and have both score_merge_operation() and score_parquet_merge_operation() call it. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
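The extraction described above can be sketched as follows. The exact scoring formula is an assumption (the commit only states that merges reducing more splits for fewer total bytes score higher); the function names mirror the commit message:

```rust
// Hypothetical reconstruction of the shared scoring helper: higher score
// means a more attractive merge. The invariant from the commit is that
// removing more splits for fewer total bytes should win.
fn score_merge(num_splits: usize, total_bytes: u64) -> f64 {
    // A merge of N splits removes N - 1 splits from the index; the +1 in
    // the denominator avoids division by zero for empty splits.
    (num_splits as f64 - 1.0) / (total_bytes as f64 + 1.0)
}

// Tantivy-side scorer delegates to the shared arithmetic.
fn score_merge_operation(split_bytes: &[u64]) -> f64 {
    score_merge(split_bytes.len(), split_bytes.iter().sum())
}

// Parquet-side scorer calls the same helper, so the two policies
// cannot drift apart.
fn score_parquet_merge_operation(split_bytes: &[u64]) -> f64 {
    score_merge(split_bytes.len(), split_bytes.iter().sum())
}

fn main() {
    // Merging 4 small splits beats merging 2 splits of the same total size.
    assert!(score_merge_operation(&[10, 10, 10, 10]) > score_merge_operation(&[20, 20]));
    // Same split count: fewer total bytes wins.
    assert!(score_parquet_merge_operation(&[10, 10]) > score_parquet_merge_operation(&[50, 50]));
    println!("ok");
}
```

Centralizing the arithmetic means a future tuning of the score affects both engines at once, which is the point of the refactor.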
Per CODE_STYLE.md: comments should convey intent, not implementation. Added explanations for num_merge_ops lineage, known_split_ids rebuild heuristic, output dir isolation, empty merge handling, scratch dir lifetime, permit Drop safety, publisher setter ordering, and feedback loop guard conditions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…line Reads parquet_indexing.sort_fields and parquet_indexing.window_duration_secs from IndexingSettings when constructing the ingest pipeline's TableConfig (was hardcoded to defaults). Adds parquet_merge_policy_from_settings() that converts the config-layer ParquetMergePolicyConfig to an Arc<dyn ParquetMergePolicy> runtime policy, paralleling merge_policy_from_settings() for Tantivy. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rewrites the end-to-end merge pipeline test with diverse inputs (different metrics, timestamps, tags per split) and comprehensive output verification:

- Staged metadata: num_rows, time_range union, metric_names, num_merge_ops, sort_fields, row_keys_proto, zonemap_regexes, service tags
- Parquet file contents: row count, column values, sort order (sorted_series monotonically non-decreasing, cpu < mem ordering)
- KV headers: qh.sort_fields, qh.num_merge_ops, qh.row_keys, qh.zonemap_regexes
- Cross-validation: metadata agrees with file contents (timestamps, row count, metric_names, zonemap_regexes)

Documents known limitation: only "service" tags are extracted in both the ingest and merge paths; host/env/datacenter/region are not yet tracked in split metadata.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lumns Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
fetch_immature_splits was always calling list_metrics_splits. Sketch indexes use a separate Postgres table, so we need to check is_sketches_index and call list_sketch_splits instead. The stage and publish paths already dispatched correctly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
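The dispatch fix can be illustrated with a small sketch; the metastore type and list methods here are stand-ins for the real API, not its actual signatures:

```rust
// Illustrative split marker so the test can tell which table was queried.
#[derive(Debug, PartialEq)]
enum SplitKind {
    Metrics,
    Sketch,
}

// Stand-in for the real metastore, which keeps sketch splits in a
// separate Postgres table from metrics splits.
struct Metastore;

impl Metastore {
    fn list_metrics_splits(&self) -> Vec<SplitKind> {
        vec![SplitKind::Metrics]
    }
    fn list_sketch_splits(&self) -> Vec<SplitKind> {
        vec![SplitKind::Sketch]
    }
}

// Before the fix this unconditionally called list_metrics_splits, which
// returned nothing useful for sketch indexes.
fn fetch_immature_splits(metastore: &Metastore, is_sketches_index: bool) -> Vec<SplitKind> {
    if is_sketches_index {
        metastore.list_sketch_splits()
    } else {
        metastore.list_metrics_splits()
    }
}

fn main() {
    let metastore = Metastore;
    assert_eq!(fetch_immature_splits(&metastore, true), vec![SplitKind::Sketch]);
    assert_eq!(fetch_immature_splits(&metastore, false), vec![SplitKind::Metrics]);
    println!("ok");
}
```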
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Spawns a ParquetMergePipeline alongside each MetricsPipeline, mirroring how the Tantivy MergePipeline is wired to the IndexingPipeline:

- IndexingService stores parquet_merge_pipeline_handles (keyed by IndexUid)
- get_or_create_parquet_merge_pipeline() creates/reuses merge pipelines
- spawn_metrics_pipeline() creates the merge pipeline first, then passes the planner mailbox to MetricsPipelineParams
- MetricsPipeline wires the planner mailbox to the Publisher via set_parquet_merge_planner_mailbox() for ingest→merge feedback
- handle_supervise() cleans up orphan Parquet merge pipelines when their parent metrics pipeline is gone

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
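The create-or-reuse behavior keyed by IndexUid is the classic map-entry pattern. A minimal sketch, assuming simplified types (the real service stores actor pipeline handles and a richer IndexUid, not strings):

```rust
use std::collections::HashMap;
use std::rc::Rc;

// Illustrative alias; the real IndexUid is a structured identifier.
type IndexUid = String;

struct ParquetMergePipelineHandle {
    index_uid: IndexUid,
}

struct IndexingService {
    // One merge pipeline per index, shared across sources.
    parquet_merge_pipeline_handles: HashMap<IndexUid, Rc<ParquetMergePipelineHandle>>,
}

impl IndexingService {
    // Reuses an existing merge pipeline for the index if one is running;
    // otherwise spawns (here: constructs) a new one and remembers it.
    fn get_or_create_parquet_merge_pipeline(
        &mut self,
        index_uid: &IndexUid,
    ) -> Rc<ParquetMergePipelineHandle> {
        self.parquet_merge_pipeline_handles
            .entry(index_uid.clone())
            .or_insert_with(|| {
                Rc::new(ParquetMergePipelineHandle {
                    index_uid: index_uid.clone(),
                })
            })
            .clone()
    }
}

fn main() {
    let mut service = IndexingService {
        parquet_merge_pipeline_handles: HashMap::new(),
    };
    let first = service.get_or_create_parquet_merge_pipeline(&"idx-1".to_string());
    let second = service.get_or_create_parquet_merge_pipeline(&"idx-1".to_string());
    // Two sources on the same index share one merge pipeline handle.
    assert!(Rc::ptr_eq(&first, &second));
    assert_eq!(first.index_uid, "idx-1");
    println!("ok");
}
```

Keying by IndexUid rather than by source is what lets splits produced by different sources of the same index be merged together.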
…astore fetch_immature_splits() falls through to the metastore query when initial_immature_splits_opt is None, so the first spawn self-seeds. No pre-fetch in the IndexingService is needed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
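The fall-through described above is an Option fallback. A sketch under assumed signatures (the real function is async and talks to a metastore; names mirror the commit message):

```rust
#[derive(Debug, Clone, PartialEq)]
struct SplitMetadata {
    split_id: String,
}

// Stand-in for the metastore query for published, still-immature
// Parquet splits.
fn list_published_parquet_splits() -> Vec<SplitMetadata> {
    vec![SplitMetadata { split_id: "from-metastore".into() }]
}

// When the IndexingService supplied initial splits, use them; when the
// option is None (including on first spawn), fall through to the
// metastore so the pipeline self-seeds and no pre-fetch is needed.
fn fetch_immature_splits(
    initial_immature_splits_opt: Option<Vec<SplitMetadata>>,
) -> Vec<SplitMetadata> {
    match initial_immature_splits_opt {
        Some(initial) => initial,
        None => list_published_parquet_splits(),
    }
}

fn main() {
    let seeded = fetch_immature_splits(Some(vec![SplitMetadata {
        split_id: "initial".into(),
    }]));
    assert_eq!(seeded[0].split_id, "initial");

    // No initial splits: the pipeline queries the metastore itself.
    let self_seeded = fetch_immature_splits(None);
    assert_eq!(self_seeded[0].split_id, "from-metastore");
    println!("ok");
}
```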
Summary
Stacked on #6356 (Phase 3f). Wires the ParquetMergePipeline into the IndexingService so Parquet compaction actually runs in production.
What this does
Mirrors the Tantivy merge pipeline wiring pattern:
- parquet_merge_pipeline_handles (keyed by IndexUid, shared across sources for the same index)
- get_or_create_parquet_merge_pipeline() creates or reuses a merge pipeline, converting the config-crate ParquetMergePolicyConfig to the engine-crate type
- spawn_metrics_pipeline() now spawns the merge pipeline first, then passes the planner mailbox to MetricsPipelineParams
- set_parquet_merge_planner_mailbox() wires the planner mailbox into the Publisher, enabling the ingest→merge feedback loop
- handle_supervise() cleans up orphan Parquet merge pipelines when their parent metrics pipeline is gone

Data flow after this PR
Known TODO
Immature Parquet splits are not yet fetched from the metastore when spawning a new merge pipeline (seeding). The planner starts empty and picks up splits from the publisher feedback loop. Seeding will be added as a follow-up.
Test plan
- cargo test --features metrics (test_merge_pipeline_end_to_end)
- cargo +nightly fmt clean

🤖 Generated with Claude Code