
feat: wire ParquetMergePipeline into IndexingService#6367

Merged
g-talbot merged 15 commits into main from gtt/parquet-merge-pipeline-wiring on May 1, 2026

Conversation

g-talbot (Contributor) commented on May 1, 2026

Summary

Stacked on #6356 (Phase 3f). Wires the ParquetMergePipeline into the IndexingService so Parquet compaction actually runs in production.

What this does

Mirrors the Tantivy merge pipeline wiring pattern:

  • IndexingService stores parquet_merge_pipeline_handles (keyed by IndexUid, shared across sources for the same index)
  • get_or_create_parquet_merge_pipeline() creates or reuses a merge pipeline, converting the config-crate ParquetMergePolicyConfig to the engine-crate type
  • spawn_metrics_pipeline() now spawns the merge pipeline first, then passes the planner mailbox to MetricsPipelineParams
  • MetricsPipeline wires the planner mailbox to the Publisher via set_parquet_merge_planner_mailbox(), enabling the ingest→merge feedback loop
  • handle_supervise() cleans up orphan Parquet merge pipelines when their parent metrics pipeline is gone
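The handle-reuse described above can be sketched as follows. This is a minimal std-only illustration with stand-in types (`IndexUid` as a plain `String`, a bare `ParquetMergePipelineHandle` struct); the real engine types and spawn logic differ.

```rust
use std::collections::HashMap;

// Stand-in types for illustration; the real engine types differ.
type IndexUid = String;

#[derive(Debug)]
struct ParquetMergePipelineHandle {
    index_uid: IndexUid,
}

#[derive(Default)]
struct IndexingService {
    // One merge pipeline per index, shared across sources for that index.
    parquet_merge_pipeline_handles: HashMap<IndexUid, ParquetMergePipelineHandle>,
}

impl IndexingService {
    /// Returns the existing merge pipeline for `index_uid`, spawning one if absent.
    fn get_or_create_parquet_merge_pipeline(
        &mut self,
        index_uid: &IndexUid,
    ) -> &ParquetMergePipelineHandle {
        self.parquet_merge_pipeline_handles
            .entry(index_uid.clone())
            .or_insert_with(|| ParquetMergePipelineHandle {
                index_uid: index_uid.clone(),
            })
    }
}

fn main() {
    let mut service = IndexingService::default();
    let index_uid: IndexUid = "metrics-index:0001".to_string();
    // Two sources for the same index reuse one pipeline handle.
    let handle = service.get_or_create_parquet_merge_pipeline(&index_uid);
    println!("spawned pipeline for {}", handle.index_uid);
    service.get_or_create_parquet_merge_pipeline(&index_uid);
    println!("pipelines: {}", service.parquet_merge_pipeline_handles.len());
}
```

The `entry().or_insert_with()` pattern keeps "create or reuse" atomic on the map, mirroring how the Tantivy merge-pipeline handles are shared.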

Data flow after this PR

Ingest: Source → DocProcessor → Indexer → Packager → Uploader → Publisher
                                                                    │
                                                          ParquetNewSplits
                                                                    ↓
Merge:  MergePlanner → Downloader → Executor → Uploader → Publisher
            ↑                                                   │
            └───────────── ParquetNewSplits ────────────────────┘
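The feedback edge in the diagram (Publisher → MergePlanner via `ParquetNewSplits`) can be sketched with a plain channel standing in for the actor mailbox. The message and `Publisher` fields here are simplified stand-ins, not the engine's actual API.

```rust
use std::sync::mpsc;

// Simplified message type; the real ParquetNewSplits carries split metadata.
#[derive(Debug, Clone, PartialEq)]
struct ParquetNewSplits {
    split_ids: Vec<String>,
}

struct Publisher {
    // Set via set_parquet_merge_planner_mailbox() once the merge pipeline exists.
    parquet_merge_planner_mailbox: Option<mpsc::Sender<ParquetNewSplits>>,
}

impl Publisher {
    fn set_parquet_merge_planner_mailbox(&mut self, mailbox: mpsc::Sender<ParquetNewSplits>) {
        self.parquet_merge_planner_mailbox = Some(mailbox);
    }

    /// After publishing a split, notify the merge planner so it can plan compactions.
    fn on_split_published(&self, split_id: &str) {
        if let Some(mailbox) = &self.parquet_merge_planner_mailbox {
            // A send failure would mean the planner is gone; ignored in this sketch.
            let _ = mailbox.send(ParquetNewSplits {
                split_ids: vec![split_id.to_string()],
            });
        }
    }
}

fn main() {
    let (planner_mailbox, planner_inbox) = mpsc::channel();
    let mut publisher = Publisher { parquet_merge_planner_mailbox: None };
    publisher.set_parquet_merge_planner_mailbox(planner_mailbox);
    publisher.on_split_published("split-01");
    let msg = planner_inbox.recv().unwrap();
    println!("planner received: {:?}", msg.split_ids);
}
```

The `Option` on the mailbox matters: the ingest pipeline can publish before the merge pipeline is wired, so the guard makes the feedback edge optional rather than a hard dependency.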

Known TODO

Immature Parquet splits are not yet fetched from the metastore when spawning a new merge pipeline (seeding). The planner starts empty and picks up splits from the publisher feedback loop. Seeding will be added as a follow-up.

Test plan

  • Compiles with --features metrics
  • Integration test passes (test_merge_pipeline_end_to_end)
  • cargo +nightly fmt clean

🤖 Generated with Claude Code

g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from d429436 to 468fa48 on May 1, 2026 18:51
g-talbot force-pushed the gtt/parquet-merge-pipeline-wiring branch from d3acd2b to 6139fca on May 1, 2026 18:52
g-talbot requested a review from mattmkim on May 1, 2026 18:53
g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 468fa48 to 202d999 on May 1, 2026 18:57
g-talbot force-pushed the gtt/parquet-merge-pipeline-wiring branch from 6139fca to 0cd16c2 on May 1, 2026 18:58
mattmkim self-requested a review on May 1, 2026 20:19
g-talbot force-pushed the gtt/parquet-merge-pipeline-wiring branch from 5bf00d6 to f7e16e0 on May 1, 2026 20:27
g-talbot and others added 10 commits May 1, 2026 16:55
…se 3f)

Integration test that exercises the full merge actor chain:
1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series,
   sort schema KV metadata, and window metadata)
2. Uploads to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts publish_metrics_splits called with correct replaced_split_ids

Also fixes TempDirectory lifetime bug: adds _scratch_directory_opt to
ParquetSplitBatch so the merge executor's scratch directory stays alive
until the uploader finishes reading the merged files. Without this, the
temp directory was cleaned up between the executor handler returning and
the uploader's async upload task reading the files.
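The ownership fix can be illustrated with a std-only sketch: the batch holds the scratch directory, so Rust's drop order guarantees cleanup happens only after the batch (and any borrow of its files) is done. `ScratchDirectory` below is a stand-in for the real `TempDirectory` type.

```rust
// Stand-in for the real TempDirectory; in the real code Drop deletes
// the directory on disk.
struct ScratchDirectory {
    name: String,
}

impl Drop for ScratchDirectory {
    fn drop(&mut self) {
        println!("cleaning up {}", self.name);
    }
}

struct ParquetSplitBatch {
    merged_files: Vec<String>,
    // Held only to tie the scratch directory's lifetime to the batch;
    // never read, hence the leading underscore.
    _scratch_directory_opt: Option<ScratchDirectory>,
}

fn upload(batch: &ParquetSplitBatch) -> usize {
    // The scratch directory is still alive here because `batch` owns it.
    batch.merged_files.len()
}

fn main() {
    let batch = ParquetSplitBatch {
        merged_files: vec!["merged-0001.parquet".to_string()],
        _scratch_directory_opt: Some(ScratchDirectory {
            name: "scratch-xyz".to_string(),
        }),
    };
    let uploaded = upload(&batch);
    println!("uploaded {uploaded} file(s)");
    // `batch`, and with it the scratch directory, is dropped only here.
}
```

Without the owned field, the directory was dropped when the executor's handler returned, racing the uploader's async read of the merged files.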

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…y comment

Review findings:

1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest
   uses custom compression, merge output would differ. Now threaded from
   ParquetMergePipelineParams through to the executor.

2. Fixed misleading comment claiming "planner will eventually re-plan"
   on merge failure. In reality, input splits are drained by operations()
   and won't be re-planned until the pipeline restarts with metastore
   re-seeding (not yet implemented — TODO added).

3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn,
   matching the Tantivy MergePipeline pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…riter_config

Review findings addressed:

1. fetch_immature_splits(): on pipeline respawn after crash, queries the
   metastore for published Parquet splits so the planner can re-plan
   merges that were in-flight during the crash. On first spawn, uses the
   initial splits from the IndexingService (same as Tantivy pattern).

2. ParquetWriterConfig threaded from pipeline params to executor so merge
   output uses the same compression as ingest.

3. Fixed misleading "planner will eventually re-plan" comment on merge
   failure — honest about the limitation that failed splits wait for
   respawn re-seeding.

4. Added index_uid to ParquetMergePipelineParams for metastore queries.
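The seeding logic in point 1 can be sketched as a fall-through: use the initial splits handed over on first spawn, otherwise query the metastore. The `Metastore` trait and `SplitMetadata` struct here are simplified stand-ins for the real interfaces.

```rust
// Simplified stand-ins for the real metastore types.
#[derive(Debug, Clone, PartialEq)]
struct SplitMetadata {
    split_id: String,
}

trait Metastore {
    fn list_metrics_splits(&self, index_uid: &str) -> Vec<SplitMetadata>;
}

fn fetch_immature_splits(
    initial_immature_splits_opt: Option<Vec<SplitMetadata>>,
    metastore: &dyn Metastore,
    index_uid: &str,
) -> Vec<SplitMetadata> {
    match initial_immature_splits_opt {
        // First spawn: the IndexingService already supplied the splits.
        Some(splits) => splits,
        // Respawn after a crash: re-seed from the metastore so merges
        // that were in flight can be re-planned.
        None => metastore.list_metrics_splits(index_uid),
    }
}

struct StubMetastore;

impl Metastore for StubMetastore {
    fn list_metrics_splits(&self, _index_uid: &str) -> Vec<SplitMetadata> {
        vec![SplitMetadata { split_id: "published-split-1".to_string() }]
    }
}

fn main() {
    let metastore = StubMetastore;
    // Respawn path: no initial splits, so the planner seeds from the metastore.
    let splits = fetch_immature_splits(None, &metastore, "metrics-index:0001");
    println!("seeded with {} split(s)", splits.len());
}
```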

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Matches the Tantivy MergePipeline pattern:
- ObservableState is now MergeStatistics (was unit type)
- perform_observe() collects counters from uploader + publisher handles
- Tracks generation, num_spawn_attempts, num_ongoing_merges,
  num_uploaded_splits, num_published_splits
- previous_generations_statistics preserved across respawns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both Tantivy and Parquet merge scheduling used identical score logic
(prefer merges that reduce more splits for less total bytes). Extracted
the core arithmetic into score_merge(num_splits, total_bytes) and have
both score_merge_operation() and score_parquet_merge_operation() call it.
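The extraction can be sketched as below. The source only states the preference (remove more splits for fewer total bytes), so the exact formula here is illustrative, not the one in the codebase.

```rust
// Illustrative scoring formula: higher is better. The real arithmetic may
// differ; the invariant is that more splits removed and fewer bytes
// rewritten both raise the score.
fn score_merge(num_splits: usize, total_bytes: u64) -> f64 {
    num_splits as f64 / (total_bytes as f64 + 1.0)
}

// Both Tantivy and Parquet scheduling now delegate to the shared helper.
fn score_merge_operation(num_splits: usize, total_bytes: u64) -> f64 {
    score_merge(num_splits, total_bytes)
}

fn score_parquet_merge_operation(num_splits: usize, total_bytes: u64) -> f64 {
    score_merge(num_splits, total_bytes)
}

fn main() {
    let cheap = score_merge_operation(4, 1_000);
    let costly = score_parquet_merge_operation(4, 1_000_000);
    println!("4 splits / 1 KB: {cheap:.6}; 4 splits / 1 MB: {costly:.9}");
}
```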

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per CODE_STYLE.md: comments should convey intent, not implementation.
Added explanations for num_merge_ops lineage, known_split_ids rebuild
heuristic, output dir isolation, empty merge handling, scratch dir
lifetime, permit Drop safety, publisher setter ordering, and feedback
loop guard conditions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…line

Reads parquet_indexing.sort_fields and parquet_indexing.window_duration_secs
from IndexingSettings when constructing the ingest pipeline's TableConfig
(was hardcoded to defaults).

Adds parquet_merge_policy_from_settings() that converts the config-layer
ParquetMergePolicyConfig to an Arc<dyn ParquetMergePolicy> runtime policy,
paralleling merge_policy_from_settings() for Tantivy.
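The config-to-runtime conversion can be sketched as follows. The field names and the concrete policy struct here are stand-ins; only the shape (serializable config in, `Arc<dyn ParquetMergePolicy>` out) comes from the description above.

```rust
use std::sync::Arc;

// Stand-in for the config-crate type; real fields differ.
#[derive(Debug, Clone)]
struct ParquetMergePolicyConfig {
    merge_factor: usize,
}

// Stand-in for the engine-crate runtime trait.
trait ParquetMergePolicy {
    fn merge_factor(&self) -> usize;
}

// Hypothetical concrete policy for illustration.
struct SimpleMergePolicy {
    merge_factor: usize,
}

impl ParquetMergePolicy for SimpleMergePolicy {
    fn merge_factor(&self) -> usize {
        self.merge_factor
    }
}

/// Converts the serializable config into a runtime policy trait object,
/// paralleling merge_policy_from_settings() for Tantivy.
fn parquet_merge_policy_from_settings(
    config: &ParquetMergePolicyConfig,
) -> Arc<dyn ParquetMergePolicy> {
    Arc::new(SimpleMergePolicy { merge_factor: config.merge_factor })
}

fn main() {
    let config = ParquetMergePolicyConfig { merge_factor: 10 };
    let policy = parquet_merge_policy_from_settings(&config);
    println!("merge factor: {}", policy.merge_factor());
}
```

Keeping the config crate free of engine types (and converting at spawn time) is what lets the same `IndexingSettings` deserialize without pulling in the merge engine.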

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rewrites the end-to-end merge pipeline test with diverse inputs (different
metrics, timestamps, tags per split) and comprehensive output verification:

- Staged metadata: num_rows, time_range union, metric_names, num_merge_ops,
  sort_fields, row_keys_proto, zonemap_regexes, service tags
- Parquet file contents: row count, column values, sort order
  (sorted_series monotonically non-decreasing, cpu < mem ordering)
- KV headers: qh.sort_fields, qh.num_merge_ops, qh.row_keys,
  qh.zonemap_regexes
- Cross-validation: metadata agrees with file contents (timestamps,
  row count, metric_names, zonemap_regexes)

Documents known limitation: only "service" tags are extracted in both
the ingest and merge paths; host/env/datacenter/region are not yet
tracked in split metadata.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lumns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 3fa2a58 to a41c1cc on May 1, 2026 20:56
fetch_immature_splits was always calling list_metrics_splits. Sketch
indexes use a separate Postgres table, so we need to check
is_sketches_index and call list_sketch_splits instead. The stage and
publish paths already dispatched correctly.
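The dispatch fix amounts to branching on the index kind before the listing call. A minimal sketch with stand-in types (the real metastore methods take richer query arguments):

```rust
// Simplified stand-ins for the real metastore interface.
#[derive(Debug, Clone, PartialEq)]
struct SplitMetadata {
    split_id: String,
}

trait Metastore {
    fn list_metrics_splits(&self, index_uid: &str) -> Vec<SplitMetadata>;
    fn list_sketch_splits(&self, index_uid: &str) -> Vec<SplitMetadata>;
}

/// Sketch indexes live in a separate Postgres table, so listing must
/// dispatch on the index kind, matching the stage and publish paths.
fn list_immature_splits(
    metastore: &dyn Metastore,
    index_uid: &str,
    is_sketches_index: bool,
) -> Vec<SplitMetadata> {
    if is_sketches_index {
        metastore.list_sketch_splits(index_uid)
    } else {
        metastore.list_metrics_splits(index_uid)
    }
}

struct StubMetastore;

impl Metastore for StubMetastore {
    fn list_metrics_splits(&self, _index_uid: &str) -> Vec<SplitMetadata> {
        vec![SplitMetadata { split_id: "metrics-split".to_string() }]
    }
    fn list_sketch_splits(&self, _index_uid: &str) -> Vec<SplitMetadata> {
        vec![SplitMetadata { split_id: "sketch-split".to_string() }]
    }
}

fn main() {
    let metastore = StubMetastore;
    let splits = list_immature_splits(&metastore, "sketches-index:0001", true);
    println!("listed: {:?}", splits[0].split_id);
}
```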

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
g-talbot force-pushed the gtt/parquet-merge-pipeline-wiring branch from f7e16e0 to d01c83f on May 1, 2026 20:58
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
g-talbot closed this on May 1, 2026
g-talbot force-pushed the gtt/parquet-merge-pipeline-wiring branch from d01c83f to 5956d86 on May 1, 2026 21:00
g-talbot and others added 3 commits May 1, 2026 17:15
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Spawns a ParquetMergePipeline alongside each MetricsPipeline, mirroring
how the Tantivy MergePipeline is wired to the IndexingPipeline:

- IndexingService stores parquet_merge_pipeline_handles (keyed by IndexUid)
- get_or_create_parquet_merge_pipeline() creates/reuses merge pipelines
- spawn_metrics_pipeline() creates the merge pipeline first, then passes
  the planner mailbox to MetricsPipelineParams
- MetricsPipeline wires the planner mailbox to the Publisher via
  set_parquet_merge_planner_mailbox() for ingest→merge feedback
- handle_supervise() cleans up orphan Parquet merge pipelines when their
  parent metrics pipeline is gone

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…astore

fetch_immature_splits() falls through to the metastore query when
initial_immature_splits_opt is None, so the first spawn self-seeds.
No pre-fetch in the IndexingService is needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
g-talbot reopened this on May 1, 2026
Base automatically changed from gtt/parquet-merge-pipeline-3f to main on May 1, 2026 21:26
g-talbot merged commit 84cbbaf into main on May 1, 2026
5 checks passed
g-talbot deleted the gtt/parquet-merge-pipeline-wiring branch on May 1, 2026 23:21

2 participants