diff --git a/asap-summary-ingest/run_arroyosketch.py b/asap-summary-ingest/run_arroyosketch.py index 8fad3912..d15b0cdc 100644 --- a/asap-summary-ingest/run_arroyosketch.py +++ b/asap-summary-ingest/run_arroyosketch.py @@ -443,7 +443,6 @@ def create_pipeline( template_path = os.path.join(udf_dir, f"{udf_name}.rs.j2") regular_path = os.path.join(udf_dir, f"{udf_name}.rs") - # Get parameters for this UDF (impl_mode injected in main() for sketch UDFs) params = dict(agg_function_params.get(udf_name, {})) if len(params) > 0 and not os.path.exists(template_path): @@ -943,20 +942,6 @@ def main(args): filter_metric_name, ) - parameters = dict(parameters) - if agg_function in ("countminsketch_count", "countminsketch_sum"): - parameters["impl_mode"] = getattr( - args, "sketch_cms_impl", "legacy" - ).capitalize() - elif agg_function == "countminsketchwithheap_topk": - parameters["impl_mode"] = getattr( - args, "sketch_cmwh_impl", "legacy" - ).capitalize() - elif agg_function in ("datasketcheskll_", "hydrakll_"): - parameters["impl_mode"] = getattr( - args, "sketch_kll_impl", "sketchlib" - ).capitalize() - sql_queries.append(sql_query) # if not is_labels_accumulator: agg_functions_with_params.append((agg_function, parameters)) @@ -1111,29 +1096,6 @@ def main(args): help="Query language for schema interpretation (default: promql)", ) - # Sketch implementation mode - must match QueryEngine (--sketch-cms-impl etc.) - parser.add_argument( - "--sketch_cms_impl", - type=str, - choices=["legacy", "sketchlib"], - default="sketchlib", - help="Count-Min Sketch backend (legacy | sketchlib). Must match QueryEngine.", - ) - parser.add_argument( - "--sketch_kll_impl", - type=str, - choices=["legacy", "sketchlib"], - default="sketchlib", - help="KLL Sketch backend (legacy | sketchlib). Must match QueryEngine.", - ) - parser.add_argument( - "--sketch_cmwh_impl", - type=str, - choices=["legacy", "sketchlib"], - default="sketchlib", - help="Count-Min-With-Heap backend (legacy | sketchlib). Must match QueryEngine.", - ) - args = parser.parse_args() check_args(args) main(args) diff --git a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 index f49acd1a..91135553 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2 @@ -2,34 +2,18 @@ [dependencies] rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } -twox-hash = "2.1.0" asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } */ use arroyo_udf_plugin::udf; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; -use twox_hash::XxHash32; use asap_sketchlib::{CountMin as SketchlibCountMin, RegularPath, DataInput, Vector2D}; -// Count-Min Sketch parameters const DEPTH: usize = {{ depth }}; // Number of hash functions const WIDTH: usize = {{ width }}; // Number of buckets per hash function -// Implementation mode for Count-Min Sketch. Set at compile time; no env vars. -enum ImplMode { - Legacy, - Sketchlib, -} - -{% set _impl_mode = impl_mode | default("Sketchlib") %} -const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %}; - -fn use_sketchlib_for_cms() -> bool { - matches!(IMPL_MODE, ImplMode::Sketchlib) -} - type SketchlibCms = SketchlibCountMin, RegularPath>; #[derive(Serialize, Deserialize, Clone)] @@ -39,75 +23,37 @@ struct CountMinSketch { col_num: usize, } -impl CountMinSketch { - fn new() -> Self { - CountMinSketch { - sketch: vec![vec![0.0; WIDTH]; DEPTH], - row_num: DEPTH, - col_num: WIDTH, - } - } - - // Legacy path: update the sketch with a key-value pair using twox-hash. - fn update(&mut self, key: &str, value: f64) { - for i in 0..self.row_num { - // already UTF-8 - let hash = XxHash32::oneshot(i as u32, key.as_bytes()); - let bucket = (hash as usize) % self.col_num; - self.sketch[i][bucket] += value; - } - } -} - #[udf] fn countminsketch_count(keys: Vec<&str>, values: Vec) -> Option> { - if use_sketchlib_for_cms() { - // asap_sketchlib backed implementation: integer counters + internal hashing. - let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH); + let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH); - for &key in keys.iter() { - let input = DataInput::String(key.to_owned()); - inner.insert_many(&input, 1); - } + for &key in keys.iter() { + let input = DataInput::String(key.to_owned()); + inner.insert_many(&input, 1); + } - // Convert sketchlib storage to legacy matrix wire format. - let storage: &Vector2D = inner.as_storage(); - let rows = storage.rows(); - let cols = storage.cols(); - let mut sketch = vec![vec![0.0; cols]; rows]; + let storage: &Vector2D = inner.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut sketch = vec![vec![0.0; cols]; rows]; - for r in 0..rows { - for c in 0..cols { - if let Some(v) = storage.get(r, c) { - sketch[r][c] = *v as f64; - } + for r in 0..rows { + for c in 0..cols { + if let Some(v) = storage.get(r, c) { + sketch[r][c] = *v as f64; } } - - let countminsketch = CountMinSketch { - sketch, - row_num: rows, - col_num: cols, - }; - - let mut buf = Vec::new(); - countminsketch - .serialize(&mut Serializer::new(&mut buf)) - .ok()?; - Some(buf) - } else { - // Legacy twox-hash backed implementation (unchanged). - let mut countminsketch = CountMinSketch::new(); - - // Iterate through the keys and update the sketch for each entry - for &key in keys.iter() { - countminsketch.update(key, 1.0); - } - - let mut buf = Vec::new(); - countminsketch - .serialize(&mut Serializer::new(&mut buf)) - .ok()?; - Some(buf) } + + let countminsketch = CountMinSketch { + sketch, + row_num: rows, + col_num: cols, + }; + + let mut buf = Vec::new(); + countminsketch + .serialize(&mut Serializer::new(&mut buf)) + .ok()?; + Some(buf) } diff --git a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 index 3ce62c57..b343bc44 100644 --- a/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2 @@ -2,34 +2,18 @@ [dependencies] rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } -twox-hash = "2.1.0" asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } */ use arroyo_udf_plugin::udf; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; -use twox_hash::XxHash32; use asap_sketchlib::{CountMin as SketchlibCountMin, RegularPath, DataInput, Vector2D}; -// Count-Min Sketch parameters const DEPTH: usize = {{ depth }}; // Number of hash functions const WIDTH: usize = {{ width }}; // Number of buckets per hash function -// Implementation mode for Count-Min Sketch. Set at compile time; no env vars. -enum ImplMode { - Legacy, - Sketchlib, -} - -{% set _impl_mode = impl_mode | default("Sketchlib") %} -const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %}; - -fn use_sketchlib_for_cms() -> bool { - matches!(IMPL_MODE, ImplMode::Sketchlib) -} - type SketchlibCms = SketchlibCountMin, RegularPath>; #[derive(Serialize, Deserialize, Clone)] @@ -39,86 +23,46 @@ struct CountMinSketch { col_num: usize, } -impl CountMinSketch { - fn new() -> Self { - CountMinSketch { - sketch: vec![vec![0.0; WIDTH]; DEPTH], - row_num: DEPTH, - col_num: WIDTH, - } - } - - // Legacy path: update the sketch with a key-value pair using twox-hash. - fn update(&mut self, key: &str, value: f64) { - for i in 0..self.row_num { - // already UTF-8 - let hash = XxHash32::oneshot(i as u32, key.as_bytes()); - let bucket = (hash as usize) % self.col_num; - self.sketch[i][bucket] += value; - } - } -} - #[udf] fn countminsketch_sum(keys: Vec<&str>, values: Vec) -> Option> { - // Check that keys and values have equal length if keys.len() != values.len() { return None; } - if use_sketchlib_for_cms() { - // asap_sketchlib backed implementation: integer counters + internal hashing. - let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH); + let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH); - for (i, &key) in keys.iter().enumerate() { - let value = values[i]; - // Values arrive as f64; Count-Min counters are integers. - let many = value.round() as i64; - if many <= 0 { - continue; - } - let input = DataInput::String(key.to_owned()); - inner.insert_many(&input, many); + for (i, &key) in keys.iter().enumerate() { + let value = values[i]; + let many = value.round() as i64; + if many <= 0 { + continue; } + let input = DataInput::String(key.to_owned()); + inner.insert_many(&input, many); + } - // Convert sketchlib storage to legacy matrix wire format. - let storage: &Vector2D = inner.as_storage(); - let rows = storage.rows(); - let cols = storage.cols(); - let mut sketch = vec![vec![0.0; cols]; rows]; + let storage: &Vector2D = inner.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut sketch = vec![vec![0.0; cols]; rows]; - for r in 0..rows { - for c in 0..cols { - if let Some(v) = storage.get(r, c) { - sketch[r][c] = *v as f64; - } + for r in 0..rows { + for c in 0..cols { + if let Some(v) = storage.get(r, c) { + sketch[r][c] = *v as f64; } } - - let countminsketch = CountMinSketch { - sketch, - row_num: rows, - col_num: cols, - }; - - let mut buf = Vec::new(); - countminsketch - .serialize(&mut Serializer::new(&mut buf)) - .ok()?; - Some(buf) - } else { - // Legacy twox-hash backed implementation (unchanged). - let mut countminsketch = CountMinSketch::new(); - - // Iterate through the keys and values and update the sketch for each entry - for (i, &key) in keys.iter().enumerate() { - countminsketch.update(key, values[i]); - } - - let mut buf = Vec::new(); - countminsketch - .serialize(&mut Serializer::new(&mut buf)) - .ok()?; - Some(buf) } + + let countminsketch = CountMinSketch { + sketch, + row_num: rows, + col_num: cols, + }; + + let mut buf = Vec::new(); + countminsketch + .serialize(&mut Serializer::new(&mut buf)) + .ok()?; + Some(buf) } diff --git a/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 b/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 index 1518fe0f..694d1c10 100644 --- a/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 +++ b/asap-summary-ingest/templates/udfs/countminsketchwithheap_topk.rs.j2 @@ -2,7 +2,6 @@ [dependencies] rmp-serde = "1.1" serde = { version = "1.0", features = ["derive"] } -twox-hash = "2.1.0" asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } */ @@ -12,27 +11,13 @@ use std::collections::BinaryHeap; use arroyo_udf_plugin::udf; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; -use twox_hash::XxHash32; use asap_sketchlib::{CountMin as SketchlibCountMin, RegularPath, DataInput, Vector2D}; -// Count-Min Sketch with Heap parameters const DEPTH: usize = {{ depth }}; // Number of hash functions const WIDTH: usize = {{ width }}; // Number of buckets per hash function const HEAP_SIZE: usize = {{ heapsize }}; // Maximum number of top-k items to track -// Implementation mode for Count-Min Sketch with Heap. Set at compile time; no env vars. -enum ImplMode { - Legacy, - Sketchlib, -} - -const IMPL_MODE: ImplMode = ImplMode::Sketchlib; - -fn use_sketchlib_for_cmwh() -> bool { - matches!(IMPL_MODE, ImplMode::Sketchlib) -} - type SketchlibCms = SketchlibCountMin, RegularPath>; #[derive(Serialize, Deserialize, Clone)] @@ -42,43 +27,6 @@ struct CountMinSketch { col_num: usize, } -impl CountMinSketch { - fn new() -> Self { - CountMinSketch { - sketch: vec![vec![0.0; WIDTH]; DEPTH], - row_num: DEPTH, - col_num: WIDTH, - } - } - - // Update the sketch with a key-value pair - fn update(&mut self, key: &str, value: f64) { - for i in 0..self.row_num { - // already UTF-8 - let hash = XxHash32::oneshot(i as u32, key.as_bytes()); - let bucket = (hash as usize) % self.col_num; - self.sketch[i][bucket] += value; - } - } - - // Update the sketch and return the estimated frequency in one pass - fn update_with_query(&mut self, key: &str, value: f64) -> f64 { - let mut min_estimate = f64::MAX; - for i in 0..self.row_num { - // already UTF-8 - let hash = XxHash32::oneshot(i as u32, key.as_bytes()); - let bucket = (hash as usize) % self.col_num; - self.sketch[i][bucket] += value; - // Track minimum while updating - let estimate = self.sketch[i][bucket]; - if estimate < min_estimate { - min_estimate = estimate; - } - } - min_estimate - } -} - // HeapItem: equality and ordering based on value only #[derive(Serialize, Deserialize, Clone)] struct HeapItem { @@ -112,11 +60,8 @@ impl PartialOrd for HeapItem { } struct CountMinSketchWithHeap { - // Legacy wire-format matrix representation. - sketch: CountMinSketch, - // Optional asap_sketchlib Count-Min used when ARROYO_SKETCH_CMWH_IMPL selects sketchlib mode. - sketchlib: Option, - topk_heap: BinaryHeap, // Maintain as heap during processing + sketchlib: SketchlibCms, + topk_heap: BinaryHeap, heap_size: usize, } @@ -131,14 +76,8 @@ struct CountMinSketchWithHeapSerialized { impl CountMinSketchWithHeap { fn new() -> Self { - let use_sketchlib = use_sketchlib_for_cmwh(); CountMinSketchWithHeap { - sketch: CountMinSketch::new(), - sketchlib: if use_sketchlib { - Some(SketchlibCms::with_dimensions(DEPTH, WIDTH)) - } else { - None - }, + sketchlib: SketchlibCms::with_dimensions(DEPTH, WIDTH), topk_heap: BinaryHeap::new(), heap_size: HEAP_SIZE, } @@ -146,25 +85,13 @@ impl CountMinSketchWithHeap { // Update the sketch and maintain the top-k heap fn update_with_topk(&mut self, key: &str, value: f64) { - // Compute estimated frequency using either legacy or sketchlib implementation. - let estimated_freq = if use_sketchlib_for_cmwh() { - let inner = self - .sketchlib - .as_mut() - .expect("sketchlib mode enabled but sketchlib state is missing"); - - // Values arrive as f64; Count-Min counters are integers. - let many = value.round() as i64; - if many <= 0 { - return; - } - let input = DataInput::String(key.to_owned()); - inner.insert_many(&input, many); - inner.estimate(&input) as f64 - } else { - // Legacy Count-Min update + query in one pass. - self.sketch.update_with_query(key, value) - }; + let many = value.round() as i64; + if many <= 0 { + return; + } + let input = DataInput::String(key.to_owned()); + self.sketchlib.insert_many(&input, many); + let estimated_freq = self.sketchlib.estimate(&input) as f64; // Check if the key already exists in the heap // TODO: This takes O(k) time, can we do better? @@ -172,9 +99,9 @@ impl CountMinSketchWithHeap { // Or can we optimize this with a HashMap? let existing_item = self.topk_heap.iter().find(|item| item.key == key).cloned(); - if let Some(existing) = existing_item { + if let Some(_existing) = existing_item { // Remove the old entry and add updated one - self.topk_heap.retain(|item| item.key != key); // retain others = remove this one + self.topk_heap.retain(|item| item.key != key); self.topk_heap.push(HeapItem { key: key.to_string(), value: estimated_freq, @@ -182,17 +109,15 @@ impl CountMinSketchWithHeap { } else { // New key: add to heap if self.topk_heap.len() < self.heap_size { - // Heap not full, just add self.topk_heap.push(HeapItem { key: key.to_string(), value: estimated_freq, }); } else { // Heap is full, check if this item should replace the minimum - // Peeking is cheap. No worries. if let Some(min_item) = self.topk_heap.peek() { if estimated_freq > min_item.value { - self.topk_heap.pop(); // Remove minimum + self.topk_heap.pop(); self.topk_heap.push(HeapItem { key: key.to_string(), value: estimated_freq, @@ -204,32 +129,26 @@ impl CountMinSketchWithHeap { } // Convert to serializable format - fn to_serializable(mut self) -> CountMinSketchWithHeapSerialized { - // In sketchlib mode, derive the matrix from the inner Count-Min sketch so that - // the wire format matches QueryEngineRust expectations. - if let Some(inner) = &self.sketchlib { - let storage: &Vector2D = inner.as_storage(); - let rows = storage.rows(); - let cols = storage.cols(); - let mut sketch = vec![vec![0.0; cols]; rows]; - - for r in 0..rows { - for c in 0..cols { - if let Some(v) = storage.get(r, c) { - sketch[r][c] = *v as f64; - } + fn to_serializable(self) -> CountMinSketchWithHeapSerialized { + let storage: &Vector2D = self.sketchlib.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut sketch_matrix = vec![vec![0.0; cols]; rows]; + + for r in 0..rows { + for c in 0..cols { + if let Some(v) = storage.get(r, c) { + sketch_matrix[r][c] = *v as f64; } } - - self.sketch = CountMinSketch { - sketch, - row_num: rows, - col_num: cols, - }; } CountMinSketchWithHeapSerialized { - sketch: self.sketch, + sketch: CountMinSketch { + sketch: sketch_matrix, + row_num: rows, + col_num: cols, + }, topk_heap: self.topk_heap.into_iter().collect(), heap_size: self.heap_size, } @@ -238,20 +157,16 @@ impl CountMinSketchWithHeap { #[udf] fn countminsketchwithheap_topk(keys: Vec<&str>, values: Vec) -> Option> { - // Check that keys and values have equal length if keys.len() != values.len() { return None; } - // Create a new Count-Min Sketch with Heap let mut cms_with_heap = CountMinSketchWithHeap::new(); - // Iterate through the keys and values and update the sketch and heap for (i, &key) in keys.iter().enumerate() { cms_with_heap.update_with_topk(key, values[i]); } - // Convert to serializable format (heap to vec conversion happens only once here) let serializable = cms_with_heap.to_serializable(); let mut buf = Vec::new(); diff --git a/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 b/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 index d6d26d18..40f1933a 100644 --- a/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 +++ b/asap-summary-ingest/templates/udfs/datasketcheskll_.rs.j2 @@ -1,6 +1,5 @@ /* [dependencies] -dsrs = { git = "https://github.com/ProjectASAP/datasketches-rs", rev = "d748ec75c80fff21f7b24897244dd1c895df2e9a" } asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } arroyo-udf-plugin = "0.1" rmp-serde = "1.1" @@ -8,24 +7,11 @@ serde = { version = "1.0", features = ["derive"] } */ use arroyo_udf_plugin::udf; -use dsrs::KllDoubleSketch; use serde::{Deserialize, Serialize}; use asap_sketchlib::KLL; const DEFAULT_K: u16 = {{ k }}; -enum ImplMode { - Legacy, - Sketchlib, -} - -{% set _impl_mode = impl_mode | default("Sketchlib") %} -const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %}; - -fn use_sketchlib_for_kll() -> bool { - matches!(IMPL_MODE, ImplMode::Sketchlib) -} - #[derive(Serialize, Deserialize)] struct KllSketchData { k: u16, @@ -34,19 +20,11 @@ struct KllSketchData { #[udf] fn datasketcheskll_(values: Vec) -> Option> { - let sketch_bytes = if use_sketchlib_for_kll() { - let mut sketch = KLL::init_kll(DEFAULT_K as i32); - for &value in &values { - sketch.update(&value); - } - sketch.serialize_to_bytes().ok()? - } else { - let mut sketch = KllDoubleSketch::with_k(DEFAULT_K); - for &value in &values { - sketch.update(value); - } - sketch.serialize().as_ref().to_vec() - }; + let mut sketch = KLL::init_kll(DEFAULT_K as i32); + for &value in &values { + sketch.update(&value); + } + let sketch_bytes = sketch.serialize_to_bytes().ok()?; let serialized = KllSketchData { k: DEFAULT_K, sketch_bytes, diff --git a/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 b/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 index 1edfbd16..0888804c 100644 --- a/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 +++ b/asap-summary-ingest/templates/udfs/hydrakll_.rs.j2 @@ -1,6 +1,5 @@ /* [dependencies] -dsrs = { git = "https://github.com/ProjectASAP/datasketches-rs", rev = "d748ec75c80fff21f7b24897244dd1c895df2e9a" } asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } arroyo-udf-plugin = "0.1" rmp-serde = "1.1" @@ -9,24 +8,11 @@ xxhash-rust = { version = "0.8", features = ["xxh32"] } */ use arroyo_udf_plugin::udf; -use dsrs::KllDoubleSketch; use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; use asap_sketchlib::KLL; use xxhash_rust::xxh32::xxh32; -enum ImplMode { - Legacy, - Sketchlib, -} - -{% set _impl_mode = impl_mode | default("Sketchlib") %} -const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %}; - -fn use_sketchlib_for_kll() -> bool { - matches!(IMPL_MODE, ImplMode::Sketchlib) -} - const ROW_NUM: usize = {{ row_num }}; const COL_NUM: usize = {{ col_num }}; const DEFAULT_K: u16 = {{ k }}; @@ -46,76 +32,40 @@ struct HydraKllSketchData { #[udf] fn hydrakll_(keys: Vec<&str>, values: Vec) -> Option> { - let sketch_data: Vec> = if use_sketchlib_for_kll() { - let mut sketches: Vec> = (0..ROW_NUM) - .map(|_| { - (0..COL_NUM) - .map(|_| KLL::init_kll(DEFAULT_K as i32)) - .collect() - }) - .collect(); + let mut sketches: Vec> = (0..ROW_NUM) + .map(|_| { + (0..COL_NUM) + .map(|_| KLL::init_kll(DEFAULT_K as i32)) + .collect() + }) + .collect(); - for (i, &key) in keys.iter().enumerate() { - if i >= values.len() { - break; - } - let key_bytes = key.as_bytes(); - for row in 0..ROW_NUM { - let hash_value = xxh32(key_bytes, row as u32); - let col_index = (hash_value as usize) % COL_NUM; - sketches[row][col_index].update(&values[i]); - } + for (i, &key) in keys.iter().enumerate() { + if i >= values.len() { + break; } - - sketches - .iter() - .map(|row| { - row.iter() - .map(|sketch| { - let sketch_bytes = sketch.serialize_to_bytes().ok()?; - Some(KllSketchData { - k: DEFAULT_K, - sketch_bytes, - }) - }) - .collect::>>() - }) - .collect::>>()? - } else { - let mut sketches: Vec> = (0..ROW_NUM) - .map(|_| { - (0..COL_NUM) - .map(|_| KllDoubleSketch::with_k(DEFAULT_K)) - .collect() - }) - .collect(); - - for (i, &key) in keys.iter().enumerate() { - if i >= values.len() { - break; - } - let key_bytes = key.as_bytes(); - for row in 0..ROW_NUM { - let hash_value = xxh32(key_bytes, row as u32); - let col_index = (hash_value as usize) % COL_NUM; - sketches[row][col_index].update(values[i]); - } + let key_bytes = key.as_bytes(); + for row in 0..ROW_NUM { + let hash_value = xxh32(key_bytes, row as u32); + let col_index = (hash_value as usize) % COL_NUM; + sketches[row][col_index].update(&values[i]); } + } - sketches - .iter() - .map(|row| { - row.iter() - .map(|sketch| { - Some(KllSketchData { - k: DEFAULT_K, - sketch_bytes: sketch.serialize().as_ref().to_vec(), - }) + let sketch_data: Vec> = sketches + .iter() + .map(|row| { + row.iter() + .map(|sketch| { + let sketch_bytes = sketch.serialize_to_bytes().ok()?; + Some(KllSketchData { + k: DEFAULT_K, + sketch_bytes, }) - .collect::>>() - }) - .collect::>>()? - }; + }) + .collect::>>() + }) + .collect::>>()?; let hydra_data = HydraKllSketchData { row_num: ROW_NUM, diff --git a/asap-summary-ingest/tests/test_integration.py b/asap-summary-ingest/tests/test_integration.py index c4e5ee04..834133b4 100644 --- a/asap-summary-ingest/tests/test_integration.py +++ b/asap-summary-ingest/tests/test_integration.py @@ -442,34 +442,13 @@ def udf_template_dir(self): "udfs", ) - def test_template_variables_ignore_local_jinja_assignments(self): - """Test that local Jinja variables are not treated as required params.""" - template_source = """ -{% set _impl_mode = impl_mode | default("Sketchlib") %} -{{ _impl_mode }} {{ k }} -""" - template_vars = jinja_utils.get_template_variables(template_source) - - assert template_vars == {"impl_mode", "k"} - - def test_datasketches_kll_template_renders_legacy_mode(self, udf_template_dir): - """Test rendering the single KLL UDF with the legacy backend.""" - template = jinja_utils.load_template(udf_template_dir, "datasketcheskll_.rs.j2") - rendered = template.render(k=200, impl_mode="Legacy") - - assert "const IMPL_MODE: ImplMode = ImplMode::Legacy;" in rendered - assert "KllDoubleSketch::with_k(DEFAULT_K)" in rendered - assert "asap_sketchlib" in rendered - assert "{{" not in rendered - - def test_hydra_kll_template_renders_sketchlib_mode(self, udf_template_dir): - """Test rendering the Hydra KLL UDF with the sketchlib backend.""" + def test_hydra_kll_template_renders(self, udf_template_dir): + """Test rendering the Hydra KLL UDF template.""" template = jinja_utils.load_template(udf_template_dir, "hydrakll_.rs.j2") - rendered = template.render(row_num=3, col_num=128, k=200, impl_mode="Sketchlib") + rendered = template.render(row_num=3, col_num=128, k=200) - assert "const IMPL_MODE: ImplMode = ImplMode::Sketchlib;" in rendered assert "KLL::init_kll(DEFAULT_K as i32)" in rendered - assert "KllDoubleSketch::with_k(DEFAULT_K)" in rendered + assert "asap_sketchlib" in rendered assert "{{" not in rendered diff --git a/asap-tools/experiments/experiment_utils/services/arroyo.py b/asap-tools/experiments/experiment_utils/services/arroyo.py index 80f9b649..e329a80b 100644 --- a/asap-tools/experiments/experiment_utils/services/arroyo.py +++ b/asap-tools/experiments/experiment_utils/services/arroyo.py @@ -105,9 +105,6 @@ def run_arroyosketch( use_kafka_ingest: bool = False, enable_optimized_remote_write: bool = False, avoid_long_ssh: bool = False, - sketch_cms_impl: str = "sketchlib", - sketch_kll_impl: str = "sketchlib", - sketch_cmwh_impl: str = "sketchlib", ) -> str: """ Run ArroyoSketch pipeline. @@ -125,9 +122,6 @@ def run_arroyosketch( parallelism: Pipeline parallelism enable_optimized_remote_write: If True, use optimized Prometheus remote_write source (10-20x faster) avoid_long_ssh: If True, run command in background to avoid long SSH connections - sketch_cms_impl: Count-Min Sketch backend (legacy|sketchlib). Must match QueryEngine. - sketch_kll_impl: KLL Sketch backend (legacy|sketchlib). Must match QueryEngine. - sketch_cmwh_impl: Count-Min-With-Heap backend (legacy|sketchlib). Must match QueryEngine. Returns: Pipeline ID @@ -140,7 +134,7 @@ def run_arroyosketch( ) if use_kafka_ingest: - cmd = "python run_arroyosketch.py --source_type kafka --kafka_input_format {} --output_format {} --pipeline_name {} --config_file_path {}/streaming_config.yaml --input_kafka_topic {} --output_kafka_topic {} --output_dir {} --sketch_cms_impl {} --sketch_kll_impl {} --sketch_cmwh_impl {}".format( + cmd = "python run_arroyosketch.py --source_type kafka --kafka_input_format {} --output_format {} --pipeline_name {} --config_file_path {}/streaming_config.yaml --input_kafka_topic {} --output_kafka_topic {} --output_dir {}".format( flink_input_format, flink_output_format, experiment_name, @@ -148,9 +142,6 @@ def run_arroyosketch( constants.FLINK_INPUT_TOPIC, constants.FLINK_OUTPUT_TOPIC, arroyosketch_output_dir, - sketch_cms_impl, - sketch_kll_impl, - sketch_cmwh_impl, ) else: # Build base command for Prometheus remote write @@ -168,12 +159,6 @@ def run_arroyosketch( # Add optimized source flag if enabled if enable_optimized_remote_write: cmd += " --prometheus_remote_write_source optimized" - # Sketch impl mode - must match QueryEngine - cmd += ( - " --sketch_cms_impl {} --sketch_kll_impl {} --sketch_cmwh_impl {}".format( - sketch_cms_impl, sketch_kll_impl, sketch_cmwh_impl - ) - ) cmd_dir = os.path.join( constants.CLOUDLAB_HOME_DIR, "code", "asap-summary-ingest" )