From 618a40c7bcf403f69d20013e356c7a85ed26d8f2 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Thu, 19 Mar 2026 16:26:23 -0400 Subject: [PATCH 1/8] Integrate sketchlib-rust for Count-Min Sketch --- Cargo.lock | 16 +- asap-common/sketch-core/Cargo.toml | 5 + asap-common/sketch-core/report.md | 126 ++++++++ .../sketch-core/src/bin/sketchlib_fidelity.rs | 209 +++++++++++++ asap-common/sketch-core/src/config.rs | 75 +++++ asap-common/sketch-core/src/count_min.rs | 275 ++++++++++++++---- .../sketch-core/src/count_min_sketchlib.rs | 59 ++++ asap-common/sketch-core/src/lib.rs | 10 + asap-query-engine/Cargo.toml | 8 +- asap-query-engine/src/lib.rs | 17 ++ asap-query-engine/src/main.rs | 22 ++ .../count_min_sketch_accumulator.rs | 104 ++++--- asap-query-engine/tests/test_both_backends.rs | 30 ++ .../templates/udfs/countminsketch_count.rs.j2 | 78 ++++- .../templates/udfs/countminsketch_sum.rs.j2 | 84 +++++- 15 files changed, 991 insertions(+), 127 deletions(-) create mode 100644 asap-common/sketch-core/report.md create mode 100644 asap-common/sketch-core/src/bin/sketchlib_fidelity.rs create mode 100644 asap-common/sketch-core/src/config.rs create mode 100644 asap-common/sketch-core/src/count_min_sketchlib.rs create mode 100644 asap-query-engine/tests/test_both_backends.rs diff --git a/Cargo.lock b/Cargo.lock index b520550..f12cb9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -376,7 +376,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap 4.5.60", - "indexmap", + "indexmap 2.13.0", "pretty_assertions", "promql-parser", "promql_utilities", @@ -945,6 +945,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a2785755761f3ddc1492979ce1e48d2c00d09311c39e4466429188f3dd6501" +dependencies = [ + "quote", + "syn 2.0.117", +] + [[package]] name = "cxx" version = "1.0.194" @@ -3334,6 +3344,7 @@ dependencies = [ "bincode", "chrono", "clap 4.5.60", + "ctor", "dashmap 5.5.3", "datafusion", "datafusion_summary_library", @@ -3881,9 +3892,12 @@ checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" name = "sketch-core" version = "0.1.0" dependencies = [ + "clap 4.5.60", + "ctor", "dsrs", "rmp-serde", "serde", + "sketchlib-rust", "xxhash-rust", ] diff --git a/asap-common/sketch-core/Cargo.toml b/asap-common/sketch-core/Cargo.toml index 2dbea8b..70195a2 100644 --- a/asap-common/sketch-core/Cargo.toml +++ b/asap-common/sketch-core/Cargo.toml @@ -9,3 +9,8 @@ serde.workspace = true rmp-serde = "1.1" xxhash-rust = { version = "0.8", features = ["xxh32"] } dsrs = { git = "https://github.com/ProjectASAP/datasketches-rs" } +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } +clap = { version = "4.0", features = ["derive"] } + +[dev-dependencies] +ctor = "0.2" diff --git a/asap-common/sketch-core/report.md b/asap-common/sketch-core/report.md new file mode 100644 index 0000000..e1859cc --- /dev/null +++ b/asap-common/sketch-core/report.md @@ -0,0 +1,126 @@ +# Report + +Compares the **legacy** sketch implementations in `sketch-core` vs the new **sketchlib-rust** backends for: + +- `CountMinSketch` +- `CountMinSketchWithHeap` (Count-Min portion) +- `KllSketch` +- `HydraKllSketch` (via `KllSketch`) + + + + +### Fidelity harness + +The fidelity binary now selects backends via CLI flags instead of environment variables. + +| Goal | Command | +|--------------------------|--------------------------------------------------------------------------------------------------------------| +| Default (all sketchlib) | `cargo run -p sketch-core --bin sketchlib_fidelity` | +| All legacy | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy --kll-impl legacy --cmwh-impl legacy` | +| Legacy KLL only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib --kll-impl legacy --cmwh-impl sketchlib` | + +### Unit tests + +Unit tests always run with **legacy** backends enabled (the test ctor calls +`force_legacy_mode_for_tests()`), so you only need: + +```bash +cargo test -p sketch-core +``` + +## Results + +### CountMinSketch (accuracy vs exact counts) + +#### depth=3 + +| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | +|-------|--------|--------|----------------|----------------|----------|----------| +| 1024 | 100000 | 1000 | Legacy | 0.9998451189 | 24.48 | 52.76 | +| 1024 | 100000 | 1000 | sketchlib-rust | 0.9998387103 | 24.36 | 54.11 | + +#### depth=5 + +| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | +|-------|--------|--------|----------------|----------------|----------|----------| +| 2048 | 200000 | 2000 | Legacy | 0.9999733814 | 8.75 | 29.94 | +| 2048 | 200000 | 2000 | sketchlib-rust | 0.9999744627 | 8.37 | 28.84 | +| 2048 | 50000 | 500 | Legacy | 1.0000000000 | 0.00 | 0.00 | +| 2048 | 50000 | 500 | sketchlib-rust | 1.0000000000 | 0.00 | 0.00 | + +#### depth=7 + +| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | +|-------|--------|--------|----------------|----------------|----------|----------| +| 4096 | 200000 | 2000 | Legacy | 0.9999993694 | 0.20 | 3.69 | +| 4096 | 200000 | 2000 | sketchlib-rust | 0.9999993499 | 0.21 | 4.27 | + +--- + +### CountMinSketchWithHeap (top-k + CMS accuracy on exact top-k) + +The heap is maintained by local updates; recall is measured against the **true** top-k at the end of the stream. + +#### depth=3 + +| width | n | domain | heap_size | Mode | Top-k recall | Pearson (top-k) | MAPE (%) | RMSE (%) | +|-------|--------|--------|-----------|----------------|--------------|-----------------|----------|----------| +| 1024 | 100000 | 1000 | 10 | Legacy | 0.40 | 0.9571 | 0.174 | 0.319 | +| 1024 | 100000 | 1000 | 10 | sketchlib-rust | 0.40 | 1.0000 | 0.000 | 0.000 | + +#### depth=5 + +| width | n | domain | heap_size | Mode | Top-k recall | Pearson (top-k) | MAPE (%) | RMSE (%) | +|-------|--------|--------|-----------|----------------|--------------|-----------------|----------|----------| +| 2048 | 200000 | 2000 | 20 | Legacy | 0.60 | 0.9964 | 0.045 | 0.101 | +| 2048 | 200000 | 2000 | 20 | sketchlib-rust | 0.60 | 0.9982 | 0.021 | 0.067 | +| 2048 | 200000 | 2000 | 50 | Legacy | 0.40 | 0.9999983 | 5.60 | 16.49 | +| 2048 | 200000 | 2000 | 50 | sketchlib-rust | 0.40 | 0.9999990 | 3.90 | 12.95 | + +--- + +### KllSketch (quantiles, absolute rank error) + +For each quantile \(q\), we compute the sketch estimate `est_value`, then: +`abs_rank_error = |rank_fraction(exact_sorted_values, est_value) - q|`. + +#### k=20 + +| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 | +|-----------|----------------|---------|---------|---------| +| 200000 | Legacy | 0.0104 | 0.0145 | 0.0028 | +| 200000 | sketchlib-rust | 0.0275 | 0.0470 | 0.0061 | +| 50000 | Legacy | 0.0131 | 0.0091 | 0.0054 | +| 50000 | sketchlib-rust | 0.0110 | 0.0116 | 0.0031 | + +#### k=50 + +| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 | +|-----------|----------------|---------|---------|---------| +### CountMinSketch (accuracy vs exact counts) + +#### depth=3 + +| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | +|-------|--------|--------|----------------|----------------|----------|----------| +| 1024 | 100000 | 1000 | Legacy | 0.9998451189 | 24.48 | 52.76 | +| 1024 | 100000 | 1000 | sketchlib-rust | 0.9998387103 | 24.36 | 54.11 | + +#### depth=5 + +| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | +|-------|--------|--------|----------------|----------------|----------|----------| +| 2048 | 200000 | 2000 | Legacy | 0.9999733814 | 8.75 | 29.94 | +| 2048 | 200000 | 2000 | sketchlib-rust | 0.9999744627 | 8.37 | 28.84 | +| 2048 | 50000 | 500 | Legacy | 1.0000000000 | 0.00 | 0.00 | +| 2048 | 50000 | 500 | sketchlib-rust | 1.0000000000 | 0.00 | 0.00 | + +#### depth=7 + +| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | +|-------|--------|--------|----------------|----------------|----------|----------| +| 4096 | 200000 | 2000 | Legacy | 0.9999993694 | 0.20 | 3.69 | +| 4096 | 200000 | 2000 | sketchlib-rust | 0.9999993499 | 0.21 | 4.27 | + +--- diff --git a/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs b/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs new file mode 100644 index 0000000..c9c6f97 --- /dev/null +++ b/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs @@ -0,0 +1,209 @@ +// rank_fraction used for KLL benchmarks (added in later PR) +#![allow(dead_code)] + +use clap::Parser; +use sketch_core::config::{self, ImplMode}; +use sketch_core::count_min::CountMinSketch; + +#[derive(Clone)] +struct Lcg64 { + state: u64, +} + +impl Lcg64 { + fn new(seed: u64) -> Self { + Self { state: seed } + } + + fn next_u64(&mut self) -> u64 { + self.state = self + .state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + self.state + } + + fn next_f64_0_1(&mut self) -> f64 { + let x = self.next_u64() >> 11; + (x as f64) / ((1u64 << 53) as f64) + } +} + +fn pearson_corr(exact: &[f64], est: &[f64]) -> f64 { + let n = exact.len().min(est.len()); + if n == 0 { + return f64::NAN; + } + let (mut sum_x, mut sum_y) = (0.0, 0.0); + for i in 0..n { + sum_x += exact[i]; + sum_y += est[i]; + } + let mean_x = sum_x / (n as f64); + let mean_y = sum_y / (n as f64); + let (mut num, mut den_x, mut den_y) = (0.0, 0.0, 0.0); + for i in 0..n { + let dx = exact[i] - mean_x; + let dy = est[i] - mean_y; + num += dx * dy; + den_x += dx * dx; + den_y += dy * dy; + } + if den_x == 0.0 || den_y == 0.0 { + return f64::NAN; + } + num / (den_x.sqrt() * den_y.sqrt()) +} + +fn mape(exact: &[f64], est: &[f64]) -> f64 { + let n = exact.len().min(est.len()); + let mut num = 0.0; + let mut denom = 0.0; + for i in 0..n { + if exact[i] == 0.0 { + continue; + } + num += ((exact[i] - est[i]) / exact[i]).abs(); + denom += 1.0; + } + if denom == 0.0 { + return if exact == est { 0.0 } else { f64::INFINITY }; + } + (num / denom) * 100.0 +} + +fn rmse_percentage(exact: &[f64], est: &[f64]) -> f64 { + let n = exact.len().min(est.len()); + let mut sum_sq = 0.0; + let mut denom = 0.0; + for i in 0..n { + if exact[i] == 0.0 { + continue; + } + let rel = (exact[i] - est[i]) / exact[i]; + sum_sq += rel * rel; + denom += 1.0; + } + if denom == 0.0 { + return if exact == est { 0.0 } else { f64::INFINITY }; + } + (sum_sq / denom).sqrt() * 100.0 +} + +fn rank_fraction(sorted: &[f64], x: f64) -> f64 { + if sorted.is_empty() { + return 0.0; + } + let idx = sorted.partition_point(|v| *v <= x); + (idx as f64) / (sorted.len() as f64) +} + +// --- CountMinSketch parameter sets and runner --- + +struct CmsParams { + depth: usize, + width: usize, + n: usize, + domain: usize, +} + +struct CmsResult { + pearson: f64, + mape: f64, + rmse: f64, +} + +fn run_countmin_once(seed: u64, p: &CmsParams) -> CmsResult { + let mut rng = Lcg64::new(seed); + let mut exact: Vec = vec![0.0; p.domain]; + let mut cms = CountMinSketch::new(p.depth, p.width); + + for _ in 0..p.n { + let r = rng.next_u64(); + let key_id = if (r & 0xFF) < 200 { + (r as usize) % 20 + } else { + (r as usize) % p.domain + }; + let key = format!("k{key_id}"); + cms.update(&key, 1.0); + exact[key_id] += 1.0; + } + + let mut est: Vec = Vec::with_capacity(p.domain); + for key_id in 0..p.domain { + let key = format!("k{key_id}"); + est.push(cms.query_key(&key)); + } + + CmsResult { + pearson: pearson_corr(&exact, &est), + mape: mape(&exact, &est), + rmse: rmse_percentage(&exact, &est), + } +} + +#[derive(Parser)] +struct Args { + #[arg(long, value_enum, default_value = "sketchlib")] + cms_impl: ImplMode, + #[arg(long, value_enum, default_value = "sketchlib")] + kll_impl: ImplMode, + #[arg(long, value_enum, default_value = "sketchlib")] + cmwh_impl: ImplMode, +} + +fn main() { + let args = Args::parse(); + config::configure(args.cms_impl, args.kll_impl, args.cmwh_impl) + .expect("sketch backend already initialised"); + + let seed = 0xC0FFEE_u64; + let mode = if matches!(args.cms_impl, ImplMode::Legacy) + || matches!(args.kll_impl, ImplMode::Legacy) + || matches!(args.cmwh_impl, ImplMode::Legacy) + { + "Legacy" + } else { + "sketchlib-rust" + }; + + // CountMinSketch: multiple (depth, width, n, domain) + let cms_param_sets: Vec = vec![ + CmsParams { + depth: 3, + width: 1024, + n: 100_000, + domain: 1000, + }, + CmsParams { + depth: 5, + width: 2048, + n: 200_000, + domain: 2000, + }, + CmsParams { + depth: 7, + width: 4096, + n: 200_000, + domain: 2000, + }, + CmsParams { + depth: 5, + width: 2048, + n: 50_000, + domain: 500, + }, + ]; + + println!("## CountMinSketch ({mode})"); + println!("| depth | width | n_updates | domain | Pearson corr | MAPE (%) | RMSE (%) |"); + println!("|-------|-------|------------|--------|--------------|----------|----------|"); + for p in &cms_param_sets { + let r = run_countmin_once(seed, p); + println!( + "| {} | {} | {} | {} | {:.10} | {:.6} | {:.6} |", + p.depth, p.width, p.n, p.domain, r.pearson, r.mape, r.rmse + ); + } +} diff --git a/asap-common/sketch-core/src/config.rs b/asap-common/sketch-core/src/config.rs new file mode 100644 index 0000000..84267b0 --- /dev/null +++ b/asap-common/sketch-core/src/config.rs @@ -0,0 +1,75 @@ +use std::sync::OnceLock; + +/// Implementation mode for sketch-core internals. +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +pub enum ImplMode { + /// Use the original hand-written implementations. + Legacy, + /// Use sketchlib-rust backed implementations. + Sketchlib, +} + +static COUNTMIN_MODE: OnceLock = OnceLock::new(); + +/// Returns true if Count-Min operations should use sketchlib-rust internally. +pub fn use_sketchlib_for_count_min() -> bool { + *COUNTMIN_MODE.get_or_init(|| ImplMode::Sketchlib) == ImplMode::Sketchlib +} + +static KLL_MODE: OnceLock = OnceLock::new(); + +/// Returns true if KLL operations should use sketchlib-rust internally. +pub fn use_sketchlib_for_kll() -> bool { + *KLL_MODE.get_or_init(|| ImplMode::Sketchlib) == ImplMode::Sketchlib +} + +static COUNTMIN_WITH_HEAP_MODE: OnceLock = OnceLock::new(); + +/// Returns true if Count-Min-With-Heap operations should use sketchlib-rust internally for the +/// Count-Min portion. +pub fn use_sketchlib_for_count_min_with_heap() -> bool { + *COUNTMIN_WITH_HEAP_MODE.get_or_init(|| ImplMode::Sketchlib) == ImplMode::Sketchlib +} + +/// Set backend modes for all sketch types. Call once at process startup, +/// before any sketch operation. Returns Err if any OnceLock was already set. +pub fn configure(cms: ImplMode, kll: ImplMode, cmwh: ImplMode) -> Result<(), &'static str> { + let a = COUNTMIN_MODE.set(cms); + let b = KLL_MODE.set(kll); + let c = COUNTMIN_WITH_HEAP_MODE.set(cmwh); + if a.is_err() || b.is_err() || c.is_err() { + Err("configure() called after sketch backends were already initialised") + } else { + Ok(()) + } +} + +pub fn force_legacy_mode_for_tests() { + let _ = COUNTMIN_MODE.set(ImplMode::Legacy); + let _ = KLL_MODE.set(ImplMode::Legacy); + let _ = COUNTMIN_WITH_HEAP_MODE.set(ImplMode::Legacy); +} + +/// Helper used by UDF templates and documentation examples to parse implementation mode +/// from environment variables in a robust way. This is not used in the hot path. +pub fn parse_mode(var: Result) -> ImplMode { + match var { + Ok(v) => match v.to_ascii_lowercase().as_str() { + "legacy" => ImplMode::Legacy, + "sketchlib" => ImplMode::Sketchlib, + other => { + eprintln!( + "sketch-core: unrecognised IMPL value {other:?}, defaulting to Sketchlib" + ); + ImplMode::Sketchlib + } + }, + Err(std::env::VarError::NotPresent) => ImplMode::Sketchlib, + Err(std::env::VarError::NotUnicode(v)) => { + eprintln!( + "sketch-core: IMPL env var has invalid UTF-8 ({v:?}), defaulting to Sketchlib" + ); + ImplMode::Sketchlib + } + } +} diff --git a/asap-common/sketch-core/src/count_min.rs b/asap-common/sketch-core/src/count_min.rs index fcd7794..a77e8bb 100644 --- a/asap-common/sketch-core/src/count_min.rs +++ b/asap-common/sketch-core/src/count_min.rs @@ -14,47 +14,113 @@ use serde::{Deserialize, Serialize}; use xxhash_rust::xxh32::xxh32; +use crate::config::use_sketchlib_for_count_min; +use crate::count_min_sketchlib::{ + matrix_from_sketchlib_cms, new_sketchlib_cms, sketchlib_cms_from_matrix, sketchlib_cms_query, + sketchlib_cms_update, SketchlibCms, +}; + +#[derive(Serialize, Deserialize)] +struct WireFormat { + sketch: Vec>, + row_num: usize, + col_num: usize, +} + +/// Backend implementation for Count-Min Sketch. Only one is active at a time. +#[derive(Debug, Clone)] +pub enum CountMinBackend { + /// Original hand-written matrix implementation. + Legacy(Vec>), + /// sketchlib-rust backed implementation. + Sketchlib(SketchlibCms), +} + /// Count-Min Sketch probabilistic data structure for frequency counting. /// Provides approximate frequency counts with error bounds. /// This is the canonical shared implementation; the msgpack wire format is the /// contract between Arroyo UDAFs (producers) and QueryEngineRust (consumer). -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub struct CountMinSketch { - pub sketch: Vec>, pub row_num: usize, pub col_num: usize, + pub backend: CountMinBackend, } impl CountMinSketch { pub fn new(row_num: usize, col_num: usize) -> Self { - let sketch = vec![vec![0.0; col_num]; row_num]; + let backend = if use_sketchlib_for_count_min() { + CountMinBackend::Sketchlib(new_sketchlib_cms(row_num, col_num)) + } else { + CountMinBackend::Legacy(vec![vec![0.0; col_num]; row_num]) + }; Self { - sketch, row_num, col_num, + backend, + } + } + + /// Returns the sketch matrix (for wire format, serialization, tests). + pub fn sketch(&self) -> Vec> { + match &self.backend { + CountMinBackend::Legacy(m) => m.clone(), + CountMinBackend::Sketchlib(s) => matrix_from_sketchlib_cms(s), + } + } + + /// Mutable access to the matrix. Only `Some` for Legacy backend. + pub fn sketch_mut(&mut self) -> Option<&mut Vec>> { + match &mut self.backend { + CountMinBackend::Legacy(m) => Some(m), + CountMinBackend::Sketchlib(_) => None, + } + } + + /// Construct from a legacy matrix (used by deserialization and query engine). + pub fn from_legacy_matrix(sketch: Vec>, row_num: usize, col_num: usize) -> Self { + let backend = if use_sketchlib_for_count_min() { + CountMinBackend::Sketchlib(sketchlib_cms_from_matrix(row_num, col_num, &sketch)) + } else { + CountMinBackend::Legacy(sketch) + }; + Self { + row_num, + col_num, + backend, } } pub fn update(&mut self, key: &str, value: f64) { - let key_bytes = key.as_bytes(); - // Update each row using different hash functions - for i in 0..self.row_num { - let hash_value = xxh32(key_bytes, i as u32); - let col_index = (hash_value as usize) % self.col_num; - self.sketch[i][col_index] += value; + match &mut self.backend { + CountMinBackend::Legacy(sketch) => { + let key_bytes = key.as_bytes(); + for (i, row) in sketch.iter_mut().enumerate().take(self.row_num) { + let hash_value = xxh32(key_bytes, i as u32); + let col_index = (hash_value as usize) % self.col_num; + row[col_index] += value; + } + } + CountMinBackend::Sketchlib(s) => { + sketchlib_cms_update(s, key, value); + } } } pub fn query_key(&self, key: &str) -> f64 { - let key_bytes = key.as_bytes(); - let mut min_value = f64::MAX; - // Query each row and take the minimum - for i in 0..self.row_num { - let hash_value = xxh32(key_bytes, i as u32); - let col_index = (hash_value as usize) % self.col_num; - min_value = min_value.min(self.sketch[i][col_index]); + match &self.backend { + CountMinBackend::Legacy(sketch) => { + let key_bytes = key.as_bytes(); + let mut min_value = f64::MAX; + for (i, row) in sketch.iter().enumerate().take(self.row_num) { + let hash_value = xxh32(key_bytes, i as u32); + let col_index = (hash_value as usize) % self.col_num; + min_value = min_value.min(row[col_index]); + } + min_value + } + CountMinBackend::Sketchlib(s) => sketchlib_cms_query(s, key), } - min_value } pub fn merge( @@ -80,17 +146,44 @@ impl CountMinSketch { } } - let mut merged = accumulators[0].clone(); - // Add all sketches element-wise - for acc in &accumulators[1..] { - for (merged_row, acc_row) in merged.sketch.iter_mut().zip(&acc.sketch) { - for (m_cell, a_cell) in merged_row.iter_mut().zip(acc_row.iter()) { - *m_cell += *a_cell; + if use_sketchlib_for_count_min() { + let mut sketchlib_inners: Vec = Vec::with_capacity(accumulators.len()); + for acc in accumulators { + let matrix = acc.sketch(); + let inner = sketchlib_cms_from_matrix(acc.row_num, acc.col_num, &matrix); + sketchlib_inners.push(inner); + } + let merged_sketchlib = sketchlib_inners + .into_iter() + .reduce(|mut lhs: SketchlibCms, rhs: SketchlibCms| { + lhs.merge(&rhs); + lhs + }) + .ok_or("No accumulators to merge")?; + + let sketch = matrix_from_sketchlib_cms(&merged_sketchlib); + let row_num = sketch.len(); + let col_num = sketch.first().map(|r| r.len()).unwrap_or(0); + + Ok(Self { + row_num, + col_num, + backend: CountMinBackend::Sketchlib(merged_sketchlib), + }) + } else { + let mut merged = accumulators[0].clone(); + for acc in &accumulators[1..] { + let acc_matrix = acc.sketch(); + if let CountMinBackend::Legacy(merged_matrix) = &mut merged.backend { + for (merged_row, acc_row) in merged_matrix.iter_mut().zip(acc_matrix.iter()) { + for (m_cell, a_cell) in merged_row.iter_mut().zip(acc_row.iter()) { + *m_cell += *a_cell; + } + } } } + Ok(merged) } - - Ok(merged) } /// Merge from references, allocating only the output — no input clones. @@ -112,31 +205,94 @@ impl CountMinSketch { } } - let mut merged = Self::new(row_num, col_num); - for acc in accumulators { - for (merged_row, acc_row) in merged.sketch.iter_mut().zip(&acc.sketch) { - for (m_cell, a_cell) in merged_row.iter_mut().zip(acc_row.iter()) { - *m_cell += *a_cell; + if use_sketchlib_for_count_min() { + let mut sketchlib_inners: Vec = Vec::with_capacity(accumulators.len()); + for acc in accumulators { + let acc_matrix = acc.sketch(); + let matrix_has_values = acc_matrix + .iter() + .any(|row: &Vec| row.iter().any(|&v| v != 0.0)); + + let inner = if matrix_has_values { + sketchlib_cms_from_matrix(acc.row_num, acc.col_num, &acc_matrix) + } else if let CountMinBackend::Sketchlib(s) = &acc.backend { + s.clone() + } else { + sketchlib_cms_from_matrix(acc.row_num, acc.col_num, &acc_matrix) + }; + + sketchlib_inners.push(inner); + } + + let merged_sketchlib = sketchlib_inners + .into_iter() + .reduce(|mut lhs: SketchlibCms, rhs: SketchlibCms| { + lhs.merge(&rhs); + lhs + }) + .ok_or("No accumulators to merge")?; + + let sketch = matrix_from_sketchlib_cms(&merged_sketchlib); + let r = sketch.len(); + let c = sketch.first().map(|row| row.len()).unwrap_or(0); + + Ok(Self { + row_num: r, + col_num: c, + backend: CountMinBackend::Sketchlib(merged_sketchlib), + }) + } else { + let mut merged = Self::new(row_num, col_num); + if let CountMinBackend::Legacy(ref mut merged_sketch) = merged.backend { + for acc in accumulators { + let acc_matrix = acc.sketch(); + for (merged_row, acc_row) in merged_sketch.iter_mut().zip(acc_matrix.iter()) { + for (m_cell, a_cell) in merged_row.iter_mut().zip(acc_row.iter()) { + *m_cell += *a_cell; + } + } } } + Ok(merged) } - - Ok(merged) } /// Serialize to MessagePack — matches the Arroyo UDF wire format exactly. pub fn serialize_msgpack(&self) -> Vec { - // Match Arroyo UDF: countminsketch.serialize(&mut Serializer::new(&mut buf)) + let sketch = self.sketch(); + let wire = WireFormat { + sketch, + row_num: self.row_num, + col_num: self.col_num, + }; + let mut buf = Vec::new(); - self.serialize(&mut rmp_serde::Serializer::new(&mut buf)) + wire.serialize(&mut rmp_serde::Serializer::new(&mut buf)) .unwrap(); buf } /// Deserialize from MessagePack produced by the Arroyo UDF. pub fn deserialize_msgpack(buffer: &[u8]) -> Result> { - rmp_serde::from_slice(buffer).map_err(|e| { - format!("Failed to deserialize CountMinSketch from MessagePack: {e}").into() + let wire: WireFormat = + rmp_serde::from_slice(buffer).map_err(|e| -> Box { + format!("Failed to deserialize CountMinSketch from MessagePack: {e}").into() + })?; + + let backend = if use_sketchlib_for_count_min() { + CountMinBackend::Sketchlib(sketchlib_cms_from_matrix( + wire.row_num, + wire.col_num, + &wire.sketch, + )) + } else { + CountMinBackend::Legacy(wire.sketch) + }; + + Ok(Self { + row_num: wire.row_num, + col_num: wire.col_num, + backend, }) } @@ -178,11 +334,12 @@ mod tests { let cms = CountMinSketch::new(4, 1000); assert_eq!(cms.row_num, 4); assert_eq!(cms.col_num, 1000); - assert_eq!(cms.sketch.len(), 4); - assert_eq!(cms.sketch[0].len(), 1000); + let sketch = cms.sketch(); + assert_eq!(sketch.len(), 4); + assert_eq!(sketch[0].len(), 1000); // Check all values are initialized to 0 - for row in &cms.sketch { + for row in &sketch { for &value in row { assert_eq!(value, 0.0); } @@ -206,20 +363,23 @@ mod tests { #[test] fn test_count_min_sketch_merge() { - let mut cms1 = CountMinSketch::new(2, 3); - let mut cms2 = CountMinSketch::new(2, 3); - - cms1.sketch[0][0] = 5.0; - cms1.sketch[1][2] = 10.0; + // Use from_legacy_matrix so the test works regardless of sketchlib/legacy config + let mut sketch1 = vec![vec![0.0; 3]; 2]; + sketch1[0][0] = 5.0; + sketch1[1][2] = 10.0; + let cms1 = CountMinSketch::from_legacy_matrix(sketch1, 2, 3); - cms2.sketch[0][0] = 3.0; - cms2.sketch[0][1] = 7.0; + let mut sketch2 = vec![vec![0.0; 3]; 2]; + sketch2[0][0] = 3.0; + sketch2[0][1] = 7.0; + let cms2 = CountMinSketch::from_legacy_matrix(sketch2, 2, 3); let merged = CountMinSketch::merge(vec![cms1, cms2]).unwrap(); + let merged_sketch = merged.sketch(); - assert_eq!(merged.sketch[0][0], 8.0); // 5 + 3 - assert_eq!(merged.sketch[0][1], 7.0); // 0 + 7 - assert_eq!(merged.sketch[1][2], 10.0); // 10 + 0 + assert_eq!(merged_sketch[0][0], 8.0); // 5 + 3 + assert_eq!(merged_sketch[0][1], 7.0); // 0 + 7 + assert_eq!(merged_sketch[1][2], 10.0); // 10 + 0 } #[test] @@ -231,17 +391,18 @@ mod tests { #[test] fn test_count_min_sketch_msgpack_round_trip() { - let mut cms = CountMinSketch::new(2, 3); - cms.sketch[0][1] = 42.0; - cms.sketch[1][2] = 100.0; + let mut cms = CountMinSketch::new(4, 256); + cms.update("apple", 5.0); + cms.update("banana", 3.0); + cms.update("apple", 2.0); // total "apple" = 7 let bytes = cms.serialize_msgpack(); let deserialized = CountMinSketch::deserialize_msgpack(&bytes).unwrap(); - assert_eq!(deserialized.row_num, 2); - assert_eq!(deserialized.col_num, 3); - assert_eq!(deserialized.sketch[0][1], 42.0); - assert_eq!(deserialized.sketch[1][2], 100.0); + assert_eq!(deserialized.row_num, 4); + assert_eq!(deserialized.col_num, 256); + assert!(deserialized.query_key("apple") >= 7.0); + assert!(deserialized.query_key("banana") >= 3.0); } #[test] diff --git a/asap-common/sketch-core/src/count_min_sketchlib.rs b/asap-common/sketch-core/src/count_min_sketchlib.rs new file mode 100644 index 0000000..20fe7be --- /dev/null +++ b/asap-common/sketch-core/src/count_min_sketchlib.rs @@ -0,0 +1,59 @@ +use sketchlib_rust::{CountMin, RegularPath, SketchInput, Vector2D}; + +/// Concrete Count-Min type from sketchlib-rust when sketchlib backend is enabled. +/// Uses f64 counters (Vector2D) for weighted updates without integer rounding. +pub type SketchlibCms = CountMin, RegularPath>; + +/// Creates a fresh sketchlib Count-Min sketch with the given dimensions. +pub fn new_sketchlib_cms(row_num: usize, col_num: usize) -> SketchlibCms { + SketchlibCms::with_dimensions(row_num, col_num) +} + +/// Builds a sketchlib Count-Min sketch from an existing `sketch` matrix. +pub fn sketchlib_cms_from_matrix( + row_num: usize, + col_num: usize, + sketch: &[Vec], +) -> SketchlibCms { + let matrix = Vector2D::from_fn(row_num, col_num, |r, c| { + sketch + .get(r) + .and_then(|row| row.get(c)) + .copied() + .unwrap_or(0.0) + }); + SketchlibCms::from_storage(matrix) +} + +/// Converts a sketchlib Count-Min sketch into the legacy `Vec>` matrix. +pub fn matrix_from_sketchlib_cms(inner: &SketchlibCms) -> Vec> { + let storage: &Vector2D = inner.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut sketch = vec![vec![0.0; cols]; rows]; + + for (r, row) in sketch.iter_mut().enumerate().take(rows) { + for (c, cell) in row.iter_mut().enumerate().take(cols) { + if let Some(v) = storage.get(r, c) { + *cell = *v; + } + } + } + + sketch +} + +/// Helper to update a sketchlib Count-Min with a weighted key. +pub fn sketchlib_cms_update(inner: &mut SketchlibCms, key: &str, value: f64) { + if value <= 0.0 { + return; + } + let input = SketchInput::String(key.to_owned()); + inner.insert_many(&input, value); +} + +/// Helper to query a sketchlib Count-Min for a key, returning f64. +pub fn sketchlib_cms_query(inner: &SketchlibCms, key: &str) -> f64 { + let input = SketchInput::String(key.to_owned()); + inner.estimate(&input) +} diff --git a/asap-common/sketch-core/src/lib.rs b/asap-common/sketch-core/src/lib.rs index 461d43e..1cb0401 100644 --- a/asap-common/sketch-core/src/lib.rs +++ b/asap-common/sketch-core/src/lib.rs @@ -1,4 +1,14 @@ +// Force legacy sketch implementations during tests so that tests that mutate the +// matrix directly or rely on legacy behavior pass. +#[cfg(test)] +#[ctor::ctor] +fn init_sketch_legacy_for_tests() { + crate::config::force_legacy_mode_for_tests(); +} + +pub mod config; pub mod count_min; +pub mod count_min_sketchlib; pub mod count_min_with_heap; pub mod delta_set_aggregator; pub mod hydra_kll; diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index 2e2ccf4..8723bb9 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -58,9 +58,6 @@ zstd = "0.13" reqwest = { version = "0.11", features = ["json"] } tracing-appender = "0.2" -[dev-dependencies] -tempfile = "3.20.0" - [features] #default = ["lock_profiling", "extra_debugging"] default = [] @@ -68,3 +65,8 @@ default = [] lock_profiling = [] # Enable extra debugging output extra_debugging = [] +sketchlib-tests = [] + +[dev-dependencies] +ctor = "0.2" +tempfile = "3.20.0" diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index 22295ed..7e59fff 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -1,3 +1,20 @@ +// Configure sketch-core implementations during tests. +// Use sketchlib-tests feature to choose backend: without it = Legacy, with it = Sketchlib. +// A single `cargo test -p query_engine_rust` runs both: lib tests use Legacy, then +// tests/test_both_backends.rs spawns the sketchlib run. +#[cfg(test)] +#[ctor::ctor] +fn init_sketch_backend_for_tests() { + #[cfg(feature = "sketchlib-tests")] + let _ = sketch_core::config::configure( + sketch_core::config::ImplMode::Sketchlib, + sketch_core::config::ImplMode::Sketchlib, + sketch_core::config::ImplMode::Sketchlib, + ); + #[cfg(not(feature = "sketchlib-tests"))] + sketch_core::config::force_legacy_mode_for_tests(); +} + pub mod data_model; pub mod drivers; pub mod engines; diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index a950fba..f0752c3 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -5,6 +5,8 @@ use std::sync::Arc; use tokio::signal; use tracing::{error, info}; +use sketch_core::config::{self, ImplMode}; + use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingEngine}; use query_engine_rust::drivers::AdapterConfig; use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config}; @@ -108,6 +110,18 @@ struct Args { #[arg(long)] promsketch_config: Option, + /// Backend implementation for Count-Min Sketch (legacy | sketchlib) + #[arg(long, value_enum, default_value = "sketchlib")] + sketch_cms_impl: ImplMode, + + /// Backend implementation for KLL Sketch (legacy | sketchlib) + #[arg(long, value_enum, default_value = "sketchlib")] + sketch_kll_impl: ImplMode, + + /// Backend implementation for Count-Min-With-Heap (legacy | sketchlib) + #[arg(long, value_enum, default_value = "sketchlib")] + sketch_cmwh_impl: ImplMode, + /// Enable OTLP metrics ingest (gRPC + HTTP) #[arg(long)] enable_otel_ingest: bool, @@ -125,6 +139,14 @@ struct Args { async fn main() -> Result<()> { let args = Args::parse(); + // Configure sketch-core backends before any sketch operations. + config::configure( + args.sketch_cms_impl, + args.sketch_kll_impl, + args.sketch_cmwh_impl, + ) + .expect("sketch backend already initialised"); + // Create output directory fs::create_dir_all(&args.output_dir)?; diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index bba716d..e149cba 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -56,11 +56,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch { - sketch, - row_num, - col_num, - }, + inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), }) } @@ -111,11 +107,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch { - row_num, - col_num, - sketch, - }, + inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), }) } @@ -168,7 +160,7 @@ impl SerializableToSink for CountMinSketchAccumulator { serde_json::json!({ "row_num": self.inner.row_num, "col_num": self.inner.col_num, - "sketch": self.inner.sketch + "sketch": self.inner.sketch() }) } @@ -261,10 +253,11 @@ mod tests { let cms = CountMinSketchAccumulator::new(4, 1000); assert_eq!(cms.inner.row_num, 4); assert_eq!(cms.inner.col_num, 1000); - assert_eq!(cms.inner.sketch.len(), 4); - assert_eq!(cms.inner.sketch[0].len(), 1000); + let sketch = cms.inner.sketch(); + assert_eq!(sketch.len(), 4); + assert_eq!(sketch[0].len(), 1000); - for row in &cms.inner.sketch { + for row in &sketch { for &value in row { assert_eq!(value, 0.0); } @@ -292,19 +285,28 @@ mod tests { #[test] fn test_count_min_sketch_merge() { - let mut cms1 = CountMinSketchAccumulator::new(2, 3); - let mut cms2 = CountMinSketchAccumulator::new(2, 3); - - cms1.inner.sketch[0][0] = 5.0; - cms1.inner.sketch[1][2] = 10.0; - cms2.inner.sketch[0][0] = 3.0; - cms2.inner.sketch[0][1] = 7.0; + // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). + let cms1 = CountMinSketchAccumulator { + inner: CountMinSketch::from_legacy_matrix( + vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], + 2, + 3, + ), + }; + let cms2 = CountMinSketchAccumulator { + inner: CountMinSketch::from_legacy_matrix( + vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], + 2, + 3, + ), + }; let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap(); - assert_eq!(merged.inner.sketch[0][0], 8.0); - assert_eq!(merged.inner.sketch[0][1], 7.0); - assert_eq!(merged.inner.sketch[1][2], 10.0); + let merged_sketch = merged.inner.sketch(); + assert_eq!(merged_sketch[0][0], 8.0); + assert_eq!(merged_sketch[0][1], 7.0); + assert_eq!(merged_sketch[1][2], 10.0); } #[test] @@ -317,9 +319,13 @@ mod tests { #[test] fn test_count_min_sketch_serialization() { - let mut cms = CountMinSketchAccumulator::new(2, 3); - cms.inner.sketch[0][1] = 42.0; - cms.inner.sketch[1][2] = 100.0; + let cms = CountMinSketchAccumulator { + inner: CountMinSketch::from_legacy_matrix( + vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], + 2, + 3, + ), + }; let bytes = cms.serialize_to_bytes(); let deserialized = @@ -327,8 +333,9 @@ mod tests { assert_eq!(deserialized.inner.row_num, 2); assert_eq!(deserialized.inner.col_num, 3); - assert_eq!(deserialized.inner.sketch[0][1], 42.0); - assert_eq!(deserialized.inner.sketch[1][2], 100.0); + let deser_sketch = deserialized.inner.sketch(); + assert_eq!(deser_sketch[0][1], 42.0); + assert_eq!(deser_sketch[1][2], 100.0); } #[test] @@ -396,25 +403,38 @@ mod tests { #[test] fn test_count_min_sketch_merge_multiple() { - let mut cms1 = CountMinSketchAccumulator::new(2, 3); - let mut cms2 = CountMinSketchAccumulator::new(2, 3); - let mut cms3 = CountMinSketchAccumulator::new(2, 3); - - cms1.inner.sketch[0][0] = 5.0; - cms1.inner.sketch[1][2] = 10.0; - cms2.inner.sketch[0][0] = 3.0; - cms2.inner.sketch[0][1] = 7.0; - cms3.inner.sketch[0][0] = 2.0; - cms3.inner.sketch[1][2] = 5.0; + // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). + let cms1 = CountMinSketchAccumulator { + inner: CountMinSketch::from_legacy_matrix( + vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], + 2, + 3, + ), + }; + let cms2 = CountMinSketchAccumulator { + inner: CountMinSketch::from_legacy_matrix( + vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], + 2, + 3, + ), + }; + let cms3 = CountMinSketchAccumulator { + inner: CountMinSketch::from_legacy_matrix( + vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], + 2, + 3, + ), + }; let boxed_accs: Vec> = vec![Box::new(cms1), Box::new(cms2), Box::new(cms3)]; let merged = CountMinSketchAccumulator::merge_multiple(&boxed_accs).unwrap(); - assert_eq!(merged.inner.sketch[0][0], 10.0); - assert_eq!(merged.inner.sketch[0][1], 7.0); - assert_eq!(merged.inner.sketch[1][2], 15.0); + let merged_sketch = merged.inner.sketch(); + assert_eq!(merged_sketch[0][0], 10.0); + assert_eq!(merged_sketch[0][1], 7.0); + assert_eq!(merged_sketch[1][2], 15.0); } #[test] diff --git a/asap-query-engine/tests/test_both_backends.rs b/asap-query-engine/tests/test_both_backends.rs new file mode 100644 index 0000000..5643756 --- /dev/null +++ b/asap-query-engine/tests/test_both_backends.rs @@ -0,0 +1,30 @@ +//! Integration test that runs the library test suite with the sketchlib backend. +//! +//! When you run `cargo test -p query_engine_rust` (without --features sketchlib-tests), +//! the lib tests run with the legacy backend. This test spawns a second run with the +//! sketchlib backend so both modes are exercised in one `cargo test` invocation. +//! +//! This test is only compiled when sketchlib-tests is NOT enabled, to avoid recursion. + +#[cfg(not(feature = "sketchlib-tests"))] +#[test] +fn test_sketchlib_backend() { + use std::process::Command; + + let status = Command::new(env!("CARGO")) + .args([ + "test", + "-p", + "query_engine_rust", + "--lib", + "--features", + "sketchlib-tests", + ]) + .status() + .expect("failed to spawn cargo test"); + + assert!( + status.success(), + "sketchlib backend tests failed (run `cargo test -p query_engine_rust --lib --features sketchlib-tests` for details)" + ); +} diff --git a/asap-sketch-ingest/templates/udfs/countminsketch_count.rs.j2 b/asap-sketch-ingest/templates/udfs/countminsketch_count.rs.j2 index 16b532c..4e13ceb 100644 --- a/asap-sketch-ingest/templates/udfs/countminsketch_count.rs.j2 +++ b/asap-sketch-ingest/templates/udfs/countminsketch_count.rs.j2 @@ -3,16 +3,34 @@ rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } twox-hash = "2.1.0" +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } */ + use arroyo_udf_plugin::udf; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; use twox_hash::XxHash32; +use sketchlib_rust::{CountMin as SketchlibCountMin, RegularPath, SketchInput, Vector2D}; + // Count-Min Sketch parameters const DEPTH: usize = {{ depth }}; // Number of hash functions const WIDTH: usize = {{ width }}; // Number of buckets per hash function +// Implementation mode for Count-Min Sketch. Set at compile time; no env vars. +enum ImplMode { + Legacy, + Sketchlib, +} + +const IMPL_MODE: ImplMode = ImplMode::Sketchlib; + +fn use_sketchlib_for_cms() -> bool { + matches!(IMPL_MODE, ImplMode::Sketchlib) +} + +type SketchlibCms = SketchlibCountMin, RegularPath>; + #[derive(Serialize, Deserialize, Clone)] struct CountMinSketch { sketch: Vec>, @@ -29,7 +47,7 @@ impl CountMinSketch { } } - // Update the sketch with a key-value pair + // Legacy path: update the sketch with a key-value pair using twox-hash. fn update(&mut self, key: &str, value: f64) { for i in 0..self.row_num { // already UTF-8 @@ -42,17 +60,53 @@ impl CountMinSketch { #[udf] fn countminsketch_count(keys: Vec<&str>, values: Vec) -> Option> { - // Create a new Count-Min Sketch - let mut countminsketch = CountMinSketch::new(); + if use_sketchlib_for_cms() { + // sketchlib-rust backed implementation: integer counters + internal hashing. + let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH); - // Iterate through the keys and values and update the sketch for each entry - for (i, &key) in keys.iter().enumerate() { - countminsketch.update(key, 1.0); - } + for &key in keys.iter() { + let input = SketchInput::String(key.to_owned()); + inner.insert_many(&input, 1); + } + + // Convert sketchlib storage to legacy matrix wire format. + let storage: &Vector2D = inner.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut sketch = vec![vec![0.0; cols]; rows]; + + for r in 0..rows { + for c in 0..cols { + if let Some(v) = storage.get(r, c) { + sketch[r][c] = *v as f64; + } + } + } + + let countminsketch = CountMinSketch { + sketch, + row_num: rows, + col_num: cols, + }; - let mut buf = Vec::new(); - countminsketch - .serialize(&mut Serializer::new(&mut buf)) - .ok()?; - Some(buf) + let mut buf = Vec::new(); + countminsketch + .serialize(&mut Serializer::new(&mut buf)) + .ok()?; + Some(buf) + } else { + // Legacy twox-hash backed implementation (unchanged). + let mut countminsketch = CountMinSketch::new(); + + // Iterate through the keys and update the sketch for each entry + for &key in keys.iter() { + countminsketch.update(key, 1.0); + } + + let mut buf = Vec::new(); + countminsketch + .serialize(&mut Serializer::new(&mut buf)) + .ok()?; + Some(buf) + } } diff --git a/asap-sketch-ingest/templates/udfs/countminsketch_sum.rs.j2 b/asap-sketch-ingest/templates/udfs/countminsketch_sum.rs.j2 index 8bf0530..e851d76 100644 --- a/asap-sketch-ingest/templates/udfs/countminsketch_sum.rs.j2 +++ b/asap-sketch-ingest/templates/udfs/countminsketch_sum.rs.j2 @@ -3,16 +3,34 @@ rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } twox-hash = "2.1.0" +sketchlib-rust = { git = "https://github.com/ProjectASAP/sketchlib-rust" } */ + use arroyo_udf_plugin::udf; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; use twox_hash::XxHash32; +use sketchlib_rust::{CountMin as SketchlibCountMin, RegularPath, SketchInput, Vector2D}; + // Count-Min Sketch parameters const DEPTH: usize = {{ depth }}; // Number of hash functions const WIDTH: usize = {{ width }}; // Number of buckets per hash function +// Implementation mode for Count-Min Sketch. Set at compile time; no env vars. +enum ImplMode { + Legacy, + Sketchlib, +} + +const IMPL_MODE: ImplMode = ImplMode::Sketchlib; + +fn use_sketchlib_for_cms() -> bool { + matches!(IMPL_MODE, ImplMode::Sketchlib) +} + +type SketchlibCms = SketchlibCountMin, RegularPath>; + #[derive(Serialize, Deserialize, Clone)] struct CountMinSketch { sketch: Vec>, @@ -29,7 +47,7 @@ impl CountMinSketch { } } - // Update the sketch with a key-value pair + // Legacy path: update the sketch with a key-value pair using twox-hash. fn update(&mut self, key: &str, value: f64) { for i in 0..self.row_num { // already UTF-8 @@ -47,17 +65,59 @@ fn countminsketch_sum(keys: Vec<&str>, values: Vec) -> Option> { return None; } - // Create a new Count-Min Sketch - let mut countminsketch = CountMinSketch::new(); + if use_sketchlib_for_cms() { + // sketchlib-rust backed implementation: integer counters + internal hashing. + let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH); - // Iterate through the keys and values and update the sketch for each entry - for (i, &key) in keys.iter().enumerate() { - countminsketch.update(key, values[i]); - } + for (i, &key) in keys.iter().enumerate() { + let value = values[i]; + // Values arrive as f64; Count-Min counters are integers. + let many = value.round() as i64; + if many <= 0 { + continue; + } + let input = SketchInput::String(key.to_owned()); + inner.insert_many(&input, many); + } + + // Convert sketchlib storage to legacy matrix wire format. + let storage: &Vector2D = inner.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut sketch = vec![vec![0.0; cols]; rows]; + + for r in 0..rows { + for c in 0..cols { + if let Some(v) = storage.get(r, c) { + sketch[r][c] = *v as f64; + } + } + } + + let countminsketch = CountMinSketch { + sketch, + row_num: rows, + col_num: cols, + }; - let mut buf = Vec::new(); - countminsketch - .serialize(&mut Serializer::new(&mut buf)) - .ok()?; - Some(buf) + let mut buf = Vec::new(); + countminsketch + .serialize(&mut Serializer::new(&mut buf)) + .ok()?; + Some(buf) + } else { + // Legacy twox-hash backed implementation (unchanged). + let mut countminsketch = CountMinSketch::new(); + + // Iterate through the keys and values and update the sketch for each entry + for (i, &key) in keys.iter().enumerate() { + countminsketch.update(key, values[i]); + } + + let mut buf = Vec::new(); + countminsketch + .serialize(&mut Serializer::new(&mut buf)) + .ok()?; + Some(buf) + } } From 23bfaf2b0a69dec20f3712aa82714e96b14fa990 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Fri, 20 Mar 2026 14:51:04 -0400 Subject: [PATCH 2/8] Restore per-backend default constants, global default Legacy --- asap-common/sketch-core/src/config.rs | 28 +++++++++++++++++++-------- asap-query-engine/src/main.rs | 6 +++--- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/asap-common/sketch-core/src/config.rs b/asap-common/sketch-core/src/config.rs index 9d4c9a5..9f49ad7 100644 --- a/asap-common/sketch-core/src/config.rs +++ b/asap-common/sketch-core/src/config.rs @@ -9,18 +9,26 @@ pub enum ImplMode { Sketchlib, } +/// Global default when impl mode is not explicitly configured (e.g. env var parsing). +pub const DEFAULT_IMPL_MODE: ImplMode = ImplMode::Legacy; + +/// Per-backend defaults. Used when configure() has not been called. +pub const DEFAULT_CMS_IMPL: ImplMode = ImplMode::Sketchlib; +pub const DEFAULT_KLL_IMPL: ImplMode = ImplMode::Legacy; +pub const DEFAULT_CMWH_IMPL: ImplMode = ImplMode::Legacy; + static COUNTMIN_MODE: OnceLock = OnceLock::new(); /// Returns true if Count-Min operations should use sketchlib-rust internally. pub fn use_sketchlib_for_count_min() -> bool { - *COUNTMIN_MODE.get_or_init(|| ImplMode::Sketchlib) == ImplMode::Sketchlib + *COUNTMIN_MODE.get_or_init(|| DEFAULT_CMS_IMPL) == ImplMode::Sketchlib } static KLL_MODE: OnceLock = OnceLock::new(); /// Returns true if KLL operations should use sketchlib-rust internally. pub fn use_sketchlib_for_kll() -> bool { - *KLL_MODE.get_or_init(|| ImplMode::Legacy) == ImplMode::Sketchlib + *KLL_MODE.get_or_init(|| DEFAULT_KLL_IMPL) == ImplMode::Sketchlib } static COUNTMIN_WITH_HEAP_MODE: OnceLock = OnceLock::new(); @@ -28,7 +36,7 @@ static COUNTMIN_WITH_HEAP_MODE: OnceLock = OnceLock::new(); /// Returns true if Count-Min-With-Heap operations should use sketchlib-rust internally for the /// Count-Min portion. pub fn use_sketchlib_for_count_min_with_heap() -> bool { - *COUNTMIN_WITH_HEAP_MODE.get_or_init(|| ImplMode::Legacy) == ImplMode::Sketchlib + *COUNTMIN_WITH_HEAP_MODE.get_or_init(|| DEFAULT_CMWH_IMPL) == ImplMode::Sketchlib } /// Set backend modes for all sketch types. Call once at process startup, @@ -58,14 +66,18 @@ pub fn parse_mode(var: Result) -> ImplMode { "legacy" => ImplMode::Legacy, "sketchlib" => ImplMode::Sketchlib, other => { - eprintln!("sketch-core: unrecognised IMPL value {other:?}, defaulting to Legacy"); - ImplMode::Legacy + eprintln!( + "sketch-core: unrecognised IMPL value {other:?}, defaulting to {DEFAULT_IMPL_MODE:?}" + ); + DEFAULT_IMPL_MODE } }, - Err(std::env::VarError::NotPresent) => ImplMode::Legacy, + Err(std::env::VarError::NotPresent) => DEFAULT_IMPL_MODE, Err(std::env::VarError::NotUnicode(v)) => { - eprintln!("sketch-core: IMPL env var has invalid UTF-8 ({v:?}), defaulting to Legacy"); - ImplMode::Legacy + eprintln!( + "sketch-core: IMPL env var has invalid UTF-8 ({v:?}), defaulting to {DEFAULT_IMPL_MODE:?}" + ); + DEFAULT_IMPL_MODE } } } diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index fa211e0..00be2fe 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -111,15 +111,15 @@ struct Args { promsketch_config: Option, /// Backend implementation for Count-Min Sketch (legacy | sketchlib) - #[arg(long, value_enum, default_value = "sketchlib")] + #[arg(long, value_enum, default_value_t = config::DEFAULT_CMS_IMPL)] sketch_cms_impl: ImplMode, /// Backend implementation for KLL Sketch (legacy | sketchlib) - #[arg(long, value_enum, default_value = "legacy")] + #[arg(long, value_enum, default_value_t = config::DEFAULT_KLL_IMPL)] sketch_kll_impl: ImplMode, /// Backend implementation for Count-Min-With-Heap (legacy | sketchlib) - #[arg(long, value_enum, default_value = "legacy")] + #[arg(long, value_enum, default_value_t = config::DEFAULT_CMWH_IMPL)] sketch_cmwh_impl: ImplMode, /// Enable OTLP metrics ingest (gRPC + HTTP) From 1733039a5b7a007787a58bc77d9591dbda1e88f3 Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Fri, 20 Mar 2026 15:00:13 -0400 Subject: [PATCH 3/8] Use per-backend defaults in fidelity, configurable impl_mode in UDF templates --- .../sketch-core/src/bin/sketchlib_fidelity.rs | 8 ++++---- asap-summary-ingest/run_arroyosketch.py | 12 ++++++++++++ .../templates/udfs/countminsketch_count.rs.j2 | 2 +- .../templates/udfs/countminsketch_sum.rs.j2 | 2 +- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs b/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs index c9c6f97..8fd08e3 100644 --- a/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs +++ b/asap-common/sketch-core/src/bin/sketchlib_fidelity.rs @@ -1,4 +1,4 @@ -// rank_fraction used for KLL benchmarks (added in later PR) +// Fidelity benchmarks comparing legacy vs sketchlib implementations across sketch types. #![allow(dead_code)] use clap::Parser; @@ -145,11 +145,11 @@ fn run_countmin_once(seed: u64, p: &CmsParams) -> CmsResult { #[derive(Parser)] struct Args { - #[arg(long, value_enum, default_value = "sketchlib")] + #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMS_IMPL)] cms_impl: ImplMode, - #[arg(long, value_enum, default_value = "sketchlib")] + #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_KLL_IMPL)] kll_impl: ImplMode, - #[arg(long, value_enum, default_value = "sketchlib")] + #[arg(long, value_enum, default_value_t = sketch_core::config::DEFAULT_CMWH_IMPL)] cmwh_impl: ImplMode, } diff --git a/asap-summary-ingest/run_arroyosketch.py b/asap-summary-ingest/run_arroyosketch.py index af0a4fc..3769723 100644 --- a/asap-summary-ingest/run_arroyosketch.py +++ b/asap-summary-ingest/run_arroyosketch.py @@ -464,6 +464,18 @@ def create_pipeline( template_source, udf_template.environment ) + # Per-UDF impl mode defaults (aligned with sketch-core config) + UDF_IMPL_DEFAULTS = { + "countminsketch_count": "Sketchlib", + "countminsketch_sum": "Sketchlib", + "countminsketchwithheap_topk": "Sketchlib", + "datasketcheskll_": "Sketchlib", + "hydrakll_": "Sketchlib", + } + params.setdefault( + "impl_mode", UDF_IMPL_DEFAULTS.get(udf_name, "Sketchlib") + ) + # Handle config key mapping (K -> k for KLL) if "K" in params and "k" in required_params: params["k"] = params["K"] diff --git a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 index 4e13ceb..1b78553 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 @@ -23,7 +23,7 @@ enum ImplMode { Sketchlib, } -const IMPL_MODE: ImplMode = ImplMode::Sketchlib; +const IMPL_MODE: ImplMode = ImplMode::{{ impl_mode | default("Sketchlib") }}; fn use_sketchlib_for_cms() -> bool { matches!(IMPL_MODE, ImplMode::Sketchlib) diff --git a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 index e851d76..759963c 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 @@ -23,7 +23,7 @@ enum ImplMode { Sketchlib, } -const IMPL_MODE: ImplMode = ImplMode::Sketchlib; +const IMPL_MODE: ImplMode = ImplMode::{{ impl_mode | default("Sketchlib") }}; fn use_sketchlib_for_cms() -> bool { matches!(IMPL_MODE, ImplMode::Sketchlib) From dd37cf0ea5721b9044b1eb05b67542de5c8ecc8e Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Fri, 20 Mar 2026 15:11:17 -0400 Subject: [PATCH 4/8] report: scope to CMS only for PR 3 --- asap-common/sketch-core/report.md | 96 +++---------------------------- 1 file changed, 9 insertions(+), 87 deletions(-) diff --git a/asap-common/sketch-core/report.md b/asap-common/sketch-core/report.md index e1859cc..1230f10 100644 --- a/asap-common/sketch-core/report.md +++ b/asap-common/sketch-core/report.md @@ -1,26 +1,17 @@ -# Report +# Sketchlib Fidelity Report -Compares the **legacy** sketch implementations in `sketch-core` vs the new **sketchlib-rust** backends for: +Compares the **legacy** Count-Min Sketch implementation in `sketch-core` vs the new **sketchlib-rust** backend. -- `CountMinSketch` -- `CountMinSketchWithHeap` (Count-Min portion) -- `KllSketch` -- `HydraKllSketch` (via `KllSketch`) +## Fidelity harness +The fidelity binary selects backends via CLI flags. +| Goal | Command | +|-------------|---------------------------------------------------------------| +| CMS sketchlib | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib` | +| CMS legacy | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy` | - -### Fidelity harness - -The fidelity binary now selects backends via CLI flags instead of environment variables. - -| Goal | Command | -|--------------------------|--------------------------------------------------------------------------------------------------------------| -| Default (all sketchlib) | `cargo run -p sketch-core --bin sketchlib_fidelity` | -| All legacy | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl legacy --kll-impl legacy --cmwh-impl legacy` | -| Legacy KLL only | `cargo run -p sketch-core --bin sketchlib_fidelity -- --cms-impl sketchlib --kll-impl legacy --cmwh-impl sketchlib` | - -### Unit tests +## Unit tests Unit tests always run with **legacy** backends enabled (the test ctor calls `force_legacy_mode_for_tests()`), so you only need: @@ -55,72 +46,3 @@ cargo test -p sketch-core |-------|--------|--------|----------------|----------------|----------|----------| | 4096 | 200000 | 2000 | Legacy | 0.9999993694 | 0.20 | 3.69 | | 4096 | 200000 | 2000 | sketchlib-rust | 0.9999993499 | 0.21 | 4.27 | - ---- - -### CountMinSketchWithHeap (top-k + CMS accuracy on exact top-k) - -The heap is maintained by local updates; recall is measured against the **true** top-k at the end of the stream. - -#### depth=3 - -| width | n | domain | heap_size | Mode | Top-k recall | Pearson (top-k) | MAPE (%) | RMSE (%) | -|-------|--------|--------|-----------|----------------|--------------|-----------------|----------|----------| -| 1024 | 100000 | 1000 | 10 | Legacy | 0.40 | 0.9571 | 0.174 | 0.319 | -| 1024 | 100000 | 1000 | 10 | sketchlib-rust | 0.40 | 1.0000 | 0.000 | 0.000 | - -#### depth=5 - -| width | n | domain | heap_size | Mode | Top-k recall | Pearson (top-k) | MAPE (%) | RMSE (%) | -|-------|--------|--------|-----------|----------------|--------------|-----------------|----------|----------| -| 2048 | 200000 | 2000 | 20 | Legacy | 0.60 | 0.9964 | 0.045 | 0.101 | -| 2048 | 200000 | 2000 | 20 | sketchlib-rust | 0.60 | 0.9982 | 0.021 | 0.067 | -| 2048 | 200000 | 2000 | 50 | Legacy | 0.40 | 0.9999983 | 5.60 | 16.49 | -| 2048 | 200000 | 2000 | 50 | sketchlib-rust | 0.40 | 0.9999990 | 3.90 | 12.95 | - ---- - -### KllSketch (quantiles, absolute rank error) - -For each quantile \(q\), we compute the sketch estimate `est_value`, then: -`abs_rank_error = |rank_fraction(exact_sorted_values, est_value) - q|`. - -#### k=20 - -| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 | -|-----------|----------------|---------|---------|---------| -| 200000 | Legacy | 0.0104 | 0.0145 | 0.0028 | -| 200000 | sketchlib-rust | 0.0275 | 0.0470 | 0.0061 | -| 50000 | Legacy | 0.0131 | 0.0091 | 0.0054 | -| 50000 | sketchlib-rust | 0.0110 | 0.0116 | 0.0031 | - -#### k=50 - -| n_updates | Mode | q=0.5 | q=0.9 | q=0.99 | -|-----------|----------------|---------|---------|---------| -### CountMinSketch (accuracy vs exact counts) - -#### depth=3 - -| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | -|-------|--------|--------|----------------|----------------|----------|----------| -| 1024 | 100000 | 1000 | Legacy | 0.9998451189 | 24.48 | 52.76 | -| 1024 | 100000 | 1000 | sketchlib-rust | 0.9998387103 | 24.36 | 54.11 | - -#### depth=5 - -| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | -|-------|--------|--------|----------------|----------------|----------|----------| -| 2048 | 200000 | 2000 | Legacy | 0.9999733814 | 8.75 | 29.94 | -| 2048 | 200000 | 2000 | sketchlib-rust | 0.9999744627 | 8.37 | 28.84 | -| 2048 | 50000 | 500 | Legacy | 1.0000000000 | 0.00 | 0.00 | -| 2048 | 50000 | 500 | sketchlib-rust | 1.0000000000 | 0.00 | 0.00 | - -#### depth=7 - -| width | n | domain | Mode | Pearson corr | MAPE (%) | RMSE (%) | -|-------|--------|--------|----------------|----------------|----------|----------| -| 4096 | 200000 | 2000 | Legacy | 0.9999993694 | 0.20 | 3.69 | -| 4096 | 200000 | 2000 | sketchlib-rust | 0.9999993499 | 0.21 | 4.27 | - ---- From 81126b1b98a421c64cea9c0188c7638209f392dd Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Fri, 20 Mar 2026 15:13:52 -0400 Subject: [PATCH 5/8] UDFs: use same impl mode as QueryEngine (sketch_cms_impl, etc.) --- asap-summary-ingest/run_arroyosketch.py | 41 ++++++++++++++++++- .../experiment_utils/services/arroyo.py | 15 ++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/asap-summary-ingest/run_arroyosketch.py b/asap-summary-ingest/run_arroyosketch.py index 3769723..dabf91c 100644 --- a/asap-summary-ingest/run_arroyosketch.py +++ b/asap-summary-ingest/run_arroyosketch.py @@ -380,6 +380,16 @@ def delete_connection_table(args, table_name): ) +# Map UDF names to the sketch impl CLI arg. UDFs use same impl mode as QueryEngine. +_UDF_IMPL_MODE_ARG = { + "countminsketch_count": "sketch_cms_impl", + "countminsketch_sum": "sketch_cms_impl", + "countminsketchwithheap_topk": "sketch_cmwh_impl", + "datasketcheskll_": "sketch_kll_impl", + "hydrakll_": "sketch_kll_impl", +} + + def create_pipeline( args: argparse.Namespace, sql_queries: List[str], @@ -444,7 +454,13 @@ def create_pipeline( regular_path = os.path.join(udf_dir, f"{udf_name}.rs") # Get parameters for this UDF - params = agg_function_params.get(udf_name, {}) + params = dict(agg_function_params.get(udf_name, {})) + + # Inject impl_mode from CLI so UDFs use same backend as QueryEngine + impl_arg = _UDF_IMPL_MODE_ARG.get(udf_name) + if impl_arg: + impl_val = getattr(args, impl_arg, "legacy") + params["impl_mode"] = impl_val.capitalize() # "Legacy" or "Sketchlib" if len(params) > 0 and not os.path.exists(template_path): raise ValueError( @@ -1112,6 +1128,29 @@ def main(args): help="Query language for schema interpretation (default: promql)", ) + # Sketch implementation mode - must match QueryEngine (--sketch-cms-impl etc.) + parser.add_argument( + "--sketch_cms_impl", + type=str, + choices=["legacy", "sketchlib"], + default="legacy", + help="Count-Min Sketch backend (legacy | sketchlib). Must match QueryEngine.", + ) + parser.add_argument( + "--sketch_kll_impl", + type=str, + choices=["legacy", "sketchlib"], + default="legacy", + help="KLL Sketch backend (legacy | sketchlib). Must match QueryEngine.", + ) + parser.add_argument( + "--sketch_cmwh_impl", + type=str, + choices=["legacy", "sketchlib"], + default="legacy", + help="Count-Min-With-Heap backend (legacy | sketchlib). Must match QueryEngine.", + ) + args = parser.parse_args() check_args(args) main(args) diff --git a/asap-tools/experiments/experiment_utils/services/arroyo.py b/asap-tools/experiments/experiment_utils/services/arroyo.py index e329a80..db9e9fc 100644 --- a/asap-tools/experiments/experiment_utils/services/arroyo.py +++ b/asap-tools/experiments/experiment_utils/services/arroyo.py @@ -105,6 +105,9 @@ def run_arroyosketch( use_kafka_ingest: bool = False, enable_optimized_remote_write: bool = False, avoid_long_ssh: bool = False, + sketch_cms_impl: str = "legacy", + sketch_kll_impl: str = "legacy", + sketch_cmwh_impl: str = "legacy", ) -> str: """ Run ArroyoSketch pipeline. @@ -122,6 +125,9 @@ def run_arroyosketch( parallelism: Pipeline parallelism enable_optimized_remote_write: If True, use optimized Prometheus remote_write source (10-20x faster) avoid_long_ssh: If True, run command in background to avoid long SSH connections + sketch_cms_impl: Count-Min Sketch backend (legacy|sketchlib). Must match QueryEngine. + sketch_kll_impl: KLL Sketch backend (legacy|sketchlib). Must match QueryEngine. + sketch_cmwh_impl: Count-Min-With-Heap backend (legacy|sketchlib). Must match QueryEngine. Returns: Pipeline ID @@ -134,7 +140,7 @@ def run_arroyosketch( ) if use_kafka_ingest: - cmd = "python run_arroyosketch.py --source_type kafka --kafka_input_format {} --output_format {} --pipeline_name {} --config_file_path {}/streaming_config.yaml --input_kafka_topic {} --output_kafka_topic {} --output_dir {}".format( + cmd = "python run_arroyosketch.py --source_type kafka --kafka_input_format {} --output_format {} --pipeline_name {} --config_file_path {}/streaming_config.yaml --input_kafka_topic {} --output_kafka_topic {} --output_dir {} --sketch_cms_impl {} --sketch_kll_impl {} --sketch_cmwh_impl {}".format( flink_input_format, flink_output_format, experiment_name, @@ -142,6 +148,9 @@ def run_arroyosketch( constants.FLINK_INPUT_TOPIC, constants.FLINK_OUTPUT_TOPIC, arroyosketch_output_dir, + sketch_cms_impl, + sketch_kll_impl, + sketch_cmwh_impl, ) else: # Build base command for Prometheus remote write @@ -159,6 +168,10 @@ def run_arroyosketch( # Add optimized source flag if enabled if enable_optimized_remote_write: cmd += " --prometheus_remote_write_source optimized" + # Sketch impl mode - must match QueryEngine + cmd += " --sketch_cms_impl {} --sketch_kll_impl {} --sketch_cmwh_impl {}".format( + sketch_cms_impl, sketch_kll_impl, sketch_cmwh_impl + ) cmd_dir = os.path.join( constants.CLOUDLAB_HOME_DIR, "code", "asap-summary-ingest" ) From 3bee5844549d34fccc2d8029e94db0e313f916be Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Fri, 20 Mar 2026 15:29:12 -0400 Subject: [PATCH 6/8] Simplify UDF impl mode, default CMS to sketchlib --- asap-summary-ingest/run_arroyosketch.py | 46 +++++++------------ .../experiment_utils/services/arroyo.py | 2 +- 2 files changed, 17 insertions(+), 31 deletions(-) diff --git a/asap-summary-ingest/run_arroyosketch.py b/asap-summary-ingest/run_arroyosketch.py index dabf91c..b0c07a5 100644 --- a/asap-summary-ingest/run_arroyosketch.py +++ b/asap-summary-ingest/run_arroyosketch.py @@ -380,16 +380,6 @@ def delete_connection_table(args, table_name): ) -# Map UDF names to the sketch impl CLI arg. UDFs use same impl mode as QueryEngine. -_UDF_IMPL_MODE_ARG = { - "countminsketch_count": "sketch_cms_impl", - "countminsketch_sum": "sketch_cms_impl", - "countminsketchwithheap_topk": "sketch_cmwh_impl", - "datasketcheskll_": "sketch_kll_impl", - "hydrakll_": "sketch_kll_impl", -} - - def create_pipeline( args: argparse.Namespace, sql_queries: List[str], @@ -453,15 +443,9 @@ def create_pipeline( template_path = os.path.join(udf_dir, f"{udf_name}.rs.j2") regular_path = os.path.join(udf_dir, f"{udf_name}.rs") - # Get parameters for this UDF + # Get parameters for this UDF (impl_mode injected in main() for sketch UDFs) params = dict(agg_function_params.get(udf_name, {})) - # Inject impl_mode from CLI so UDFs use same backend as QueryEngine - impl_arg = _UDF_IMPL_MODE_ARG.get(udf_name) - if impl_arg: - impl_val = getattr(args, impl_arg, "legacy") - params["impl_mode"] = impl_val.capitalize() # "Legacy" or "Sketchlib" - if len(params) > 0 and not os.path.exists(template_path): raise ValueError( f"UDF {udf_name} requires parameters {params} but no template found at {template_path}" @@ -480,18 +464,6 @@ def create_pipeline( template_source, udf_template.environment ) - # Per-UDF impl mode defaults (aligned with sketch-core config) - UDF_IMPL_DEFAULTS = { - "countminsketch_count": "Sketchlib", - "countminsketch_sum": "Sketchlib", - "countminsketchwithheap_topk": "Sketchlib", - "datasketcheskll_": "Sketchlib", - "hydrakll_": "Sketchlib", - } - params.setdefault( - "impl_mode", UDF_IMPL_DEFAULTS.get(udf_name, "Sketchlib") - ) - # Handle config key mapping (K -> k for KLL) if "K" in params and "k" in required_params: params["k"] = params["K"] @@ -974,6 +946,20 @@ def main(args): filter_metric_name, ) + parameters = dict(parameters) + if agg_function in ("countminsketch_count", "countminsketch_sum"): + parameters["impl_mode"] = getattr( + args, "sketch_cms_impl", "legacy" + ).capitalize() + elif agg_function == "countminsketchwithheap_topk": + parameters["impl_mode"] = getattr( + args, "sketch_cmwh_impl", "legacy" + ).capitalize() + elif agg_function in ("datasketcheskll_", "hydrakll_"): + parameters["impl_mode"] = getattr( + args, "sketch_kll_impl", "legacy" + ).capitalize() + sql_queries.append(sql_query) # if not is_labels_accumulator: agg_functions_with_params.append((agg_function, parameters)) @@ -1133,7 +1119,7 @@ def main(args): "--sketch_cms_impl", type=str, choices=["legacy", "sketchlib"], - default="legacy", + default="sketchlib", help="Count-Min Sketch backend (legacy | sketchlib). Must match QueryEngine.", ) parser.add_argument( diff --git a/asap-tools/experiments/experiment_utils/services/arroyo.py b/asap-tools/experiments/experiment_utils/services/arroyo.py index db9e9fc..aeee8bc 100644 --- a/asap-tools/experiments/experiment_utils/services/arroyo.py +++ b/asap-tools/experiments/experiment_utils/services/arroyo.py @@ -105,7 +105,7 @@ def run_arroyosketch( use_kafka_ingest: bool = False, enable_optimized_remote_write: bool = False, avoid_long_ssh: bool = False, - sketch_cms_impl: str = "legacy", + sketch_cms_impl: str = "sketchlib", sketch_kll_impl: str = "legacy", sketch_cmwh_impl: str = "legacy", ) -> str: From ba2ba37e350866fc904c6d781475b1f4dc22550a Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Fri, 20 Mar 2026 15:32:34 -0400 Subject: [PATCH 7/8] Fix black formatting in arroyo.py --- asap-tools/experiments/experiment_utils/services/arroyo.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/asap-tools/experiments/experiment_utils/services/arroyo.py b/asap-tools/experiments/experiment_utils/services/arroyo.py index aeee8bc..de58dc4 100644 --- a/asap-tools/experiments/experiment_utils/services/arroyo.py +++ b/asap-tools/experiments/experiment_utils/services/arroyo.py @@ -169,8 +169,10 @@ def run_arroyosketch( if enable_optimized_remote_write: cmd += " --prometheus_remote_write_source optimized" # Sketch impl mode - must match QueryEngine - cmd += " --sketch_cms_impl {} --sketch_kll_impl {} --sketch_cmwh_impl {}".format( - sketch_cms_impl, sketch_kll_impl, sketch_cmwh_impl + cmd += ( + " --sketch_cms_impl {} --sketch_kll_impl {} --sketch_cmwh_impl {}".format( + sketch_cms_impl, sketch_kll_impl, sketch_cmwh_impl + ) ) cmd_dir = os.path.join( constants.CLOUDLAB_HOME_DIR, "code", "asap-summary-ingest" From 5090ab9561521070c94a736ee632b9540682920b Mon Sep 17 00:00:00 2001 From: Gnanesh Date: Mon, 23 Mar 2026 11:31:46 -0400 Subject: [PATCH 8/8] Set deafult mode to sketchlib --- asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 | 3 ++- asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 index 1b78553..6da33fb 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 @@ -23,7 +23,8 @@ enum ImplMode { Sketchlib, } -const IMPL_MODE: ImplMode = ImplMode::{{ impl_mode | default("Sketchlib") }}; +{% set _impl_mode = impl_mode | default("Sketchlib") %} +const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %}; fn use_sketchlib_for_cms() -> bool { matches!(IMPL_MODE, ImplMode::Sketchlib) diff --git a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 index 759963c..0c83694 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 @@ -23,7 +23,8 @@ enum ImplMode { Sketchlib, } -const IMPL_MODE: ImplMode = ImplMode::{{ impl_mode | default("Sketchlib") }}; +{% set _impl_mode = impl_mode | default("Sketchlib") %} +const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %}; fn use_sketchlib_for_cms() -> bool { matches!(IMPL_MODE, ImplMode::Sketchlib)