feat: Phase 3f — comprehensive end-to-end integration test#6356
Merged
mattmkim
approved these changes
May 1, 2026
…se 3f) Integration test that exercises the full merge actor chain:
1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series, sort schema KV metadata, and window metadata)
2. Uploads to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts publish_metrics_splits is called with the correct replaced_split_ids

Also fixes a TempDirectory lifetime bug: adds _scratch_directory_opt to ParquetSplitBatch so the merge executor's scratch directory stays alive until the uploader finishes reading the merged files. Without this, the temp directory was cleaned up between the executor handler returning and the uploader's async upload task reading the files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
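The lifetime fix above relies on Rust ownership: store the scratch-directory guard in the batch, and its Drop (cleanup) cannot run until the last consumer drops the batch. A minimal sketch, assuming a simplified `ScratchDir` guard in place of the real TempDirectory type and a trimmed-down `ParquetSplitBatch`:

```rust
// Stand-in for the real TempDirectory: removes its directory on Drop.
struct ScratchDir {
    path: std::path::PathBuf,
}

impl Drop for ScratchDir {
    fn drop(&mut self) {
        // With the guard stored in the batch, this cleanup runs only
        // after the uploader has finished reading the merged files.
        let _ = std::fs::remove_dir_all(&self.path);
    }
}

struct ParquetSplitBatch {
    split_files: Vec<std::path::PathBuf>,
    // Underscore-prefixed: never read, held only to extend the scratch
    // directory's lifetime to that of the whole batch.
    _scratch_directory_opt: Option<ScratchDir>,
}

// Returns (file exists while batch alive, file exists after batch drop).
fn demo() -> (bool, bool) {
    let dir = std::env::temp_dir().join("merge-scratch-demo");
    std::fs::create_dir_all(&dir).unwrap();
    let file = dir.join("merged.parquet");
    std::fs::write(&file, b"demo").unwrap();

    let batch = ParquetSplitBatch {
        split_files: vec![file.clone()],
        _scratch_directory_opt: Some(ScratchDir { path: dir }),
    };

    // "Uploader": file is still readable because the batch owns the guard.
    let readable_before_drop = batch.split_files[0].exists();
    drop(batch); // ScratchDir::drop removes the directory here.
    (readable_before_drop, file.exists())
}

fn main() {
    let (before, after) = demo();
    assert!(before && !after);
    println!("readable while batch alive: {before}, after drop: {after}");
}
```

Without the field, the guard would be dropped when the executor handler returned, reproducing the race the commit describes.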
…y comment Review findings:
1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest uses custom compression, merge output would differ. Now threaded from ParquetMergePipelineParams through to the executor.
2. Fixed misleading comment claiming "planner will eventually re-plan" on merge failure. In reality, input splits are drained by operations() and won't be re-planned until the pipeline restarts with metastore re-seeding (not yet implemented — TODO added).
3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn, matching the Tantivy MergePipeline pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…riter_config Review findings addressed:
1. fetch_immature_splits(): on pipeline respawn after crash, queries the metastore for published Parquet splits so the planner can re-plan merges that were in-flight during the crash. On first spawn, uses the initial splits from the IndexingService (same as Tantivy pattern).
2. ParquetWriterConfig threaded from pipeline params to executor so merge output uses the same compression as ingest.
3. Fixed misleading "planner will eventually re-plan" comment on merge failure — honest about the limitation that failed splits wait for respawn re-seeding.
4. Added index_uid to ParquetMergePipelineParams for metastore queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Matches the Tantivy MergePipeline pattern:
- ObservableState is now MergeStatistics (was unit type)
- perform_observe() collects counters from uploader + publisher handles
- Tracks generation, num_spawn_attempts, num_ongoing_merges, num_uploaded_splits, num_published_splits
- previous_generations_statistics preserved across respawns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
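The "preserved across respawns" point can be sketched as follows: totals frozen from dead generations are combined with counters read from the current generation's live handles. Field names follow the commit message; the `observe` helper and its arithmetic are illustrative, not the real API.

```rust
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct MergeStatistics {
    generation: usize,
    num_spawn_attempts: usize,
    num_ongoing_merges: usize,
    num_uploaded_splits: usize,
    num_published_splits: usize,
}

// Combine totals frozen from previous generations with counters observed
// from the current generation's uploader/publisher handles.
fn observe(
    previous_generations: &MergeStatistics,
    uploaded_now: usize,
    published_now: usize,
    ongoing_now: usize,
) -> MergeStatistics {
    MergeStatistics {
        generation: previous_generations.generation,
        num_spawn_attempts: previous_generations.num_spawn_attempts,
        // A gauge, not a running total: only the live generation counts.
        num_ongoing_merges: ongoing_now,
        num_uploaded_splits: previous_generations.num_uploaded_splits + uploaded_now,
        num_published_splits: previous_generations.num_published_splits + published_now,
    }
}

fn main() {
    // Generation 1 uploaded 3 splits and published 2 before a respawn.
    let frozen = MergeStatistics {
        generation: 2,
        num_spawn_attempts: 2,
        num_ongoing_merges: 0,
        num_uploaded_splits: 3,
        num_published_splits: 2,
    };
    // Generation 2's handles currently report 1 upload, 1 publish, 1 ongoing.
    let observed = observe(&frozen, 1, 1, 1);
    assert_eq!(observed.num_uploaded_splits, 4);
    assert_eq!(observed.num_published_splits, 3);
    println!("{observed:?}");
}
```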
Both Tantivy and Parquet merge scheduling used identical score logic (prefer merges that reduce more splits for less total bytes). Extracted the core arithmetic into score_merge(num_splits, total_bytes) and have both score_merge_operation() and score_parquet_merge_operation() call it. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
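The PR does not show the extracted arithmetic, so the body of `score_merge` below is a hypothetical stand-in that encodes the stated intent (prefer merges that reduce more splits for less total bytes); only the refactoring shape — one shared helper, two thin wrappers — is taken from the commit message.

```rust
// Shared scoring helper: splits removed per byte of merge I/O.
// The real arithmetic may differ; this encodes the stated preference.
fn score_merge(num_splits: usize, total_bytes: u64) -> f64 {
    // Merging N inputs into 1 output removes N-1 splits.
    let splits_reduced = num_splits.saturating_sub(1) as f64;
    splits_reduced / (total_bytes.max(1) as f64)
}

// Both call sites become thin wrappers over the shared helper.
fn score_merge_operation(num_splits: usize, total_bytes: u64) -> f64 {
    score_merge(num_splits, total_bytes)
}

fn score_parquet_merge_operation(num_splits: usize, total_bytes: u64) -> f64 {
    score_merge(num_splits, total_bytes)
}

fn main() {
    // Merging 4 small splits scores higher than merging 2 large ones.
    assert!(score_merge_operation(4, 1_000) > score_parquet_merge_operation(2, 10_000));
    println!("4 splits / 1 KB:  {}", score_merge_operation(4, 1_000));
    println!("2 splits / 10 KB: {}", score_parquet_merge_operation(2, 10_000));
}
```

Deduplicating the helper guarantees the two pipelines can never silently diverge in how they rank candidate merges.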
Per CODE_STYLE.md: comments should convey intent, not implementation. Added explanations for num_merge_ops lineage, known_split_ids rebuild heuristic, output dir isolation, empty merge handling, scratch dir lifetime, permit Drop safety, publisher setter ordering, and feedback loop guard conditions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…line Reads parquet_indexing.sort_fields and parquet_indexing.window_duration_secs from IndexingSettings when constructing the ingest pipeline's TableConfig (was hardcoded to defaults). Adds parquet_merge_policy_from_settings() that converts the config-layer ParquetMergePolicyConfig to an Arc<dyn ParquetMergePolicy> runtime policy, paralleling merge_policy_from_settings() for Tantivy. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
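The config-to-runtime conversion described above follows a common Rust pattern: match on a serializable config enum and return an `Arc<dyn Trait>`. A minimal sketch — the trait method, config variants, and policy structs here are assumptions for illustration; only the function name and the `Arc<dyn ParquetMergePolicy>` return type come from the commit message:

```rust
use std::sync::Arc;

// Runtime policy trait; the real trait surface is larger.
trait ParquetMergePolicy: Send + Sync {
    fn max_merge_factor(&self) -> usize;
}

// Config-layer enum (hypothetical variants), as deserialized from settings.
enum ParquetMergePolicyConfig {
    NoMerge,
    ConstWriteAmplification { merge_factor: usize },
}

struct NoMergePolicy;
impl ParquetMergePolicy for NoMergePolicy {
    fn max_merge_factor(&self) -> usize {
        1
    }
}

struct ConstWriteAmplificationPolicy {
    merge_factor: usize,
}
impl ParquetMergePolicy for ConstWriteAmplificationPolicy {
    fn max_merge_factor(&self) -> usize {
        self.merge_factor
    }
}

// Convert the config-layer value into a shared runtime policy object.
fn parquet_merge_policy_from_settings(
    config: &ParquetMergePolicyConfig,
) -> Arc<dyn ParquetMergePolicy> {
    match config {
        ParquetMergePolicyConfig::NoMerge => Arc::new(NoMergePolicy),
        ParquetMergePolicyConfig::ConstWriteAmplification { merge_factor } => {
            Arc::new(ConstWriteAmplificationPolicy { merge_factor: *merge_factor })
        }
    }
}

fn main() {
    let cfg = ParquetMergePolicyConfig::ConstWriteAmplification { merge_factor: 2 };
    let policy = parquet_merge_policy_from_settings(&cfg);
    assert_eq!(policy.max_merge_factor(), 2);
    println!("merge_factor = {}", policy.max_merge_factor());
}
```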
Rewrites the end-to-end merge pipeline test with diverse inputs (different metrics, timestamps, tags per split) and comprehensive output verification: - Staged metadata: num_rows, time_range union, metric_names, num_merge_ops, sort_fields, row_keys_proto, zonemap_regexes, service tags - Parquet file contents: row count, column values, sort order (sorted_series monotonically non-decreasing, cpu < mem ordering) - KV headers: qh.sort_fields, qh.num_merge_ops, qh.row_keys, qh.zonemap_regexes - Cross-validation: metadata agrees with file contents (timestamps, row count, metric_names, zonemap_regexes) Documents known limitation: only "service" tags are extracted in both the ingest and merge paths; host/env/datacenter/region are not yet tracked in split metadata. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lumns Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
fetch_immature_splits was always calling list_metrics_splits. Sketch indexes use a separate Postgres table, so we need to check is_sketches_index and call list_sketch_splits instead. The stage and publish paths already dispatched correctly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
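The fix is a routing decision on the index kind. A minimal sketch, assuming a simplified metastore trait — the method names `list_metrics_splits` / `list_sketch_splits` and the `is_sketches_index` flag come from the commit message, but the signatures here are illustrative:

```rust
// Simplified metastore surface; the real trait is async and richer.
trait Metastore {
    fn list_metrics_splits(&self) -> Vec<String>;
    fn list_sketch_splits(&self) -> Vec<String>;
}

fn fetch_immature_splits(metastore: &dyn Metastore, is_sketches_index: bool) -> Vec<String> {
    // Before the fix this unconditionally called list_metrics_splits,
    // which returns nothing for sketch indexes (separate Postgres table).
    if is_sketches_index {
        metastore.list_sketch_splits()
    } else {
        metastore.list_metrics_splits()
    }
}

struct FakeMetastore;
impl Metastore for FakeMetastore {
    fn list_metrics_splits(&self) -> Vec<String> {
        vec!["metrics-split-1".to_string()]
    }
    fn list_sketch_splits(&self) -> Vec<String> {
        vec!["sketch-split-1".to_string()]
    }
}

fn main() {
    let metastore = FakeMetastore;
    assert_eq!(fetch_immature_splits(&metastore, true), vec!["sketch-split-1"]);
    assert_eq!(fetch_immature_splits(&metastore, false), vec!["metrics-split-1"]);
    println!("dispatch by index kind ok");
}
```

This mirrors how the stage and publish paths already dispatched on the same flag.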
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Stacked on #6354 (Phase 3d+3e). Comprehensive integration test for the Parquet merge pipeline with thorough output verification.
Integration test
Exercises the full merge actor chain in-process with RamStorage and mock metastore using diverse inputs (different metrics, timestamps, tags per split):
Uploads real sorted Parquet files to RamStorage and seeds ParquetMergePipeline with split metadata (merge_factor=2).

What the test verifies
Staged metadata (captured from mock metastore):
- num_rows == 100 (MC-1 row preservation)
- time_range == (100, 250) (union of inputs)
- metric_names == {cpu.usage, mem.usage}
- num_merge_ops == 1, sort_fields preserved, row_keys_proto present
- zonemap_regexes present with metric_name entry
- service tags == {web, api}

Parquet file contents (read back from RamStorage):
- sorted_series monotonically non-decreasing (sort invariant)
- cpu.usage rows precede mem.usage rows (sort schema semantics)

KV metadata headers:
- qh.sort_fields, qh.num_merge_ops, qh.row_keys, qh.zonemap_regexes

Cross-validation: metadata agrees with file contents (timestamps, row count, zonemap regexes)
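The sort-invariant check above reduces to verifying that a column is monotonically non-decreasing. A minimal sketch — the real test reads the sorted_series column back from the merged Parquet file in RamStorage; here a plain slice stands in for it:

```rust
// True iff every adjacent pair is ordered: values[i] <= values[i + 1].
fn is_non_decreasing(values: &[u64]) -> bool {
    values.windows(2).all(|pair| pair[0] <= pair[1])
}

fn main() {
    // Ties are allowed: "non-decreasing", not "strictly increasing".
    let merged_sorted_series = [1, 1, 2, 3, 5, 5, 8];
    assert!(is_non_decreasing(&merged_sorted_series));

    // An out-of-order column fails the invariant.
    assert!(!is_non_decreasing(&[3, 1, 2]));
    println!("sort invariant holds");
}
```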
Known limitation documented
Only "service" tags are extracted in both the ingest and merge paths. Host/env/datacenter/region columns exist in the data but are not yet tracked in split metadata.
Test plan
- cargo clippy clean
- cargo +nightly fmt clean
- tests run with the metrics feature

🤖 Generated with Claude Code