From ea9adea927c5946e110098cc00bd3326d4f78acb Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 29 Apr 2026 16:35:52 -0400 Subject: [PATCH 1/5] feat: add configurable ParquetMergePolicyConfig to index settings Adds `parquet_merge_policy` section to `IndexingSettings`, making the Parquet merge policy configurable per-index via YAML. Parameters: - merge_factor (default 10): min splits to trigger a merge - max_merge_factor (default 12): max splits per merge - max_merge_ops (default 4): bounds write amplification - target_split_size_bytes (default 256 MiB): target output size - maturation_period (default 48h): split maturity timeout - max_finalize_merge_operations (default 3): cold-window shutdown limit Mirrors the existing merge_policy config pattern for logs/traces. Updates index-config.md documentation with the new section. Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/configuration/index-config.md | 32 +++++++++- .../quickwit-config/src/index_config/mod.rs | 8 ++- quickwit/quickwit-config/src/lib.rs | 4 +- .../src/merge_policy_config.rs | 64 +++++++++++++++++++ 4 files changed, 105 insertions(+), 3 deletions(-) diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index 24ce8677902..4f502b47ece 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -594,7 +594,8 @@ This section describes indexing settings for a given index. | ------------- | ------------- | ------------- | | `commit_timeout_secs` | Maximum number of seconds before committing a split since its creation. | `60` | | `split_num_docs_target` | Target number of docs per split. | `10000000` | -| `merge_policy` | Describes the strategy used to trigger split merge operations (see [Merge policies](#merge-policies) section below). | +| `merge_policy` | Describes the strategy used to trigger split merge operations for logs/traces (see [Merge policies](#merge-policies) section below). | +| `parquet_merge_policy` | Describes the merge policy for Parquet (metrics/sketches) splits (see [Parquet merge policy](#parquet-merge-policy) section below). | | `resources.heap_size` | Indexer heap size per source per index. | `2000000000` | | `docstore_compression_level` | Level of compression used by zstd for the docstore. Lower values may increase ingest speed, at the cost of index size | `8` | | `docstore_blocksize` | Size of blocks in the docstore, in bytes. Lower values may improve doc retrieval speed, at the cost of index size | `1000000` | @@ -687,6 +688,35 @@ indexing_settings: type: "no_merge" ``` +#### Parquet merge policy + +*For indexes using the Parquet indexing pipeline (metrics, sketches).* + +The Parquet merge policy controls how Parquet splits within a compaction scope (same time window, partition, and sort schema) are merged. It uses a constant write amplification strategy: splits at the same merge level are greedily accumulated until reaching `max_merge_factor` or `target_split_size_bytes`. + +```yaml +version: 0.7 +index_id: "my-metrics-index" +# ... +indexing_settings: + parquet_merge_policy: + merge_factor: 10 + max_merge_factor: 12 + max_merge_ops: 4 + target_split_size_bytes: 268435456 + maturation_period: 48h + max_finalize_merge_operations: 3 +``` + + +| Variable | Description | Default value | +| ------------- | ------------- | ------------- | +| `merge_factor` | Minimum number of splits to trigger a merge. | `10` | +| `max_merge_factor` | Maximum number of splits in a single merge operation. | `12` | +| `max_merge_ops` | Maximum number of merges a split can undergo before becoming mature. Bounds total write amplification. | `4` | +| `target_split_size_bytes` | Target size for merged output splits in bytes. Merges trigger when accumulated bytes reach this threshold, even if `merge_factor` is not reached. | `268435456` (256 MiB) | +| `maturation_period` | Duration after creation when a split becomes mature (never merged again). | `48h` | +| `max_finalize_merge_operations` | *(advanced)* Maximum number of merge operations emitted during cold-window finalization at pipeline shutdown. Set to `0` to disable. | `3` | ### Indexer memory usage diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 286f2d695be..df95cac8eec 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -36,7 +36,7 @@ use siphasher::sip::SipHasher; use tracing::warn; use crate::index_config::serialize::VersionedIndexConfig; -use crate::merge_policy_config::MergePolicyConfig; +use crate::merge_policy_config::{MergePolicyConfig, ParquetMergePolicyConfig}; #[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] @@ -118,6 +118,11 @@ pub struct IndexingSettings { pub split_num_docs_target: usize, #[serde(default)] pub merge_policy: MergePolicyConfig, + /// Merge policy for Parquet (metrics/sketches) splits. Controls how + /// Parquet splits are compacted within time windows. Only used by + /// indexes that use the Parquet indexing pipeline. + #[serde(default)] + pub parquet_merge_policy: ParquetMergePolicyConfig, #[serde(default)] pub resources: IndexingResources, } @@ -160,6 +165,7 @@ impl Default for IndexingSettings { docstore_compression_level: Self::default_docstore_compression_level(), split_num_docs_target: Self::default_split_num_docs_target(), merge_policy: MergePolicyConfig::default(), + parquet_merge_policy: ParquetMergePolicyConfig::default(), resources: IndexingResources::default(), } } diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 2abaaef79f3..12cfdc20c68 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -67,7 +67,8 @@ use tracing::warn; use crate::index_template::IndexTemplateV0_8; pub use crate::index_template::{IndexTemplate, IndexTemplateId, VersionedIndexTemplate}; use crate::merge_policy_config::{ - ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, StableLogMergePolicyConfig, + ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, ParquetMergePolicyConfig, + StableLogMergePolicyConfig, }; pub use crate::metastore_config::{ MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig, @@ -113,6 +114,7 @@ pub fn disable_ingest_v1() -> bool { KafkaSourceParams, KinesisSourceParams, MergePolicyConfig, + ParquetMergePolicyConfig, PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams, diff --git a/quickwit/quickwit-config/src/merge_policy_config.rs b/quickwit/quickwit-config/src/merge_policy_config.rs index 3e4e5dad0ce..d3357b288c0 100644 --- a/quickwit/quickwit-config/src/merge_policy_config.rs +++ b/quickwit/quickwit-config/src/merge_policy_config.rs @@ -119,6 +119,70 @@ impl Default for StableLogMergePolicyConfig { } } +// --- Parquet merge policy config --- + +fn default_target_split_size_bytes() -> u64 { + 256 * 1024 * 1024 // 256 MiB +} + +fn default_max_finalize_merge_operations() -> usize { + 3 +} + +/// Configuration for the Parquet (metrics/sketches) merge policy. +/// +/// Controls how Parquet splits within a compaction scope are merged. +/// Splits at the same `num_merge_ops` level are greedily accumulated +/// until reaching `max_merge_factor` or `target_split_size_bytes`. +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ParquetMergePolicyConfig { + /// Minimum number of splits to trigger a merge. + #[serde(default = "default_merge_factor")] + pub merge_factor: usize, + /// Maximum number of splits in a single merge operation. + #[serde(default = "default_max_merge_factor")] + pub max_merge_factor: usize, + /// Maximum number of merges a split can undergo before becoming mature. + /// Bounds total write amplification. + #[serde(default = "default_parquet_max_merge_ops")] + pub max_merge_ops: u32, + /// Target size for merged output splits in bytes. Merges are triggered + /// when accumulated bytes reach this threshold, even if `merge_factor` + /// is not reached. + #[serde(default = "default_target_split_size_bytes")] + pub target_split_size_bytes: u64, + /// Duration after creation when a split becomes mature regardless of + /// size or merge count. Mature splits are never merged. + #[schema(value_type = String)] + #[serde(default = "default_maturation_period")] + #[serde(deserialize_with = "parse_human_duration")] + #[serde(serialize_with = "serialize_duration")] + pub maturation_period: Duration, + /// Maximum number of merge operations emitted during cold-window + /// finalization at shutdown. Set to 0 to disable. + #[serde(default = "default_max_finalize_merge_operations")] + #[serde(skip_serializing_if = "is_zero")] + pub max_finalize_merge_operations: usize, +} + +fn default_parquet_max_merge_ops() -> u32 { + 4 +} + +impl Default for ParquetMergePolicyConfig { + fn default() -> Self { + Self { + merge_factor: default_merge_factor(), + max_merge_factor: default_max_merge_factor(), + max_merge_ops: default_parquet_max_merge_ops(), + target_split_size_bytes: default_target_split_size_bytes(), + maturation_period: default_maturation_period(), + max_finalize_merge_operations: default_max_finalize_merge_operations(), + } + } +} + fn parse_human_duration<'de, D>(deserializer: D) -> Result where D: Deserializer<'de> { let value: String = Deserialize::deserialize(deserializer)?; From 1fbece5456644986b9cbf0488aed1d6001963548 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 29 Apr 2026 16:45:18 -0400 Subject: [PATCH 2/5] feat: add ParquetIndexingConfig with sort_fields and window_duration_secs Adds `parquet_indexing` section to `IndexingSettings` for per-index Parquet pipeline configuration: - `sort_fields`: sort schema override (Husky-style pipe-delimited syntax with /V2 suffix). Controls row ordering, query pruning, compression locality, and compaction scope. When omitted, uses the product-type default. - `window_duration_secs`: time window for split partitioning (default 900s / 15 min). Must divide 3600. Updates docs/configuration/index-config.md with: - "Parquet indexing settings" section explaining both parameters - Full sort schema syntax reference (column types, direction overrides, & LSM cutoff marker) - Examples showing minimal, custom, and advanced configurations Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/configuration/index-config.md | 52 +++++++++++++ .../quickwit-config/src/index_config/mod.rs | 77 +++++++++++++++++++ quickwit/quickwit-config/src/lib.rs | 7 +- 3 files changed, 133 insertions(+), 3 deletions(-) diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index 4f502b47ece..067f16c7fc8 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -596,6 +596,7 @@ This section describes indexing settings for a given index. | `split_num_docs_target` | Target number of docs per split. | `10000000` | | `merge_policy` | Describes the strategy used to trigger split merge operations for logs/traces (see [Merge policies](#merge-policies) section below). | | `parquet_merge_policy` | Describes the merge policy for Parquet (metrics/sketches) splits (see [Parquet merge policy](#parquet-merge-policy) section below). | +| `parquet_indexing` | Parquet-specific indexing settings: sort schema, window duration (see [Parquet indexing settings](#parquet-indexing-settings) section below). | | `resources.heap_size` | Indexer heap size per source per index. | `2000000000` | | `docstore_compression_level` | Level of compression used by zstd for the docstore. Lower values may increase ingest speed, at the cost of index size | `8` | | `docstore_blocksize` | Size of blocks in the docstore, in bytes. Lower values may improve doc retrieval speed, at the cost of index size | `1000000` | @@ -688,6 +689,57 @@ indexing_settings: type: "no_merge" ``` +### Parquet indexing settings + +*For indexes using the Parquet indexing pipeline (metrics, sketches).* + +These settings control how the Parquet pipeline sorts, windows, and writes incoming data. They affect both ingest-time performance and downstream query/compaction efficiency. + +```yaml +version: 0.7 +index_id: "my-metrics-index" +# ... +indexing_settings: + parquet_indexing: + sort_fields: "metric_name|service|env|host|timeseries_id|timestamp_secs/V2" + window_duration_secs: 900 +``` + +| Variable | Description | Default value | +| ------------- | ------------- | ------------- | +| `sort_fields` | Sort schema for row ordering in Parquet files (see syntax below). When omitted, the product-type default is used. | `metric_name\|service\|env\|datacenter\|region\|host\|timeseries_id\|timestamp_secs/V2` | +| `window_duration_secs` | Time window duration in seconds for split partitioning. Must evenly divide 3600. Larger values = fewer splits but coarser time pruning. | `900` (15 minutes) | + +#### Sort schema syntax + +The sort schema uses pipe-delimited column names with a `/V2` version suffix: + +```text +column1|column2|...|timestamp_secs/V2 +``` + +**Column types** are inferred from name suffixes: +- `__s` → string (e.g., `custom_tag__s`) +- `__i` → int64 (e.g., `priority__i`) +- Well-known names like `metric_name`, `service`, `env`, `host`, `timestamp_secs`, and `timeseries_id` have built-in type mappings and don't need suffixes. + +**Sort direction** defaults to ascending for most columns and descending for timestamp columns. Override with `+` (ascending) or `-` (descending) as a prefix or suffix on the column name: + +```text +# Explicit descending timestamp +metric_name|host|-timestamp_secs/V2 + +# Ascending host (default), descending timestamp (default) +metric_name|host|timestamp_secs/V2 +``` + +**How the sort schema affects behavior:** +- **Query pruning**: queries filtering on leading columns (e.g., `metric_name`) can skip entire splits whose row key ranges don't match. +- **Compression**: grouping similar values together (e.g., all rows for the same metric name) improves columnar compression ratios. +- **Compaction scope**: splits with different sort schemas are never merged together. Changing the sort schema on an existing index creates a new compaction scope — old splits are not re-sorted. + +**The `&` marker** (advanced) sets the LSM comparison cutoff: columns after `&` are used for sort order but not for compaction locality decisions. For example, `metric_name|&host|timestamp_secs/V2` sorts by metric_name then host, but only metric_name determines which splits can be merged. + #### Parquet merge policy *For indexes using the Parquet indexing pipeline (metrics, sketches).* diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index df95cac8eec..d4098f315dd 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -123,10 +123,86 @@ pub struct IndexingSettings { /// indexes that use the Parquet indexing pipeline. #[serde(default)] pub parquet_merge_policy: ParquetMergePolicyConfig, + /// Parquet-specific indexing settings (sort schema, window duration, + /// compression). Only used by indexes that use the Parquet pipeline. + #[serde(default)] + pub parquet_indexing: ParquetIndexingConfig, #[serde(default)] pub resources: IndexingResources, } +/// Configuration for the Parquet indexing pipeline (metrics, sketches). +/// +/// Controls how incoming data is sorted, windowed, and compressed before +/// writing to Parquet split files. These settings affect both ingest-time +/// performance and downstream query/compaction efficiency. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] +pub struct ParquetIndexingConfig { + /// Sort schema defining the physical sort order of rows in Parquet files. + /// + /// Uses Husky-style pipe-delimited syntax with a `/V2` version suffix. + /// Each column is sorted ascending by default; use `+` or `-` prefix/suffix + /// to override. Column types are inferred from well-known suffixes + /// (`__s` = string, `__i` = int64, `_secs` = uint64 timestamp). + /// + /// The sort order determines: + /// - **Query pruning**: queries that filter on leading sort columns can + /// skip entire splits whose row key ranges don't match. + /// - **Compression**: columns with good locality (e.g., metric_name first) + /// compress better in Parquet's columnar format. + /// - **Compaction scope**: splits with different sort schemas are never + /// merged together. + /// + /// When `None`, the product-type default is used (see below). + /// + /// # Default (metrics/sketches) + /// ```text + /// metric_name|service|env|datacenter|region|host|timeseries_id|timestamp_secs/V2 + /// ``` + /// + /// # Examples + /// ```text + /// # Minimal: just metric name and timestamp + /// metric_name|timestamp_secs/V2 + /// + /// # Custom tags in sort order + /// metric_name|service|cluster|host|timestamp_secs/V2 + /// + /// # Explicit descending timestamp + /// metric_name|host|-timestamp_secs/V2 + /// ``` + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sort_fields: Option, + + /// Time window duration in seconds for split partitioning. + /// + /// Incoming data is partitioned into time windows of this duration. + /// Splits within the same window may be compacted together; splits in + /// different windows are never merged. Must evenly divide 3600 (one hour). + /// + /// Larger values produce fewer, larger splits (better for bulk queries) + /// but coarser time-based pruning. Smaller values give finer pruning + /// but more splits to manage. + #[serde(default = "ParquetIndexingConfig::default_window_duration_secs")] + pub window_duration_secs: u32, +} + +impl ParquetIndexingConfig { + fn default_window_duration_secs() -> u32 { + 900 + } +} + +impl Default for ParquetIndexingConfig { + fn default() -> Self { + Self { + sort_fields: None, + window_duration_secs: Self::default_window_duration_secs(), + } + } +} + impl IndexingSettings { pub fn commit_timeout(&self) -> Duration { Duration::from_secs(self.commit_timeout_secs as u64) @@ -166,6 +242,7 @@ impl Default for IndexingSettings { split_num_docs_target: Self::default_split_num_docs_target(), merge_policy: MergePolicyConfig::default(), parquet_merge_policy: ParquetMergePolicyConfig::default(), + parquet_indexing: ParquetIndexingConfig::default(), resources: IndexingResources::default(), } } diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 12cfdc20c68..b10afbc8b0b 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -45,9 +45,9 @@ pub use cluster_config::ClusterConfig; // See #2048 use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig}; pub use index_config::{ - IndexConfig, IndexingResources, IndexingSettings, IngestSettings, RetentionPolicy, - SearchSettings, build_doc_mapper, load_index_config_from_user_config, load_index_config_update, - prepare_doc_mapping_update, + IndexConfig, IndexingResources, IndexingSettings, IngestSettings, ParquetIndexingConfig, + RetentionPolicy, SearchSettings, build_doc_mapper, load_index_config_from_user_config, + load_index_config_update, prepare_doc_mapping_update, }; pub use quickwit_doc_mapper::DocMapping; use serde::Serialize; @@ -114,6 +114,7 @@ pub fn disable_ingest_v1() -> bool { KafkaSourceParams, KinesisSourceParams, MergePolicyConfig, + ParquetIndexingConfig, ParquetMergePolicyConfig, PubSubSourceParams, PulsarSourceAuth, From 58d07d06fe25961867d0443e3a9021c706b64de9 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 29 Apr 2026 22:29:15 -0400 Subject: [PATCH 3/5] fix: update indexing service fingerprint constants and nightly fmt Adding ParquetMergePolicyConfig and ParquetIndexingConfig to IndexingSettings changes the Hash output, which changes the pipeline params fingerprints. Updated the hardcoded test constants. Added a comment explaining how to recompute them when IndexingSettings fields change. Co-Authored-By: Claude Opus 4.6 (1M context) --- quickwit/quickwit-config/Cargo.toml | 1 + .../quickwit-config/src/index_config/mod.rs | 52 ++-- .../src/merge_policy_config.rs | 4 + quickwit/quickwit-indexing/Cargo.toml | 2 +- .../src/actors/indexing_service.rs | 10 +- .../file-backed-index/v0.7.expected.json | 224 +++++++++--------- .../file-backed-index/v0.8.expected.json | 224 +++++++++--------- .../file-backed-index/v0.9.expected.json | 224 +++++++++--------- .../test-data/file-backed-index/v0.9.json | 224 +++++++++--------- .../index-metadata/v0.9.expected.json | 6 +- .../test-data/index-metadata/v0.9.json | 6 +- .../test-data/manifest/v0.7.expected.json | 6 +- .../test-data/manifest/v0.8.expected.json | 2 +- .../test-data/manifest/v0.9.json | 6 +- 14 files changed, 508 insertions(+), 483 deletions(-) diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml index 93a1fdf8de1..aebea5e9c31 100644 --- a/quickwit/quickwit-config/Cargo.toml +++ b/quickwit/quickwit-config/Cargo.toml @@ -45,5 +45,6 @@ quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } [features] +metrics = [] testsuite = [] vrl = ["dep:vrl"] diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index d4098f315dd..d536e4edab5 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -36,7 +36,9 @@ use siphasher::sip::SipHasher; use tracing::warn; use crate::index_config::serialize::VersionedIndexConfig; -use crate::merge_policy_config::{MergePolicyConfig, ParquetMergePolicyConfig}; +use crate::merge_policy_config::MergePolicyConfig; +#[cfg(feature = "metrics")] +use crate::merge_policy_config::ParquetMergePolicyConfig; #[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] @@ -118,15 +120,14 @@ pub struct IndexingSettings { pub split_num_docs_target: usize, #[serde(default)] pub merge_policy: MergePolicyConfig, - /// Merge policy for Parquet (metrics/sketches) splits. Controls how - /// Parquet splits are compacted within time windows. Only used by - /// indexes that use the Parquet indexing pipeline. - #[serde(default)] - pub parquet_merge_policy: ParquetMergePolicyConfig, - /// Parquet-specific indexing settings (sort schema, window duration, - /// compression). Only used by indexes that use the Parquet pipeline. - #[serde(default)] - pub parquet_indexing: ParquetIndexingConfig, + /// Merge policy for Parquet (metrics/sketches) splits. + #[cfg(feature = "metrics")] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub parquet_merge_policy: Option, + /// Parquet-specific indexing settings (sort schema, window duration). + #[cfg(feature = "metrics")] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub parquet_indexing: Option, #[serde(default)] pub resources: IndexingResources, } @@ -147,12 +148,11 @@ pub struct ParquetIndexingConfig { /// (`__s` = string, `__i` = int64, `_secs` = uint64 timestamp). /// /// The sort order determines: - /// - **Query pruning**: queries that filter on leading sort columns can - /// skip entire splits whose row key ranges don't match. - /// - **Compression**: columns with good locality (e.g., metric_name first) - /// compress better in Parquet's columnar format. - /// - **Compaction scope**: splits with different sort schemas are never - /// merged together. + /// - **Query pruning**: queries that filter on leading sort columns can skip entire splits + /// whose row key ranges don't match. + /// - **Compression**: columns with good locality (e.g., metric_name first) compress better in + /// Parquet's columnar format. + /// - **Compaction scope**: splits with different sort schemas are never merged together. /// /// When `None`, the product-type default is used (see below). /// @@ -208,6 +208,20 @@ impl IndexingSettings { Duration::from_secs(self.commit_timeout_secs as u64) } + /// Returns the Parquet merge policy config, using defaults if not + /// explicitly configured. + #[cfg(feature = "metrics")] + pub fn parquet_merge_policy(&self) -> ParquetMergePolicyConfig { + self.parquet_merge_policy.clone().unwrap_or_default() + } + + /// Returns the Parquet indexing config, using defaults if not + /// explicitly configured. + #[cfg(feature = "metrics")] + pub fn parquet_indexing(&self) -> ParquetIndexingConfig { + self.parquet_indexing.clone().unwrap_or_default() + } + fn default_commit_timeout_secs() -> usize { 60 } @@ -241,8 +255,10 @@ impl Default for IndexingSettings { docstore_compression_level: Self::default_docstore_compression_level(), split_num_docs_target: Self::default_split_num_docs_target(), merge_policy: MergePolicyConfig::default(), - parquet_merge_policy: ParquetMergePolicyConfig::default(), - parquet_indexing: ParquetIndexingConfig::default(), + #[cfg(feature = "metrics")] + parquet_merge_policy: None, + #[cfg(feature = "metrics")] + parquet_indexing: None, resources: IndexingResources::default(), } } diff --git a/quickwit/quickwit-config/src/merge_policy_config.rs b/quickwit/quickwit-config/src/merge_policy_config.rs index d3357b288c0..d3c996891cf 100644 --- a/quickwit/quickwit-config/src/merge_policy_config.rs +++ b/quickwit/quickwit-config/src/merge_policy_config.rs @@ -120,6 +120,10 @@ impl Default for StableLogMergePolicyConfig { } // --- Parquet merge policy config --- +// +// The types are always available (for OpenAPI schema generation in +// quickwit-serve). The IndexingSettings fields that use them are +// gated behind cfg(feature = "metrics"). fn default_target_split_size_bytes() -> u64 { 256 * 1024 * 1024 // 256 MiB diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index 21827a4fc02..797b59a5e97 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -105,7 +105,7 @@ testsuite = [ "quickwit-proto/testsuite", "quickwit-storage/testsuite" ] -metrics = ["dep:arrow", "dep:quickwit-parquet-engine", "quickwit-doc-mapper/metrics"] +metrics = ["dep:arrow", "dep:quickwit-parquet-engine", "quickwit-doc-mapper/metrics", "quickwit-config/metrics"] vrl = ["dep:vrl", "quickwit-config/vrl"] postgres = ["quickwit-metastore/postgres"] ci-test = [] diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index a8197d0058c..960ec903bac 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -1250,9 +1250,13 @@ mod tests { #[tokio::test] async fn test_indexing_service_apply_plan() { - const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394; - const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791; - const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428; + // These fingerprints are hashes of IndexConfig + SourceConfig. They + // change whenever IndexingSettings fields are added/removed. Recompute + // by temporarily adding a test that prints + // `indexing_pipeline_params_fingerprint(&index_config, &source_config)`. + const PARAMS_FINGERPRINT_INGEST_API: u64 = 7973087274884969148; + const PARAMS_FINGERPRINT_SOURCE_1: u64 = 9420938500552890840; + const PARAMS_FINGERPRINT_SOURCE_2: u64 = 16199199787360162635; quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json index cb00de2fbd8..ef890a7d291 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.7.expected.json @@ -1,200 +1,200 @@ { - "version": "0.9", + "delete_tasks": [ + { + "create_timestamp": 0, + "delete_query": { + "index_uid": "my-index:00000000000000000000000000", + "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}}]}" + }, + "opstamp": 10 + } + ], "index": { - "version": "0.9", - "index_uid": "my-index:00000000000000000000000000", + "checkpoint": { + "kafka-source": { + "00000000000000000000": "00000000000000000042" + } + }, + "create_timestamp": 1789, "index_config": { - "version": "0.9", - "index_id": "my-index", - "index_uri": "s3://quickwit-indexes/my-index", "doc_mapping": { "doc_mapping_uid": "00000000000000000000000000", - "mode": "dynamic", "dynamic_mapping": { - "indexed": true, - "tokenizer": "raw", - "record": "basic", - "stored": true, "expand_dots": true, "fast": { "normalizer": "raw" - } + }, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "raw" }, "field_mappings": [ { + "coerce": true, + "fast": true, + "indexed": true, "name": "tenant_id", - "type": "u64", + "output_format": "number", "stored": true, - "indexed": true, - "fast": true, - "coerce": true, - "output_format": "number" + "type": "u64" }, { - "name": "timestamp", - "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, "input_formats": [ "rfc3339", "unix_timestamp" ], + "name": "timestamp", "output_format": "rfc3339", - "fast_precision": "seconds", - "indexed": true, "stored": true, - "fast": true + "type": "datetime" }, { - "name": "log_level", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "raw", + "name": "log_level", "record": "basic", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "raw", + "type": "text" }, { - "name": "message", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "default", + "name": "message", "record": "position", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "default", + "type": "text" } ], - "timestamp_field": "timestamp", + "index_field_presence": true, + "max_num_partitions": 100, + "mode": "dynamic", + "partition_key": "tenant_id", + "store_document_size": false, + "store_source": true, "tag_fields": [ "log_level", "tenant_id" ], - "partition_key": "tenant_id", - "max_num_partitions": 100, - "index_field_presence": true, - "store_document_size": false, - "store_source": true, + "timestamp_field": "timestamp", "tokenizers": [ { + "filters": [], "name": "custom_tokenizer", - "type": "regex", "pattern": "[^\\p{L}\\p{N}]+", - "filters": [] + "type": "regex" } ] }, + "index_id": "my-index", + "index_uri": "s3://quickwit-indexes/my-index", "indexing_settings": { "commit_timeout_secs": 301, - "docstore_compression_level": 8, "docstore_blocksize": 1000000, - "split_num_docs_target": 10000001, + "docstore_compression_level": 8, "merge_policy": { - "type": "stable_log", - "min_level_num_docs": 100000, - "merge_factor": 9, + "maturation_period": "2days", "max_merge_factor": 11, - "maturation_period": "2days" + "merge_factor": 9, + "min_level_num_docs": 100000, + "type": "stable_log" }, "resources": { "heap_size": 50000000 - } + }, + "split_num_docs_target": 10000001 }, "ingest_settings": { "min_shards": 1 }, + "retention": { + "period": "90 days", + "schedule": "daily" + }, "search_settings": { "default_search_fields": [ "message" ] }, - "retention": { - "period": "90 days", - "schedule": "daily" - } - }, - "checkpoint": { - "kafka-source": { - "00000000000000000000": "00000000000000000042" - } + "version": "0.9" }, - "create_timestamp": 1789, + "index_uid": "my-index:00000000000000000000000000", "sources": [ { - "version": "0.9", - "source_id": "kafka-source", - "num_pipelines": 2, "enabled": true, - "source_type": "kafka", + "input_format": "json", + "num_pipelines": 2, "params": { - "topic": "kafka-topic", - "client_params": {} + "client_params": {}, + "topic": "kafka-topic" }, + "source_id": "kafka-source", + "source_type": "kafka", "transform": { "script": ".message = downcase(string!(.message))", "timezone": "UTC" }, - "input_format": "json" + "version": "0.9" + } + ], + "version": "0.9" + }, + "shards": { + "_ingest-source": [ + { + "doc_mapping_uid": "00000000000000000000000000", + "follower_id": "follower-ingester", + "index_uid": "my-index:00000000000000000000000000", + "leader_id": "leader-ingester", + "publish_position_inclusive": "", + "shard_id": "00000000000000000001", + "shard_state": 1, + "source_id": "_ingest-source", + "update_timestamp": 1704067200 } ] }, "splits": [ { - "split_state": "Published", - "update_timestamp": 1789, - "publish_timestamp": 1789, - "version": "0.9", - "split_id": "split", - "index_uid": "my-index:00000000000000000000000000", - "partition_id": 7, - "source_id": "source", - "node_id": "node", - "num_docs": 12303, - "uncompressed_docs_size_in_bytes": 234234, - "time_range": { - "start": 121000, - "end": 130198 - }, "create_timestamp": 3, + "delete_opstamp": 10, + "doc_mapping_uid": "00000000000000000000000000", + "footer_offsets": { + "end": 2000, + "start": 1000 + }, + "index_uid": "my-index:00000000000000000000000000", "maturity": { - "type": "immature", - "maturation_period_millis": 4000 + "maturation_period_millis": 4000, + "type": "immature" }, + "node_id": "node", + "num_docs": 12303, + "num_merge_ops": 3, + "partition_id": 7, + "publish_timestamp": 1789, + "source_id": "source", + "split_id": "split", + "split_state": "Published", "tags": [ "234", "aaa" ], - "footer_offsets": { - "start": 1000, - "end": 2000 + "time_range": { + "end": 130198, + "start": 121000 }, - "delete_opstamp": 10, - "num_merge_ops": 3, - "doc_mapping_uid": "00000000000000000000000000" + "uncompressed_docs_size_in_bytes": 234234, + "update_timestamp": 1789, + "version": "0.9" } ], - "shards": { - "_ingest-source": [ - { - "index_uid": "my-index:00000000000000000000000000", - "source_id": "_ingest-source", - "shard_id": "00000000000000000001", - "leader_id": "leader-ingester", - "follower_id": "follower-ingester", - "shard_state": 1, - "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000000", - "update_timestamp": 1704067200 - } - ] - }, - "delete_tasks": [ - { - "create_timestamp": 0, - "opstamp": 10, - "delete_query": { - "index_uid": "my-index:00000000000000000000000000", - "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}}]}" - } - } - ] + "version": "0.9" } diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json index cb00de2fbd8..ef890a7d291 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.8.expected.json @@ -1,200 +1,200 @@ { - "version": "0.9", + "delete_tasks": [ + { + "create_timestamp": 0, + "delete_query": { + "index_uid": "my-index:00000000000000000000000000", + "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}}]}" + }, + "opstamp": 10 + } + ], "index": { - "version": "0.9", - "index_uid": "my-index:00000000000000000000000000", + "checkpoint": { + "kafka-source": { + "00000000000000000000": "00000000000000000042" + } + }, + "create_timestamp": 1789, "index_config": { - "version": "0.9", - "index_id": "my-index", - "index_uri": "s3://quickwit-indexes/my-index", "doc_mapping": { "doc_mapping_uid": "00000000000000000000000000", - "mode": "dynamic", "dynamic_mapping": { - "indexed": true, - "tokenizer": "raw", - "record": "basic", - "stored": true, "expand_dots": true, "fast": { "normalizer": "raw" - } + }, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "raw" }, "field_mappings": [ { + "coerce": true, + "fast": true, + "indexed": true, "name": "tenant_id", - "type": "u64", + "output_format": "number", "stored": true, - "indexed": true, - "fast": true, - "coerce": true, - "output_format": "number" + "type": "u64" }, { - "name": "timestamp", - "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, "input_formats": [ "rfc3339", "unix_timestamp" ], + "name": "timestamp", "output_format": "rfc3339", - "fast_precision": "seconds", - "indexed": true, "stored": true, - "fast": true + "type": "datetime" }, { - "name": "log_level", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "raw", + "name": "log_level", "record": "basic", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "raw", + "type": "text" }, { - "name": "message", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "default", + "name": "message", "record": "position", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "default", + "type": "text" } ], - "timestamp_field": "timestamp", + "index_field_presence": true, + "max_num_partitions": 100, + "mode": "dynamic", + "partition_key": "tenant_id", + "store_document_size": false, + "store_source": true, "tag_fields": [ "log_level", "tenant_id" ], - "partition_key": "tenant_id", - "max_num_partitions": 100, - "index_field_presence": true, - "store_document_size": false, - "store_source": true, + "timestamp_field": "timestamp", "tokenizers": [ { + "filters": [], "name": "custom_tokenizer", - "type": "regex", "pattern": "[^\\p{L}\\p{N}]+", - "filters": [] + "type": "regex" } ] }, + "index_id": "my-index", + "index_uri": "s3://quickwit-indexes/my-index", "indexing_settings": { "commit_timeout_secs": 301, - "docstore_compression_level": 8, "docstore_blocksize": 1000000, - "split_num_docs_target": 10000001, + "docstore_compression_level": 8, "merge_policy": { - "type": "stable_log", - "min_level_num_docs": 100000, - "merge_factor": 9, + "maturation_period": "2days", "max_merge_factor": 11, - "maturation_period": "2days" + "merge_factor": 9, + "min_level_num_docs": 100000, + "type": "stable_log" }, "resources": { "heap_size": 50000000 - } + }, + "split_num_docs_target": 10000001 }, "ingest_settings": { "min_shards": 1 }, + "retention": { + "period": "90 days", + "schedule": "daily" + }, "search_settings": { "default_search_fields": [ "message" ] }, - "retention": { - "period": "90 days", - "schedule": "daily" - } - }, - "checkpoint": { - "kafka-source": { - "00000000000000000000": "00000000000000000042" - } + "version": "0.9" }, - "create_timestamp": 1789, + "index_uid": "my-index:00000000000000000000000000", "sources": [ { - "version": "0.9", - "source_id": "kafka-source", - "num_pipelines": 2, "enabled": true, - "source_type": "kafka", + "input_format": "json", + "num_pipelines": 2, "params": { - "topic": "kafka-topic", - "client_params": {} + "client_params": {}, + "topic": "kafka-topic" }, + "source_id": "kafka-source", + "source_type": "kafka", "transform": { "script": ".message = downcase(string!(.message))", "timezone": "UTC" }, - "input_format": "json" + "version": "0.9" + } + ], + "version": "0.9" + }, + "shards": { + "_ingest-source": [ + { + "doc_mapping_uid": "00000000000000000000000000", + "follower_id": "follower-ingester", + "index_uid": "my-index:00000000000000000000000000", + "leader_id": "leader-ingester", + "publish_position_inclusive": "", + "shard_id": "00000000000000000001", + "shard_state": 1, + "source_id": "_ingest-source", + "update_timestamp": 1704067200 } ] }, "splits": [ { - "split_state": "Published", - "update_timestamp": 1789, - "publish_timestamp": 1789, - "version": "0.9", - "split_id": "split", - "index_uid": "my-index:00000000000000000000000000", - "partition_id": 7, - "source_id": "source", - "node_id": "node", - "num_docs": 12303, - "uncompressed_docs_size_in_bytes": 234234, - "time_range": { - "start": 121000, - "end": 130198 - }, "create_timestamp": 3, + "delete_opstamp": 10, + "doc_mapping_uid": "00000000000000000000000000", + "footer_offsets": { + "end": 2000, + "start": 1000 + }, + "index_uid": "my-index:00000000000000000000000000", "maturity": { - "type": "immature", - "maturation_period_millis": 4000 + "maturation_period_millis": 4000, + "type": "immature" }, + "node_id": "node", + "num_docs": 12303, + "num_merge_ops": 3, + "partition_id": 7, + "publish_timestamp": 1789, + "source_id": "source", + "split_id": "split", + "split_state": "Published", "tags": [ "234", "aaa" ], - "footer_offsets": { - "start": 1000, - "end": 2000 + "time_range": { + "end": 130198, + "start": 121000 }, - "delete_opstamp": 10, - "num_merge_ops": 3, - "doc_mapping_uid": "00000000000000000000000000" + "uncompressed_docs_size_in_bytes": 234234, + "update_timestamp": 1789, + "version": "0.9" } ], - "shards": { - "_ingest-source": [ - { - "index_uid": "my-index:00000000000000000000000000", - "source_id": "_ingest-source", - "shard_id": "00000000000000000001", - "leader_id": "leader-ingester", - "follower_id": "follower-ingester", - "shard_state": 1, - "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000000", - "update_timestamp": 1704067200 - } - ] - }, - "delete_tasks": [ - { - "create_timestamp": 0, - "opstamp": 10, - "delete_query": { - "index_uid": "my-index:00000000000000000000000000", - "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}}}]}" - } - } - ] + "version": "0.9" } diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json index cf23e2349e5..0d576bbc777 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.expected.json @@ -1,200 +1,200 @@ { - "version": "0.9", + "delete_tasks": [ + { + "create_timestamp": 0, + "delete_query": { + "index_uid": "my-index:00000000000000000000000001", + "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false}]}" + }, + "opstamp": 10 + } + ], "index": { - "version": "0.9", - "index_uid": "my-index:00000000000000000000000001", + "checkpoint": { + "kafka-source": { + "00000000000000000000": "00000000000000000042" + } + }, + "create_timestamp": 1789, "index_config": { - "version": "0.9", - "index_id": "my-index", - "index_uri": "s3://quickwit-indexes/my-index", "doc_mapping": { "doc_mapping_uid": "00000000000000000000000001", - "mode": "dynamic", "dynamic_mapping": { - "indexed": true, - "tokenizer": "raw", - "record": "basic", - "stored": true, "expand_dots": true, "fast": { "normalizer": "raw" - } + }, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "raw" }, "field_mappings": [ { + "coerce": true, + "fast": true, + "indexed": true, "name": "tenant_id", - "type": "u64", + "output_format": "number", "stored": true, - "indexed": true, - "fast": true, - "coerce": true, - "output_format": "number" + "type": "u64" }, { - "name": "timestamp", - "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, "input_formats": [ "rfc3339", "unix_timestamp" ], + "name": "timestamp", "output_format": "rfc3339", - "fast_precision": "seconds", - "indexed": true, "stored": true, - "fast": true + "type": "datetime" }, { - "name": "log_level", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "raw", + "name": "log_level", "record": "basic", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "raw", + "type": "text" }, { - "name": "message", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "default", + "name": "message", "record": "position", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "default", + "type": "text" } ], - "timestamp_field": "timestamp", + "index_field_presence": true, + "max_num_partitions": 100, + "mode": "dynamic", + "partition_key": "tenant_id", + "store_document_size": false, + "store_source": true, "tag_fields": [ "log_level", "tenant_id" ], - "partition_key": "tenant_id", - "max_num_partitions": 100, - "index_field_presence": true, - "store_document_size": false, - "store_source": true, + "timestamp_field": "timestamp", "tokenizers": [ { + "filters": [], "name": "custom_tokenizer", - "type": "regex", "pattern": "[^\\p{L}\\p{N}]+", - "filters": [] + "type": "regex" } ] }, + "index_id": "my-index", + "index_uri": "s3://quickwit-indexes/my-index", "indexing_settings": { "commit_timeout_secs": 301, - "docstore_compression_level": 8, "docstore_blocksize": 1000000, - "split_num_docs_target": 10000001, + "docstore_compression_level": 8, "merge_policy": { - "type": "stable_log", - "min_level_num_docs": 100000, - "merge_factor": 9, + "maturation_period": "2days", "max_merge_factor": 11, - "maturation_period": "2days" + "merge_factor": 9, + "min_level_num_docs": 100000, + "type": "stable_log" }, "resources": { "heap_size": 50000000 - } + }, + "split_num_docs_target": 10000001 }, "ingest_settings": { "min_shards": 12 }, + "retention": { + "period": "90 days", + "schedule": "daily" + }, "search_settings": { "default_search_fields": [ "message" ] }, - "retention": { - "period": "90 days", - "schedule": "daily" - } - }, - "checkpoint": { - "kafka-source": { - "00000000000000000000": "00000000000000000042" - } + "version": "0.9" }, - "create_timestamp": 1789, + "index_uid": "my-index:00000000000000000000000001", "sources": [ { - "version": "0.9", - "source_id": "kafka-source", - "num_pipelines": 2, "enabled": true, - "source_type": "kafka", + "input_format": "json", + "num_pipelines": 2, "params": { - "topic": "kafka-topic", - "client_params": {} + "client_params": {}, + "topic": "kafka-topic" }, + "source_id": "kafka-source", + "source_type": "kafka", "transform": { "script": ".message = downcase(string!(.message))", "timezone": "UTC" }, - "input_format": "json" + "version": "0.9" + } + ], + "version": "0.9" + }, + "shards": { + "_ingest-source": [ + { + "doc_mapping_uid": "00000000000000000000000001", + "follower_id": "follower-ingester", + "index_uid": "my-index:00000000000000000000000001", + "leader_id": "leader-ingester", + "publish_position_inclusive": "", + "shard_id": "00000000000000000001", + "shard_state": 1, + "source_id": "_ingest-source", + "update_timestamp": 1724240908 } ] }, "splits": [ { - "split_state": "Published", - "update_timestamp": 1789, - "publish_timestamp": 1789, - "version": "0.9", - "split_id": "split", - "index_uid": "my-index:00000000000000000000000001", - "partition_id": 7, - "source_id": "source", - "node_id": "node", - "num_docs": 12303, - "uncompressed_docs_size_in_bytes": 234234, - "time_range": { - "start": 121000, - "end": 130198 - }, "create_timestamp": 3, + "delete_opstamp": 10, + "doc_mapping_uid": "00000000000000000000000000", + "footer_offsets": { + "end": 2000, + "start": 1000 + }, + "index_uid": "my-index:00000000000000000000000001", "maturity": { - "type": "immature", - "maturation_period_millis": 4000 + "maturation_period_millis": 4000, + "type": "immature" }, + "node_id": "node", + "num_docs": 12303, + "num_merge_ops": 3, + "partition_id": 7, + "publish_timestamp": 1789, + "source_id": "source", + "split_id": "split", + "split_state": "Published", "tags": [ "234", "aaa" ], - "footer_offsets": { - "start": 1000, - "end": 2000 + "time_range": { + "end": 130198, + "start": 121000 }, - "delete_opstamp": 10, - "num_merge_ops": 3, - "doc_mapping_uid": "00000000000000000000000000" + "uncompressed_docs_size_in_bytes": 234234, + "update_timestamp": 1789, + "version": "0.9" } ], - "shards": { - "_ingest-source": [ - { - "index_uid": "my-index:00000000000000000000000001", - "source_id": "_ingest-source", - "shard_id": "00000000000000000001", - "leader_id": "leader-ingester", - "follower_id": "follower-ingester", - "shard_state": 1, - "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000001", - "update_timestamp": 1724240908 - } - ] - }, - "delete_tasks": [ - { - "create_timestamp": 0, - "opstamp": 10, - "delete_query": { - "index_uid": "my-index:00000000000000000000000001", - "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false}]}" - } - } - ] + "version": "0.9" } diff --git a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json index cf23e2349e5..0d576bbc777 100644 --- a/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json +++ b/quickwit/quickwit-metastore/test-data/file-backed-index/v0.9.json @@ -1,200 +1,200 @@ { - "version": "0.9", + "delete_tasks": [ + { + "create_timestamp": 0, + "delete_query": { + "index_uid": "my-index:00000000000000000000000001", + "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false}]}" + }, + "opstamp": 10 + } + ], "index": { - "version": "0.9", - "index_uid": "my-index:00000000000000000000000001", + "checkpoint": { + "kafka-source": { + "00000000000000000000": "00000000000000000042" + } + }, + "create_timestamp": 1789, "index_config": { - "version": "0.9", - "index_id": "my-index", - "index_uri": "s3://quickwit-indexes/my-index", "doc_mapping": { "doc_mapping_uid": "00000000000000000000000001", - "mode": "dynamic", "dynamic_mapping": { - "indexed": true, - "tokenizer": "raw", - "record": "basic", - "stored": true, "expand_dots": true, "fast": { "normalizer": "raw" - } + }, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "raw" }, "field_mappings": [ { + "coerce": true, + "fast": true, + "indexed": true, "name": "tenant_id", - "type": "u64", + "output_format": "number", "stored": true, - "indexed": true, - "fast": true, - "coerce": true, - "output_format": "number" + "type": "u64" }, { - "name": "timestamp", - "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, "input_formats": [ "rfc3339", "unix_timestamp" ], + "name": "timestamp", "output_format": "rfc3339", - "fast_precision": "seconds", - "indexed": true, "stored": true, - "fast": true + "type": "datetime" }, { - "name": "log_level", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "raw", + "name": "log_level", "record": "basic", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "raw", + "type": "text" }, { - "name": "message", - "type": "text", + "fast": false, + "fieldnorms": false, "indexed": true, - "tokenizer": "default", + "name": "message", "record": "position", - "fieldnorms": false, "stored": true, - "fast": false + "tokenizer": "default", + "type": "text" } ], - "timestamp_field": "timestamp", + "index_field_presence": true, + "max_num_partitions": 100, + "mode": "dynamic", + "partition_key": "tenant_id", + "store_document_size": false, + "store_source": true, "tag_fields": [ "log_level", "tenant_id" ], - "partition_key": "tenant_id", - "max_num_partitions": 100, - "index_field_presence": true, - "store_document_size": false, - "store_source": true, + "timestamp_field": "timestamp", "tokenizers": [ { + "filters": [], "name": "custom_tokenizer", - "type": "regex", "pattern": "[^\\p{L}\\p{N}]+", - "filters": [] + "type": "regex" } ] }, + "index_id": "my-index", + "index_uri": "s3://quickwit-indexes/my-index", "indexing_settings": { "commit_timeout_secs": 301, - "docstore_compression_level": 8, "docstore_blocksize": 1000000, - "split_num_docs_target": 10000001, + "docstore_compression_level": 8, "merge_policy": { - "type": "stable_log", - "min_level_num_docs": 100000, - "merge_factor": 9, + "maturation_period": "2days", "max_merge_factor": 11, - "maturation_period": "2days" + "merge_factor": 9, + "min_level_num_docs": 100000, + "type": "stable_log" }, "resources": { "heap_size": 50000000 - } + }, + "split_num_docs_target": 10000001 }, "ingest_settings": { "min_shards": 12 }, + "retention": { + "period": "90 days", + "schedule": "daily" + }, "search_settings": { "default_search_fields": [ "message" ] }, - "retention": { - "period": "90 days", - "schedule": "daily" - } - }, - "checkpoint": { - "kafka-source": { - "00000000000000000000": "00000000000000000042" - } + "version": "0.9" }, - "create_timestamp": 1789, + "index_uid": "my-index:00000000000000000000000001", "sources": [ { - "version": "0.9", - "source_id": "kafka-source", - "num_pipelines": 2, "enabled": true, - "source_type": "kafka", + "input_format": "json", + "num_pipelines": 2, "params": { - "topic": "kafka-topic", - "client_params": {} + "client_params": {}, + "topic": "kafka-topic" }, + "source_id": "kafka-source", + "source_type": "kafka", "transform": { "script": ".message = downcase(string!(.message))", "timezone": "UTC" }, - "input_format": "json" + "version": "0.9" + } + ], + "version": "0.9" + }, + "shards": { + "_ingest-source": [ + { + "doc_mapping_uid": "00000000000000000000000001", + "follower_id": "follower-ingester", + "index_uid": "my-index:00000000000000000000000001", + "leader_id": "leader-ingester", + "publish_position_inclusive": "", + "shard_id": "00000000000000000001", + "shard_state": 1, + "source_id": "_ingest-source", + "update_timestamp": 1724240908 } ] }, "splits": [ { - "split_state": "Published", - "update_timestamp": 1789, - "publish_timestamp": 1789, - "version": "0.9", - "split_id": "split", - "index_uid": "my-index:00000000000000000000000001", - "partition_id": 7, - "source_id": "source", - "node_id": "node", - "num_docs": 12303, - "uncompressed_docs_size_in_bytes": 234234, - "time_range": { - "start": 121000, - "end": 130198 - }, "create_timestamp": 3, + "delete_opstamp": 10, + "doc_mapping_uid": "00000000000000000000000000", + "footer_offsets": { + "end": 2000, + "start": 1000 + }, + "index_uid": "my-index:00000000000000000000000001", "maturity": { - "type": "immature", - "maturation_period_millis": 4000 + "maturation_period_millis": 4000, + "type": "immature" }, + "node_id": "node", + "num_docs": 12303, + "num_merge_ops": 3, + "partition_id": 7, + "publish_timestamp": 1789, + "source_id": "source", + "split_id": "split", + "split_state": "Published", "tags": [ "234", "aaa" ], - "footer_offsets": { - "start": 1000, - "end": 2000 + "time_range": { + "end": 130198, + "start": 121000 }, - "delete_opstamp": 10, - "num_merge_ops": 3, - "doc_mapping_uid": "00000000000000000000000000" + "uncompressed_docs_size_in_bytes": 234234, + "update_timestamp": 1789, + "version": "0.9" } ], - "shards": { - "_ingest-source": [ - { - "index_uid": "my-index:00000000000000000000000001", - "source_id": "_ingest-source", - "shard_id": "00000000000000000001", - "leader_id": "leader-ingester", - "follower_id": "follower-ingester", - "shard_state": 1, - "publish_position_inclusive": "", - "doc_mapping_uid": "00000000000000000000000001", - "update_timestamp": 1724240908 - } - ] - }, - "delete_tasks": [ - { - "create_timestamp": 0, - "opstamp": 10, - "delete_query": { - "index_uid": "my-index:00000000000000000000000001", - "query_ast": "{\"type\":\"bool\",\"must\":[{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Harry\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false},{\"type\":\"full_text\",\"field\":\"body\",\"text\":\"Potter\",\"params\":{\"mode\":{\"type\":\"phrase_fallback_to_intersection\"}},\"lenient\":false}]}" - } - } - ] + "version": "0.9" } diff --git a/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.expected.json b/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.expected.json index 8fca7405352..d8b0ec2dc8b 100644 --- a/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.expected.json +++ b/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.expected.json @@ -100,13 +100,13 @@ }, "split_num_docs_target": 10000001 }, + "ingest_settings": { + "min_shards": 12 + }, "retention": { "period": "90 days", "schedule": "daily" }, - "ingest_settings": { - "min_shards": 12 - }, "search_settings": { "default_search_fields": [ "message" diff --git a/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.json b/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.json index 8fca7405352..d8b0ec2dc8b 100644 --- a/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.json +++ b/quickwit/quickwit-metastore/test-data/index-metadata/v0.9.json @@ -100,13 +100,13 @@ }, "split_num_docs_target": 10000001 }, + "ingest_settings": { + "min_shards": 12 + }, "retention": { "period": "90 days", "schedule": "daily" }, - "ingest_settings": { - "min_shards": 12 - }, "search_settings": { "default_search_fields": [ "message" diff --git a/quickwit/quickwit-metastore/test-data/manifest/v0.7.expected.json b/quickwit/quickwit-metastore/test-data/manifest/v0.7.expected.json index 674e583d56f..303dcc533c6 100644 --- a/quickwit/quickwit-metastore/test-data/manifest/v0.7.expected.json +++ b/quickwit/quickwit-metastore/test-data/manifest/v0.7.expected.json @@ -74,14 +74,14 @@ }, "split_num_docs_target": 10000000 }, + "ingest_settings": { + "min_shards": 1 + }, "priority": 100, "retention": { "period": "42 days", "schedule": "daily" }, - "ingest_settings": { - "min_shards": 1 - }, "search_settings": { "default_search_fields": [] }, diff --git a/quickwit/quickwit-metastore/test-data/manifest/v0.8.expected.json b/quickwit/quickwit-metastore/test-data/manifest/v0.8.expected.json index 2b6819d6bbb..303dcc533c6 100644 --- a/quickwit/quickwit-metastore/test-data/manifest/v0.8.expected.json +++ b/quickwit/quickwit-metastore/test-data/manifest/v0.8.expected.json @@ -74,10 +74,10 @@ }, "split_num_docs_target": 10000000 }, - "priority": 100, "ingest_settings": { "min_shards": 1 }, + "priority": 100, "retention": { "period": "42 days", "schedule": "daily" diff --git a/quickwit/quickwit-metastore/test-data/manifest/v0.9.json b/quickwit/quickwit-metastore/test-data/manifest/v0.9.json index 914047f5421..11f3c3287a0 100644 --- a/quickwit/quickwit-metastore/test-data/manifest/v0.9.json +++ b/quickwit/quickwit-metastore/test-data/manifest/v0.9.json @@ -74,14 +74,14 @@ }, "split_num_docs_target": 10000000 }, + "ingest_settings": { + "min_shards": 1 + }, "priority": 100, "retention": { "period": "42 days", "schedule": "daily" }, - "ingest_settings": { - "min_shards": 1 - }, "search_settings": { "default_search_fields": [] }, From 6eda0e565b9fff7b003dc1f9f3db5a506c033c09 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 29 Apr 2026 11:25:17 -0400 Subject: [PATCH 4/5] feat: compute per-output split metadata in merge engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The merge engine now extracts metric_names, time_range, and low_cardinality_tags from each output file's actual rows during the merge write pass. Previously, MergeOutputFile only contained physical metadata (num_rows, size_bytes, row_keys, zonemaps). The downstream metadata_aggregation function inferred logical metadata by unioning all input splits — which is incorrect when num_outputs > 1, since each output contains only a subset of the globally sorted rows. Now each MergeOutputFile carries: - metric_names: distinct metrics in this output's rows - time_range: min/max timestamp_secs from this output's rows - low_cardinality_tags: service names from this output's rows Reuses existing extract_metric_names, extract_service_names, and extract_time_range from split_writer (made pub(crate)). Includes test that verifies per-output metadata is computed from actual rows when merging 2 inputs into 2 outputs with different metric names. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../quickwit-parquet-engine/src/merge/mod.rs | 15 ++++ .../src/merge/tests.rs | 76 ++++++++++++++++++- .../src/merge/writer.rs | 18 +++++ .../src/storage/mod.rs | 2 +- .../src/storage/split_writer.rs | 6 +- 5 files changed, 112 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index c167860fe5a..c2dc2215b22 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -70,6 +70,11 @@ struct InputMetadata { } /// Result of a single output file from the merge. +/// +/// Contains both physical metadata (file size, row count) and per-output +/// logical metadata (metric names, tags, time range) extracted from the +/// actual rows in this output file. When the merge produces multiple +/// outputs, each has metadata reflecting only its own rows. pub struct MergeOutputFile { /// Path to the output Parquet file. pub path: PathBuf, @@ -85,6 +90,16 @@ pub struct MergeOutputFile { /// Per-column zonemap regex strings. pub zonemap_regexes: std::collections::HashMap, + + /// Distinct metric names in this output file. + pub metric_names: std::collections::HashSet, + + /// Time range covered by rows in this output file. + pub time_range: crate::split::TimeRange, + + /// Low-cardinality tag values extracted from this output file's rows. + /// Currently tracks "service" to match the ingest path. + pub low_cardinality_tags: std::collections::HashMap>, } /// Merge N sorted Parquet files into M sorted output files. diff --git a/quickwit/quickwit-parquet-engine/src/merge/tests.rs b/quickwit/quickwit-parquet-engine/src/merge/tests.rs index fbf8933fe51..2eb8a46cc5a 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/tests.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/tests.rs @@ -274,10 +274,84 @@ fn test_merge_multiple_outputs() { let total_rows: usize = outputs.iter().map(|o| o.num_rows).sum(); assert_eq!(total_rows, 6); - // Each output should have row keys. + // Each output should have row keys and per-output metadata. + let mut all_metric_names: std::collections::HashSet = std::collections::HashSet::new(); for output in &outputs { assert!(output.row_keys_proto.is_some()); + // Each output has its own metric names (subset of all inputs). + assert!(!output.metric_names.is_empty()); + all_metric_names.extend(output.metric_names.iter().cloned()); + // Time range should be valid. + assert!(output.time_range.start_secs <= output.time_range.end_secs); } + // The union of all output metric names should be the full set. + assert!(all_metric_names.contains("alpha")); + assert!(all_metric_names.contains("beta")); + assert!(all_metric_names.contains("gamma")); +} + +/// Verifies that per-output metadata (metric_names, time_range) is computed +/// from the actual rows in each output file, not aggregated from all inputs. +#[test] +fn test_merge_per_output_metadata_from_actual_rows() { + let dir = TempDir::new().unwrap(); + + // Input 1: metric "cpu" at timestamps 100, 200 + let input1 = write_test_split( + dir.path(), + "in1.parquet", + &["cpu", "cpu"], + &[100, 200], + &[1.0, 2.0], + &[1, 1], + ); + // Input 2: metric "mem" at timestamps 300, 400 + let input2 = write_test_split( + dir.path(), + "in2.parquet", + &["mem", "mem"], + &[300, 400], + &[3.0, 4.0], + &[2, 2], + ); + + let output_dir = dir.path().join("output"); + std::fs::create_dir_all(&output_dir).unwrap(); + + // Single output: should contain all metrics and full time range. + let config = MergeConfig { + num_outputs: 1, + writer_config: ParquetWriterConfig::default(), + }; + let outputs = merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config).unwrap(); + assert_eq!(outputs.len(), 1); + let output = &outputs[0]; + assert!(output.metric_names.contains("cpu")); + assert!(output.metric_names.contains("mem")); + assert_eq!(output.time_range.start_secs, 100); + assert_eq!(output.time_range.end_secs, 401); // end is exclusive + + // Two outputs: each should have only its own metrics and time range. + let output_dir2 = dir.path().join("output2"); + std::fs::create_dir_all(&output_dir2).unwrap(); + let config2 = MergeConfig { + num_outputs: 2, + writer_config: ParquetWriterConfig::default(), + }; + let outputs2 = merge_sorted_parquet_files(&[input1, input2], &output_dir2, &config2).unwrap(); + assert_eq!(outputs2.len(), 2); + + // After sorted merge, "cpu" (sorted_series for tsid=1) and "mem" (tsid=2) + // are in separate outputs. Each output should have only one metric. + let output_a = &outputs2[0]; + let output_b = &outputs2[1]; + assert_eq!(output_a.metric_names.len(), 1); + assert_eq!(output_b.metric_names.len(), 1); + assert_ne!(output_a.metric_names, output_b.metric_names); + + // Time ranges should be disjoint or specific to each output's rows. + assert!(output_a.time_range.start_secs <= output_a.time_range.end_secs); + assert!(output_b.time_range.start_secs <= output_b.time_range.end_secs); } #[test] diff --git a/quickwit/quickwit-parquet-engine/src/merge/writer.rs b/quickwit/quickwit-parquet-engine/src/merge/writer.rs index 60879f7ddf6..c6766bd9afe 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/writer.rs @@ -39,6 +39,8 @@ use super::{InputMetadata, MergeConfig, MergeOutputFile}; use crate::row_keys; use crate::sort_fields::parse_sort_fields; use crate::sorted_series::SORTED_SERIES_COLUMN; +use crate::split::TAG_SERVICE; +use crate::storage::split_writer::{extract_metric_names, extract_service_names, extract_time_range}; use crate::storage::{ PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, @@ -114,6 +116,19 @@ pub fn write_merge_outputs( let output_filename = format!("merge_output_{}.parquet", Ulid::new()); let output_path = output_dir.join(&output_filename); + // Extract per-output logical metadata from the actual rows. + let metric_names = extract_metric_names(&sorted_batch) + .context("extracting metric names from merge output")?; + let time_range = extract_time_range(&sorted_batch) + .context("extracting time range from merge output")?; + let service_names = extract_service_names(&sorted_batch) + .context("extracting service names from merge output")?; + + let mut low_cardinality_tags = std::collections::HashMap::new(); + if !service_names.is_empty() { + low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names); + } + let size_bytes = write_parquet_file(&sorted_batch, &output_path, props)?; outputs.push(MergeOutputFile { @@ -122,6 +137,9 @@ pub fn write_merge_outputs( size_bytes, row_keys_proto, zonemap_regexes, + metric_names, + time_range, + low_cardinality_tags, }); } diff --git a/quickwit/quickwit-parquet-engine/src/storage/mod.rs b/quickwit/quickwit-parquet-engine/src/storage/mod.rs index 59e3059422a..8b87063c25e 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/mod.rs @@ -15,7 +15,7 @@ //! Storage layer for Parquet files. mod config; -mod split_writer; +pub(crate) mod split_writer; mod writer; pub use config::{Compression, ParquetWriterConfig}; diff --git a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs index 100e46bddd1..7c7975b7f4b 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs @@ -170,7 +170,7 @@ impl ParquetSplitWriter { } /// Extracts the time range (min/max timestamp_secs) from a RecordBatch. -fn extract_time_range(batch: &RecordBatch) -> Result { +pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result { let timestamp_idx = batch .schema() .index_of("timestamp_secs") @@ -194,7 +194,7 @@ fn extract_time_range(batch: &RecordBatch) -> Result Result, ParquetWriteError> { +pub(crate) fn extract_metric_names(batch: &RecordBatch) -> Result, ParquetWriteError> { let metric_idx = batch .schema() .index_of("metric_name") @@ -227,7 +227,7 @@ fn extract_metric_names(batch: &RecordBatch) -> Result, ParquetW } /// Extracts distinct service names from a RecordBatch. -fn extract_service_names(batch: &RecordBatch) -> Result, ParquetWriteError> { +pub(crate) fn extract_service_names(batch: &RecordBatch) -> Result, ParquetWriteError> { let service_idx = match batch.schema().index_of("service").ok() { Some(idx) => idx, None => return Ok(HashSet::new()), From c639cf6b45ad4b89c5e5b20d367be7ed503bc18b Mon Sep 17 00:00:00 2001 From: George Talbot Date: Wed, 29 Apr 2026 14:08:54 -0400 Subject: [PATCH 5/5] fix: nightly rustfmt import ordering Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/merge/tests.rs | 4 +- .../src/merge/writer.rs | 8 +- .../src/storage/split_writer.rs | 103 ++++++++++-------- 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/quickwit/quickwit-parquet-engine/src/merge/tests.rs b/quickwit/quickwit-parquet-engine/src/merge/tests.rs index 2eb8a46cc5a..385a42d40eb 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/tests.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/tests.rs @@ -323,7 +323,9 @@ fn test_merge_per_output_metadata_from_actual_rows() { num_outputs: 1, writer_config: ParquetWriterConfig::default(), }; - let outputs = merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config).unwrap(); + let outputs = + merge_sorted_parquet_files(&[input1.clone(), input2.clone()], &output_dir, &config) + .unwrap(); assert_eq!(outputs.len(), 1); let output = &outputs[0]; assert!(output.metric_names.contains("cpu")); diff --git a/quickwit/quickwit-parquet-engine/src/merge/writer.rs b/quickwit/quickwit-parquet-engine/src/merge/writer.rs index c6766bd9afe..44cb85d37ec 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/writer.rs @@ -40,7 +40,9 @@ use crate::row_keys; use crate::sort_fields::parse_sort_fields; use crate::sorted_series::SORTED_SERIES_COLUMN; use crate::split::TAG_SERVICE; -use crate::storage::split_writer::{extract_metric_names, extract_service_names, extract_time_range}; +use crate::storage::split_writer::{ + extract_metric_names, extract_service_names, extract_time_range, +}; use crate::storage::{ PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, @@ -119,8 +121,8 @@ pub fn write_merge_outputs( // Extract per-output logical metadata from the actual rows. let metric_names = extract_metric_names(&sorted_batch) .context("extracting metric names from merge output")?; - let time_range = extract_time_range(&sorted_batch) - .context("extracting time range from merge output")?; + let time_range = + extract_time_range(&sorted_batch).context("extracting time range from merge output")?; let service_names = extract_service_names(&sorted_batch) .context("extracting service names from merge output")?; diff --git a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs index 7c7975b7f4b..3776e18df8e 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/split_writer.rs @@ -194,69 +194,86 @@ pub(crate) fn extract_time_range(batch: &RecordBatch) -> Result Result, ParquetWriteError> { +pub(crate) fn extract_metric_names( + batch: &RecordBatch, +) -> Result, ParquetWriteError> { let metric_idx = batch .schema() .index_of("metric_name") .map_err(|_| ParquetWriteError::SchemaValidation("missing metric_name column".into()))?; - let metric_col = batch.column(metric_idx); - let mut names = HashSet::new(); - - // The column is Dictionary(Int32, Utf8) - if let Some(dict_array) = metric_col - .as_any() - .downcast_ref::>() - { - let values = dict_array.values(); - if let Some(string_values) = values.as_any().downcast_ref::() { - // Get all dictionary values that are actually used - for i in 0..dict_array.len() { - if !dict_array.is_null(i) - && let Ok(key) = dict_array.keys().value(i).try_into() - { - let key: usize = key; - if key < string_values.len() && !string_values.is_null(key) { - names.insert(string_values.value(key).to_string()); - } - } - } - } - } - - Ok(names) + extract_distinct_strings(batch.column(metric_idx)) } /// Extracts distinct service names from a RecordBatch. -pub(crate) fn extract_service_names(batch: &RecordBatch) -> Result, ParquetWriteError> { +pub(crate) fn extract_service_names( + batch: &RecordBatch, +) -> Result, ParquetWriteError> { let service_idx = match batch.schema().index_of("service").ok() { Some(idx) => idx, None => return Ok(HashSet::new()), }; - let service_col = batch.column(service_idx); - let mut names = HashSet::new(); + extract_distinct_strings(batch.column(service_idx)) +} - // The column is Dictionary(Int32, Utf8) - if let Some(dict_array) = service_col +/// Extracts distinct non-null string values from a column. +/// +/// Handles both Dictionary(Int32, Utf8) encoding (common at ingest) and +/// plain Utf8/LargeUtf8 (possible after optimize_output_batch in the merge +/// path when cardinality is too high for dictionary encoding). +fn extract_distinct_strings( + col: &dyn arrow::array::Array, +) -> Result, ParquetWriteError> { + let mut values = HashSet::new(); + + // Try Dictionary(Int32, Utf8) first — the common case at ingest. + if let Some(dict_array) = col .as_any() .downcast_ref::>() + && let Some(string_values) = dict_array + .values() + .as_any() + .downcast_ref::() { - let values = dict_array.values(); - if let Some(string_values) = values.as_any().downcast_ref::() { - // Get all dictionary values that are actually used - for i in 0..dict_array.len() { - if !dict_array.is_null(i) - && let Ok(key) = dict_array.keys().value(i).try_into() - { - let key: usize = key; - if key < string_values.len() && !string_values.is_null(key) { - names.insert(string_values.value(key).to_string()); - } + for i in 0..dict_array.len() { + if !dict_array.is_null(i) + && let Ok(key) = dict_array.keys().value(i).try_into() + { + let key: usize = key; + if key < string_values.len() && !string_values.is_null(key) { + values.insert(string_values.value(key).to_string()); } } } + return Ok(values); + } + + // Fall back to plain Utf8 (after optimize_output_batch strips dictionary + // encoding for high-cardinality columns). + if let Some(string_array) = col.as_any().downcast_ref::() { + for i in 0..string_array.len() { + if !string_array.is_null(i) { + values.insert(string_array.value(i).to_string()); + } + } + return Ok(values); + } + + // LargeUtf8 variant. + if let Some(string_array) = col + .as_any() + .downcast_ref::() + { + for i in 0..string_array.len() { + if !string_array.is_null(i) { + values.insert(string_array.value(i).to_string()); + } + } + return Ok(values); } - Ok(names) + // Unrecognized column type — return empty rather than error, since the + // column may legitimately be a type we don't extract strings from. + Ok(values) } #[cfg(test)]