Merged · 15 commits · Changes from all commits
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
@@ -114,6 +114,7 @@ ci-test = []
bytes = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
mockall = { workspace = true }
parquet = { workspace = true }
proptest = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
120 changes: 120 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -85,6 +85,12 @@ struct MergePipelineHandle {
handle: ActorHandle<MergePipeline>,
}

#[cfg(feature = "metrics")]
struct ParquetMergePipelineHandle {
mailbox: Mailbox<super::metrics_pipeline::ParquetMergePlanner>,
handle: ActorHandle<super::metrics_pipeline::ParquetMergePipeline>,
}

pub type BoxedPipelineHandle = Box<dyn PipelineHandle>;

/// The indexing service is (single) actor service running on indexer and in charge
@@ -108,6 +114,8 @@ pub struct IndexingService {
local_split_store: Arc<IndexingSplitCache>,
pub(crate) max_concurrent_split_uploads: usize,
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
#[cfg(feature = "metrics")]
parquet_merge_pipeline_handles: HashMap<IndexUid, ParquetMergePipelineHandle>,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
merge_io_throughput_limiter_opt: Option<Limiter>,
pub(crate) event_broker: EventBroker,
@@ -171,6 +179,8 @@ impl IndexingService {
counters: Default::default(),
max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads,
merge_pipeline_handles: HashMap::new(),
#[cfg(feature = "metrics")]
parquet_merge_pipeline_handles: HashMap::new(),
merge_io_throughput_limiter_opt,
cooperative_indexing_permits,
event_broker,
@@ -577,6 +587,49 @@ impl IndexingService {
self.merge_pipeline_handles
.retain(|_, merge_pipeline_handle| merge_pipeline_handle.handle.state().is_running());
self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len();

// Parquet merge pipeline cleanup: shut down orphans whose parent
// metrics pipelines are gone, then reap completed/failed ones.
#[cfg(feature = "metrics")]
{
let parquet_index_uids_to_retain: HashSet<IndexUid> = self
.indexing_pipelines
.values()
.filter(|h| {
quickwit_common::is_parquet_pipeline_index(
&h.indexing_pipeline_id().index_uid.index_id,
)
})
.map(|h| h.indexing_pipeline_id().index_uid.clone())
.collect();

let parquet_to_shutdown: Vec<IndexUid> = self
.parquet_merge_pipeline_handles
.keys()
.filter(|uid| !parquet_index_uids_to_retain.contains(uid))
.cloned()
.collect();

for index_uid in parquet_to_shutdown {
if let Some((_, handle)) =
self.parquet_merge_pipeline_handles.remove_entry(&index_uid)
{
info!(
index_uid=%index_uid,
"shutting down orphan parquet merge pipeline"
);
handle
.handle
.mailbox()
.send_message(FinishPendingMergesAndShutdownPipeline)
.await
.expect("parquet merge pipeline mailbox should not be full");
}
}
self.parquet_merge_pipeline_handles
.retain(|_, handle| handle.handle.state().is_running());
}

self.update_chitchat_running_plan().await;

let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self
@@ -621,6 +674,73 @@ impl IndexingService {
Ok(merge_planner_mailbox)
}

/// Returns the Parquet merge planner mailbox for the given index, creating
/// a new ParquetMergePipeline if one isn't already running.
///
/// Keyed by IndexUid (not MergePipelineId) because Parquet merge pipelines
/// are shared across all sources for the same index — unlike Tantivy merge
/// pipelines which are per-source.
#[cfg(feature = "metrics")]
pub(crate) fn get_or_create_parquet_merge_pipeline(
&mut self,
index_uid: IndexUid,
index_config: &IndexConfig,
storage: Arc<dyn quickwit_storage::Storage>,
indexing_directory: quickwit_common::temp_dir::TempDirectory,
immature_splits_opt: Option<Vec<quickwit_parquet_engine::split::ParquetSplitMetadata>>,
ctx: &ActorContext<Self>,
) -> Result<Mailbox<super::metrics_pipeline::ParquetMergePlanner>, IndexingError> {
if let Some(handle) = self.parquet_merge_pipeline_handles.get(&index_uid) {
return Ok(handle.mailbox.clone());
}

// Convert the config-crate merge policy into the engine-crate type.
let cfg = index_config.indexing_settings.parquet_merge_policy();
let engine_config = quickwit_parquet_engine::merge::policy::ParquetMergePolicyConfig {
merge_factor: cfg.merge_factor,
max_merge_factor: cfg.max_merge_factor,
max_merge_ops: cfg.max_merge_ops,
target_split_size_bytes: cfg.target_split_size_bytes,
maturation_period: cfg.maturation_period,
max_finalize_merge_operations: cfg.max_finalize_merge_operations,
};
let merge_policy: Arc<dyn quickwit_parquet_engine::merge::policy::ParquetMergePolicy> =
Arc::new(
quickwit_parquet_engine::merge::policy::ConstWriteAmplificationParquetMergePolicy::new(
engine_config,
),
);

let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();

let params = super::metrics_pipeline::ParquetMergePipelineParams {
index_uid: index_uid.clone(),
indexing_directory,
metastore: self.metastore.clone(),
storage,
merge_policy,
merge_scheduler_service: self.merge_scheduler_service.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
writer_config,
};

let pipeline = super::metrics_pipeline::ParquetMergePipeline::new(
params,
immature_splits_opt,
ctx.spawn_ctx(),
);
let merge_planner_mailbox = pipeline.merge_planner_mailbox().clone();
let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(pipeline);
let handle = ParquetMergePipelineHandle {
mailbox: merge_planner_mailbox.clone(),
handle: pipeline_handle,
};
self.parquet_merge_pipeline_handles
.insert(index_uid, handle);
Ok(merge_planner_mailbox)
}

/// For all Ingest V2 pipelines, assigns the set of shards they should be working on.
/// This is done regardless of whether there has been a change in their shard list
/// or not.
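To illustrate the keying choice documented on get_or_create_parquet_merge_pipeline above (one Parquet merge pipeline per index, shared by every source), here is a minimal, self-contained sketch with toy stand-in types. The real method additionally takes the index config, storage, a scratch directory, and the actor context, all omitted here:

```rust
use std::collections::HashMap;

// Toy stand-ins for IndexUid and Mailbox<ParquetMergePlanner>; only the
// keying behavior is illustrated, not the real Quickwit types.
type IndexUid = String;
type PlannerMailbox = String;

struct ToyIndexingService {
    parquet_merge_pipeline_handles: HashMap<IndexUid, PlannerMailbox>,
}

impl ToyIndexingService {
    // Same get-or-create shape as the PR: keyed by index, not by (index, source),
    // so every source ingesting into the same index reuses one pipeline.
    fn get_or_create_parquet_merge_pipeline(&mut self, index_uid: IndexUid) -> PlannerMailbox {
        self.parquet_merge_pipeline_handles
            .entry(index_uid.clone())
            .or_insert_with(|| format!("planner-mailbox-for-{index_uid}"))
            .clone()
    }
}

fn main() {
    let mut service = ToyIndexingService {
        parquet_merge_pipeline_handles: HashMap::new(),
    };
    // Two sources of the same index: both calls return the same planner mailbox.
    let a = service.get_or_create_parquet_merge_pipeline("my-index:0001".to_string());
    let b = service.get_or_create_parquet_merge_pipeline("my-index:0001".to_string());
    assert_eq!(a, b);
}
```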
39 changes: 21 additions & 18 deletions quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
@@ -267,6 +267,7 @@ impl MergeSchedulerService {
} = PeekMut::pop(next_merge);
// The permit is owned by the task and released via Drop when
// the executor finishes, triggering PermitReleased back here.
// Drop-based release ensures the semaphore is freed even on panic.
let parquet_merge_task = ParquetMergeTask {
merge_operation,
merge_permit,
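The comment above relies on Drop to return the permit to the scheduler. A minimal sketch of that RAII pattern using tokio primitives; the wrapper type and notification channel below are illustrative stand-ins, not the PR's actual types:

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};

// Holding the permit for the lifetime of the merge task and notifying the
// scheduler from Drop means the slot is returned even if the task panics
// or is cancelled.
struct MergePermit {
    _permit: OwnedSemaphorePermit,
    on_release: mpsc::UnboundedSender<()>,
}

impl Drop for MergePermit {
    fn drop(&mut self) {
        // Stands in for sending `PermitReleased` back to the scheduler.
        let _ = self.on_release.send(());
    }
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2));
    let (tx, mut rx) = mpsc::unbounded_channel();

    let permit = MergePermit {
        _permit: semaphore.clone().acquire_owned().await.unwrap(),
        on_release: tx,
    };
    tokio::spawn(async move {
        let _permit = permit; // the task owns the permit while it runs
        // ... run the merge ...
    })
    .await
    .unwrap();

    // Once the task (and its permit) is dropped, the release notification arrives.
    assert!(rx.recv().await.is_some());
}
```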
@@ -318,24 +319,30 @@ struct ScheduleMerge {
split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
}

/// The higher, the sooner we will execute the merge operation.
/// A good merge operation
/// - strongly reduces the number splits
/// - is light.
fn score_merge_operation(merge_operation: &MergeOperation) -> u64 {
let total_num_bytes: u64 = merge_operation.total_num_bytes();
/// Scores a merge operation for priority scheduling.
///
/// Higher score = scheduled sooner. Prefers merges that strongly reduce
/// split count relative to their total byte cost. Used by both Tantivy
/// and Parquet merge scheduling.
fn score_merge(num_splits: usize, total_num_bytes: u64) -> u64 {
if total_num_bytes == 0 {
// Silly corner case that should never happen.
return u64::MAX;
}
// We will remove splits.len() and add 1 merge splits.
let delta_num_splits = (merge_operation.splits.len() - 1) as u64;
// We use integer arithmetic to avoid `f64 are not ordered` silliness.
// We will remove num_splits and add 1 merged split.
let delta_num_splits = (num_splits - 1) as u64;
// Integer arithmetic to avoid `f64 are not ordered` silliness.
(delta_num_splits << 48)
.checked_div(total_num_bytes)
.unwrap_or(1u64)
}

fn score_merge_operation(merge_operation: &MergeOperation) -> u64 {
score_merge(
merge_operation.splits.len(),
merge_operation.total_num_bytes(),
)
}
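As a worked example of this shared scoring (numbers are illustrative, not taken from the PR): a merge that collapses many small splits outranks one that grinds through a few large splits, because the score is essentially (num_splits - 1) * 2^48 / total_num_bytes:

```rust
fn score_merge(num_splits: usize, total_num_bytes: u64) -> u64 {
    if total_num_bytes == 0 {
        return u64::MAX;
    }
    let delta_num_splits = (num_splits - 1) as u64;
    (delta_num_splits << 48)
        .checked_div(total_num_bytes)
        .unwrap_or(1u64)
}

fn main() {
    // 10 small splits totalling 100 MiB: removes 9 splits at low IO cost.
    let many_small = score_merge(10, 100 * 1024 * 1024);
    // 3 large splits totalling 10 GiB: removes only 2 splits at high IO cost.
    let few_large = score_merge(3, 10 * 1024 * 1024 * 1024);
    // The small merge scores higher and is scheduled first.
    assert!(many_small > few_large);
}
```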

impl ScheduleMerge {
pub fn new(
merge_operation: TrackedObject<MergeOperation>,
@@ -406,14 +413,10 @@ impl Handler<PermitReleased> for MergeSchedulerService {

#[cfg(feature = "metrics")]
fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 {
let total_num_bytes = merge_operation.total_size_bytes();
if total_num_bytes == 0 {
return u64::MAX;
}
let delta_num_splits = (merge_operation.splits.len() - 1) as u64;
(delta_num_splits << 48)
.checked_div(total_num_bytes)
.unwrap_or(1u64)
score_merge(
merge_operation.splits.len(),
merge_operation.total_size_bytes(),
)
}

#[cfg(feature = "metrics")]
@@ -67,6 +67,20 @@ impl IndexingService {
IndexingError::Internal(format!("failed to parse partition_key: {error}"))
})?;

// Spawn the Parquet merge pipeline (or reuse an existing one for this
// index). The planner mailbox is wired into the MetricsPipeline's
// Publisher so newly ingested splits are fed back for merging.
let merge_planner_mailbox = self.get_or_create_parquet_merge_pipeline(
indexing_pipeline_id.index_uid.clone(),
&index_config,
storage.clone(),
indexing_directory.clone(),
// None here means the pipeline's fetch_immature_splits() will
// query the metastore on first spawn (same path as respawn).
None,
ctx,
)?;

let pipeline_params = MetricsPipelineParams {
pipeline_id: indexing_pipeline_id.clone(),
metastore: self.metastore.clone(),
@@ -83,6 +97,7 @@
use_sketch_processors,
partition_key,
max_num_partitions: index_config.doc_mapping.max_num_partitions,
parquet_merge_planner_mailbox_opt: Some(merge_planner_mailbox),
};
let pipeline = MetricsPipeline::new(pipeline_params);
let (mailbox, handle) = ctx.spawn_actor().spawn(pipeline);
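The feedback loop described in the comment above (the pipeline's Publisher hands newly published splits to the Parquet merge planner) can be sketched with a plain channel; the message shape and field names here are illustrative stand-ins, not the PR's actual actor messages:

```rust
use tokio::sync::mpsc;

// Illustrative stand-in for the split metadata the planner tracks.
struct NewSplit {
    split_id: String,
    num_bytes: u64,
}

#[tokio::main]
async fn main() {
    // Stands in for the planner mailbox handed to the publisher at spawn time.
    let (planner_mailbox, mut planner_inbox) = mpsc::channel::<Vec<NewSplit>>(16);

    // Publisher side: after a successful publish, forward the new splits so the
    // planner can consider them for future merges.
    let published = vec![NewSplit {
        split_id: "split-01".into(),
        num_bytes: 42_000_000,
    }];
    planner_mailbox.send(published).await.unwrap();

    // Planner side: accumulate young splits and plan merges once enough pile up.
    if let Some(new_splits) = planner_inbox.recv().await {
        for split in &new_splits {
            println!("planner will track {} ({} bytes)", split.split_id, split.num_bytes);
        }
    }
}
```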
@@ -45,6 +45,10 @@ mod publisher_impl;
)]
mod parquet_e2e_test;

#[cfg(test)]
#[allow(clippy::disallowed_methods)]
mod parquet_merge_pipeline_test;

pub use parquet_doc_processor::{
ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc,
};
@@ -43,11 +43,18 @@ use crate::models::PublishLock;
/// ready-to-upload Parquet files with complete metadata.
pub struct ParquetMergeExecutor {
uploader_mailbox: Mailbox<ParquetUploader>,
writer_config: ParquetWriterConfig,
}

impl ParquetMergeExecutor {
pub fn new(uploader_mailbox: Mailbox<ParquetUploader>) -> Self {
Self { uploader_mailbox }
pub fn new(
uploader_mailbox: Mailbox<ParquetUploader>,
writer_config: ParquetWriterConfig,
) -> Self {
Self {
uploader_mailbox,
writer_config,
}
}
}

@@ -99,36 +106,37 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
// Run the CPU-intensive merge on the dedicated thread pool.
let input_paths = scratch.downloaded_parquet_files.clone();
let output_dir_clone = output_dir.clone();
let writer_config = self.writer_config.clone();
let merge_result = run_cpu_intensive(move || {
let config = MergeConfig {
num_outputs: 1,
writer_config: ParquetWriterConfig::default(),
writer_config,
};
merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config)
})
.await;

// We return Ok(()) on merge failure rather than Err to keep the actor
// alive — same strategy as Tantivy's MergeExecutor. This prevents a
// single "split of death" from crash-looping the entire pipeline.
// The trade-off: failed splits aren't retried until pipeline respawn.
let outputs: Vec<MergeOutputFile> = match merge_result {
Ok(Ok(outputs)) => outputs,
Ok(Err(merge_err)) => {
warn!(
error = %merge_err,
merge_split_id = %merge_split_id,
"parquet merge failed"
"parquet merge failed — input splits will not be retried until \
the pipeline restarts with metastore re-seeding"
);
// Input splits were drained from the planner by operations().
// They remain published but won't be re-planned until respawn.
// The input splits were drained from the planner by operations().
// They remain published in the metastore and will be re-seeded
// into the planner when the pipeline respawns (via
// fetch_immature_splits on the ParquetMergePipeline supervisor).
return Ok(());
}
Err(panicked) => {
warn!(
error = %panicked,
merge_split_id = %merge_split_id,
"parquet merge panicked"
"parquet merge panicked — input splits will not be retried until \
the pipeline restarts with metastore re-seeding"
);
return Ok(());
}