Skip to content

feat: Phase 3f — comprehensive end-to-end integration test#6356

Merged
g-talbot merged 13 commits intomainfrom
gtt/parquet-merge-pipeline-3f
May 1, 2026
Merged

feat: Phase 3f — comprehensive end-to-end integration test#6356
g-talbot merged 13 commits intomainfrom
gtt/parquet-merge-pipeline-3f

Conversation

@g-talbot
Copy link
Copy Markdown
Contributor

@g-talbot g-talbot commented Apr 29, 2026

Summary

Stacked on #6354 (Phase 3d+3e). Comprehensive integration test for the Parquet merge pipeline with thorough output verification.

Integration test

Exercises the full merge actor chain in-process with RamStorage and mock metastore using diverse inputs (different metrics, timestamps, tags per split):

  1. Creates 2 real sorted Parquet files with different data (cpu.usage vs mem.usage, disjoint timestamps, different service/host tags)
  2. Uploads to RamStorage, seeds ParquetMergePipeline with split metadata (merge_factor=2)
  3. Verifies the pipeline plans a merge, downloads, merges, uploads, and publishes

What the test verifies

Staged metadata (captured from mock metastore):

  • num_rows == 100 (MC-1 row preservation)
  • time_range == (100, 250) (union of inputs)
  • metric_names == {cpu.usage, mem.usage}
  • num_merge_ops == 1, sort_fields preserved, row_keys_proto present
  • zonemap_regexes present with metric_name entry
  • service tags == {web, api}

Parquet file contents (read back from RamStorage):

  • Row count matches metadata
  • All metric names, timestamps, service/host values present
  • sorted_series monotonically non-decreasing (sort invariant)
  • cpu.usage rows precede mem.usage rows (sort schema semantics)

KV metadata headers: qh.sort_fields, qh.num_merge_ops, qh.row_keys, qh.zonemap_regexes

Cross-validation: metadata agrees with file contents (timestamps, row count, zonemap regexes)

Known limitation documented

Only "service" tags are extracted in both the ingest and merge paths. Host/env/datacenter/region columns exist in the data but are not yet tracked in split metadata.

Test plan

  • Integration test passes (full merge pipeline e2e with data verification)
  • cargo clippy clean
  • cargo +nightly fmt clean
  • Compiles with and without metrics feature

🤖 Generated with Claude Code

@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from e220505 to 0af0572 Compare April 29, 2026 14:02
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from d82d72d to 92cb5ed Compare April 29, 2026 15:31
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 1e9ff6c to 372f4eb Compare April 29, 2026 15:31
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 92cb5ed to b6f8bcc Compare April 29, 2026 18:11
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 372f4eb to 7db8a68 Compare April 29, 2026 18:11
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from b6f8bcc to d296774 Compare April 29, 2026 18:16
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 7db8a68 to 4a4679e Compare April 29, 2026 18:16
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from d296774 to ededb89 Compare April 29, 2026 18:24
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 4a4679e to 59f9690 Compare April 29, 2026 18:26
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from ededb89 to 761b379 Compare April 29, 2026 18:42
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 59f9690 to f8155a6 Compare April 29, 2026 18:43
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 761b379 to 9b3769e Compare April 29, 2026 18:51
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from f8155a6 to 74ebf40 Compare April 29, 2026 18:51
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 9b3769e to 927dc9f Compare April 29, 2026 19:05
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 11766e6 to 2933c6e Compare April 29, 2026 19:06
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 927dc9f to c51a84b Compare April 29, 2026 20:54
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 2933c6e to 9bbfd34 Compare April 29, 2026 20:54
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from c51a84b to 804e384 Compare April 30, 2026 02:30
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 1396fd4 to dd1f8fc Compare April 30, 2026 02:30
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 804e384 to 55d8ff1 Compare April 30, 2026 13:42
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from dd1f8fc to 6230fca Compare April 30, 2026 13:42
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 55d8ff1 to d360c80 Compare April 30, 2026 13:49
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 6230fca to 38f5749 Compare April 30, 2026 13:49
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from d360c80 to ab8fb10 Compare April 30, 2026 14:47
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 38f5749 to 92460d8 Compare April 30, 2026 14:47
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from ab8fb10 to 93fde45 Compare April 30, 2026 16:41
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 92460d8 to 793ffd1 Compare April 30, 2026 16:41
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 93fde45 to 9ad89a5 Compare April 30, 2026 16:52
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 793ffd1 to ab0ae23 Compare April 30, 2026 16:52
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 9ad89a5 to a691c64 Compare April 30, 2026 20:25
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 5c47734 to f052239 Compare May 1, 2026 17:41
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 87f8183 to eb43918 Compare May 1, 2026 17:42
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from f052239 to 768e966 Compare May 1, 2026 17:47
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from eb43918 to 3a0f4b8 Compare May 1, 2026 17:48
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 768e966 to 9d28f36 Compare May 1, 2026 17:52
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 3a0f4b8 to d429436 Compare May 1, 2026 17:53
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3de branch from 9d28f36 to 64bb5e8 Compare May 1, 2026 18:51
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from d429436 to 468fa48 Compare May 1, 2026 18:51
@g-talbot g-talbot requested a review from mattmkim May 1, 2026 18:53
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 468fa48 to 202d999 Compare May 1, 2026 18:57
Base automatically changed from gtt/parquet-merge-pipeline-3de to main May 1, 2026 19:13
Copy link
Copy Markdown
Contributor

@mattmkim mattmkim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couple comments

Comment thread quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs Outdated
Comment thread quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs Outdated
g-talbot and others added 10 commits May 1, 2026 16:55
…se 3f)

Integration test that exercises the full merge actor chain:
1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series,
   sort schema KV metadata, and window metadata)
2. Uploads to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts publish_metrics_splits called with correct replaced_split_ids

Also fixes TempDirectory lifetime bug: adds _scratch_directory_opt to
ParquetSplitBatch so the merge executor's scratch directory stays alive
until the uploader finishes reading the merged files. Without this, the
temp directory was cleaned up between the executor handler returning and
the uploader's async upload task reading the files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…y comment

Review findings:

1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest
   uses custom compression, merge output would differ. Now threaded from
   ParquetMergePipelineParams through to the executor.

2. Fixed misleading comment claiming "planner will eventually re-plan"
   on merge failure. In reality, input splits are drained by operations()
   and won't be re-planned until the pipeline restarts with metastore
   re-seeding (not yet implemented — TODO added).

3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn,
   matching the Tantivy MergePipeline pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…riter_config

Review findings addressed:

1. fetch_immature_splits(): on pipeline respawn after crash, queries the
   metastore for published Parquet splits so the planner can re-plan
   merges that were in-flight during the crash. On first spawn, uses the
   initial splits from the IndexingService (same as Tantivy pattern).

2. ParquetWriterConfig threaded from pipeline params to executor so merge
   output uses the same compression as ingest.

3. Fixed misleading "planner will eventually re-plan" comment on merge
   failure — honest about the limitation that failed splits wait for
   respawn re-seeding.

4. Added index_uid to ParquetMergePipelineParams for metastore queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Matches the Tantivy MergePipeline pattern:
- ObservableState is now MergeStatistics (was unit type)
- perform_observe() collects counters from uploader + publisher handles
- Tracks generation, num_spawn_attempts, num_ongoing_merges,
  num_uploaded_splits, num_published_splits
- previous_generations_statistics preserved across respawns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Both Tantivy and Parquet merge scheduling used identical score logic
(prefer merges that reduce more splits for less total bytes). Extracted
the core arithmetic into score_merge(num_splits, total_bytes) and have
both score_merge_operation() and score_parquet_merge_operation() call it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per CODE_STYLE.md: comments should convey intent, not implementation.
Added explanations for num_merge_ops lineage, known_split_ids rebuild
heuristic, output dir isolation, empty merge handling, scratch dir
lifetime, permit Drop safety, publisher setter ordering, and feedback
loop guard conditions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…line

Reads parquet_indexing.sort_fields and parquet_indexing.window_duration_secs
from IndexingSettings when constructing the ingest pipeline's TableConfig
(was hardcoded to defaults).

Adds parquet_merge_policy_from_settings() that converts the config-layer
ParquetMergePolicyConfig to an Arc<dyn ParquetMergePolicy> runtime policy,
paralleling merge_policy_from_settings() for Tantivy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rewrites the end-to-end merge pipeline test with diverse inputs (different
metrics, timestamps, tags per split) and comprehensive output verification:

- Staged metadata: num_rows, time_range union, metric_names, num_merge_ops,
  sort_fields, row_keys_proto, zonemap_regexes, service tags
- Parquet file contents: row count, column values, sort order
  (sorted_series monotonically non-decreasing, cpu < mem ordering)
- KV headers: qh.sort_fields, qh.num_merge_ops, qh.row_keys,
  qh.zonemap_regexes
- Cross-validation: metadata agrees with file contents (timestamps,
  row count, metric_names, zonemap_regexes)

Documents known limitation: only "service" tags are extracted in both
the ingest and merge paths; host/env/datacenter/region are not yet
tracked in split metadata.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lumns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/parquet-merge-pipeline-3f branch from 3fa2a58 to a41c1cc Compare May 1, 2026 20:56
g-talbot and others added 3 commits May 1, 2026 16:58
fetch_immature_splits was always calling list_metrics_splits. Sketch
indexes use a separate Postgres table, so we need to check
is_sketches_index and call list_sketch_splits instead. The stage and
publish paths already dispatched correctly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot enabled auto-merge (squash) May 1, 2026 21:21
@g-talbot g-talbot merged commit 14cbc4b into main May 1, 2026
9 checks passed
@g-talbot g-talbot deleted the gtt/parquet-merge-pipeline-3f branch May 1, 2026 21:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants