diff --git a/Cargo.lock b/Cargo.lock index e6f6925..540764e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,7 +412,7 @@ dependencies = [ [[package]] name = "asap_sketchlib" version = "0.1.0" -source = "git+https://github.com/ProjectASAP/asap_sketchlib#81c3436dde44cc587c098d42bf42db77acdb4fa5" +source = "git+https://github.com/ProjectASAP/asap_sketchlib?branch=refactor%2Fmodule-restructure#aea7d055f1906c6ee05f750989b335ed47c98e3f" dependencies = [ "bytes", "prost", @@ -1435,7 +1435,7 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "petgraph 0.6.5", + "petgraph", ] [[package]] @@ -1618,7 +1618,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1675,12 +1675,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "fixedbitset" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" - [[package]] name = "flatbuffers" version = "24.12.23" @@ -2373,7 +2367,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2400,15 +2394,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.18" @@ -2427,7 +2412,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3152,17 +3137,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ - "fixedbitset 0.4.2", - "indexmap 2.14.0", -] - -[[package]] -name = "petgraph" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" -dependencies = [ - "fixedbitset 0.5.7", + "fixedbitset", "indexmap 2.14.0", ] @@ -3424,11 +3399,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.10.5", "log", "multimap", "once_cell", - "petgraph 0.7.1", + "petgraph", "prettyplease", "prost", "prost-types", @@ -3444,7 +3419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.117", @@ -3887,7 +3862,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4389,7 +4364,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -5105,7 +5080,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index d2886c3..484f5d2 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -59,7 +59,7 @@ figment = { version = "0.10", features = ["yaml"] } arc-swap = "1" csv = "1" elastic_dsl_utilities.workspace = true -asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" } +asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib", branch = "refactor/module-restructure" } [[bin]] name = "precompute_engine" diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 13c86b0..30fbc7e 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -464,7 +464,11 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater { } fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) { - self.acc.inner.update(&key.to_semicolon_str(), value); + crate::precompute_operators::sketchlib_runtime::cms_update( + &mut self.acc.inner, + &key.to_semicolon_str(), + value, + ); } impl_clone_accumulator_methods!(acc); @@ -855,6 +859,6 @@ mod tests { .as_any() .downcast_ref::() .expect("should be KLL"); - assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param"); + assert_eq!(kll.inner.k(), 50, "k should be 50 from capital-K param"); } } diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 02c330b..0578197 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -776,7 +776,7 @@ mod tests { use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; use crate::precompute_operators::multiple_sum_accumulator::MultipleSumAccumulator; use crate::precompute_operators::sum_accumulator::SumAccumulator; - use asap_sketchlib::sketches::kll::KllSketch; + use asap_sketchlib::KllSketch; use asap_types::enums::{AggregationType, WindowType}; fn make_agg_config( @@ -1511,7 +1511,7 @@ mod tests { handcrafted_output.end_timestamp, arroyo_output.end_timestamp ); - assert_eq!(handcrafted_acc.inner.k, arroyo_acc.inner.k); + assert_eq!(handcrafted_acc.inner.k(), arroyo_acc.inner.k()); assert_eq!(handcrafted_acc.inner.count(), arroyo_acc.inner.count()); for quantile in [0.0, 0.5, 1.0] { diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs index 6840cb5..98de3d1 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_accumulator.rs @@ -2,34 +2,37 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch::CountMinSketch; +use crate::precompute_operators::sketchlib_runtime::{ + RuntimeCountMin, cms_estimate, cms_from_matrix, cms_from_msgpack, cms_matrix, + cms_merge_refs, cms_new, cms_to_msgpack, cms_update, +}; use serde_json::Value; use std::collections::HashMap; use promql_utilities::query_logics::enums::Statistic; -/// Count-Min Sketch accumulator — wraps asap_sketchlib::sketches::CountMinSketch. -/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`. -/// This file retains QE-specific trait impls, legacy deserializers, and JSON output. +/// Count-Min Sketch accumulator — holds `sketches::CountMin<.., FastPath>` +/// directly. Wire format (Go-compatible MessagePack envelope) and +/// matrix-shape conversions live in `sketchlib_runtime`. #[derive(Debug, Clone)] pub struct CountMinSketchAccumulator { - pub inner: CountMinSketch, + pub inner: RuntimeCountMin, } impl CountMinSketchAccumulator { pub fn new(row_num: usize, col_num: usize) -> Self { Self { - inner: CountMinSketch::new(row_num, col_num), + inner: cms_new(row_num, col_num), } } // Marked as _update and kept private; only called internally. fn _update(&mut self, key: &KeyByLabelValues, value: f64) { - self.inner.update(&key.to_semicolon_str(), value); + cms_update(&mut self.inner, &key.to_semicolon_str(), value); } pub fn query_key(&self, key: &KeyByLabelValues) -> f64 { - self.inner.estimate(&key.to_semicolon_str()) + cms_estimate(&self.inner, &key.to_semicolon_str()) } pub fn deserialize_from_json(data: &Value) -> Result> { @@ -56,7 +59,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), + inner: cms_from_matrix(sketch, row_num, col_num), }) } @@ -64,8 +67,7 @@ impl CountMinSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketch::deserialize_msgpack(buffer) - .map_err(|e| -> Box { e.to_string().into() })?, + inner: cms_from_msgpack(buffer)?, }) } @@ -108,7 +110,7 @@ impl CountMinSketchAccumulator { } Ok(Self { - inner: CountMinSketch::from_legacy_matrix(sketch, row_num, col_num), + inner: cms_from_matrix(sketch, row_num, col_num), }) } @@ -136,20 +138,9 @@ impl CountMinSketchAccumulator { cms_accumulators.push(cms_acc); } - // Check dimensions are consistent - let rows = cms_accumulators[0].inner.rows(); - let cols = cms_accumulators[0].inner.cols(); - for acc in &cms_accumulators { - if acc.inner.rows() != rows || acc.inner.cols() != cols { - return Err( - "Cannot merge CountMinSketch accumulators with different dimensions".into(), - ); - } - } - - let inner_refs: Vec<&CountMinSketch> = + let inner_refs: Vec<&RuntimeCountMin> = cms_accumulators.iter().map(|acc| &acc.inner).collect(); - let merged_inner = CountMinSketch::merge_refs(&inner_refs)?; + let merged_inner = cms_merge_refs(&inner_refs)?; Ok(Self { inner: merged_inner, }) @@ -161,12 +152,12 @@ impl SerializableToSink for CountMinSketchAccumulator { serde_json::json!({ "row_num": self.inner.rows(), "col_num": self.inner.cols(), - "sketch": self.inner.sketch() + "sketch": cms_matrix(&self.inner) }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + cms_to_msgpack(&self.inner) } } @@ -200,7 +191,7 @@ impl AggregateCore for CountMinSketchAccumulator { .downcast_ref::() .ok_or("Failed to downcast to CountMinSketchAccumulator")?; - let merged_inner = CountMinSketch::merge_refs(&[&self.inner, &other_cms.inner])?; + let merged_inner = cms_merge_refs(&[&self.inner, &other_cms.inner])?; Ok(Box::new(Self { inner: merged_inner, })) @@ -250,12 +241,12 @@ impl MergeableAccumulator for CountMinSketchAccumulat if accumulators.is_empty() { return Err("No accumulators to merge".into()); } - let mut iter = accumulators.into_iter(); - let mut merged = iter.next().unwrap(); - for acc in iter { - merged.inner.merge(&acc.inner)?; - } - Ok(merged) + let inner_refs: Vec<&RuntimeCountMin> = + accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = cms_merge_refs(&inner_refs)?; + Ok(Self { + inner: merged_inner, + }) } } @@ -268,7 +259,7 @@ mod tests { let cms = CountMinSketchAccumulator::new(4, 1000); assert_eq!(cms.inner.rows(), 4); assert_eq!(cms.inner.cols(), 1000); - let sketch = cms.inner.sketch(); + let sketch = cms_matrix(&cms.inner); assert_eq!(sketch.len(), 4); assert_eq!(sketch[0].len(), 1000); @@ -300,16 +291,15 @@ mod tests { #[test] fn test_count_min_sketch_merge() { - // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). let cms1 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3, ), }; let cms2 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3, @@ -318,7 +308,7 @@ mod tests { let merged = CountMinSketchAccumulator::merge_accumulators(vec![cms1, cms2]).unwrap(); - let merged_sketch = merged.inner.sketch(); + let merged_sketch = cms_matrix(&merged.inner); assert_eq!(merged_sketch[0][0], 8.0); assert_eq!(merged_sketch[0][1], 7.0); assert_eq!(merged_sketch[1][2], 10.0); @@ -335,7 +325,7 @@ mod tests { #[test] fn test_count_min_sketch_serialization() { let cms = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![0.0, 42.0, 0.0], vec![0.0, 0.0, 100.0]], 2, 3, @@ -348,7 +338,7 @@ mod tests { assert_eq!(deserialized.inner.rows(), 2); assert_eq!(deserialized.inner.cols(), 3); - let deser_sketch = deserialized.inner.sketch(); + let deser_sketch = cms_matrix(&deserialized.inner); assert_eq!(deser_sketch[0][1], 42.0); assert_eq!(deser_sketch[1][2], 100.0); } @@ -378,10 +368,6 @@ mod tests { #[test] fn test_update_and_query_use_same_key_encoding() { - // Regression test: _update and query_key must hash the same key string. - // Previously _update went through serialize_to_json (which returns a JSON - // array, so as_object() is always None) and always stored under key "". - // query_key correctly used key.labels.join(";"), so they never matched. let mut cms = CountMinSketchAccumulator::new(4, 1000); let key = KeyByLabelValues::new_with_labels(vec!["web".to_string(), "prod".to_string()]); cms._update(&key, 5.0); @@ -391,11 +377,8 @@ mod tests { "_update and query_key used different key encodings: got {result}" ); - // Also verify a different key does not interfere. let other_key = KeyByLabelValues::new_with_labels(vec!["api".to_string()]); - // other_key was never updated; its estimate should be lower than key's. let other_result = cms.query_key(&other_key); - // In a sketch this large there should be no collision, so other_result == 0. assert_eq!( other_result, 0.0, "unrelated key returned non-zero: {other_result}" @@ -418,23 +401,22 @@ mod tests { #[test] fn test_count_min_sketch_merge_multiple() { - // Build controlled state via from_legacy_matrix (works for both Legacy and Sketchlib backends). let cms1 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![5.0, 0.0, 0.0], vec![0.0, 0.0, 10.0]], 2, 3, ), }; let cms2 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![3.0, 7.0, 0.0], vec![0.0, 0.0, 0.0]], 2, 3, ), }; let cms3 = CountMinSketchAccumulator { - inner: CountMinSketch::from_legacy_matrix( + inner: cms_from_matrix( vec![vec![2.0, 0.0, 0.0], vec![0.0, 0.0, 5.0]], 2, 3, @@ -446,7 +428,7 @@ mod tests { let merged = CountMinSketchAccumulator::merge_multiple(&boxed_accs).unwrap(); - let merged_sketch = merged.inner.sketch(); + let merged_sketch = cms_matrix(&merged.inner); assert_eq!(merged_sketch[0][0], 10.0); assert_eq!(merged_sketch[0][1], 7.0); assert_eq!(merged_sketch[1][2], 15.0); diff --git a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs index c96cde9..119bbc3 100644 --- a/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/count_min_sketch_with_heap_accumulator.rs @@ -2,7 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::countminsketch_topk::{CmsHeapItem, CountMinSketchWithHeap}; +use asap_sketchlib::message_pack_format::MessagePackCodec; +use asap_sketchlib::{CmsHeapItem, CountMinSketchWithHeap}; use serde_json::Value; use std::collections::HashMap; @@ -17,7 +18,7 @@ pub struct CountMinSketchWithHeapAccumulator { } // Re-export HeapItem so existing code using CountMinSketchWithHeapAccumulator::HeapItem still works. -pub use asap_sketchlib::sketches::countminsketch_topk::CmsHeapItem as HeapItemReexport; +pub use asap_sketchlib::CmsHeapItem as HeapItemReexport; impl CountMinSketchWithHeapAccumulator { pub fn new(row_num: usize, col_num: usize, heap_size: usize) -> Self { @@ -85,7 +86,7 @@ impl CountMinSketchWithHeapAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: CountMinSketchWithHeap::deserialize_msgpack(buffer) + inner: CountMinSketchWithHeap::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -131,7 +132,7 @@ impl SerializableToSink for CountMinSketchWithHeapAccumulator { } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs index 33e085d..6bdb1d5 100644 --- a/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs @@ -2,8 +2,11 @@ use crate::data_model::{ AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink, SingleSubpopulationAggregate, }; -use asap_sketchlib::sketches::kll::KllSketch; -use base64::{engine::general_purpose, Engine as _}; +use crate::precompute_operators::sketchlib_runtime::{ + RuntimeKll, kll_from_msgpack, kll_merge_refs, kll_new, kll_quantile, kll_sketch_bytes, + kll_to_msgpack, kll_update, +}; +use base64::{Engine as _, engine::general_purpose}; use serde_json::Value; use std::collections::HashMap; #[cfg(feature = "extra_debugging")] @@ -12,26 +15,26 @@ use tracing::debug; use promql_utilities::query_logics::enums::Statistic; -/// KLL sketch accumulator — wraps asap_sketchlib::sketches::KllSketch. -/// Core struct, update/merge/serde logic live in `asap_sketchlib::sketches`. -/// This file retains QE-specific trait impls and JSON output. +/// KLL sketch accumulator — holds `sketches::KLL` directly. Wire +/// format (`KllSketchData { k, sketch_bytes }`) and base64-JSON output +/// live in `sketchlib_runtime`. pub struct DatasketchesKLLAccumulator { - pub inner: KllSketch, + pub inner: RuntimeKll, } impl DatasketchesKLLAccumulator { pub fn new(k: u16) -> Self { Self { - inner: KllSketch::new(k), + inner: kll_new(k), } } pub fn update(&mut self, value: f64) { - self.inner.update(value); + kll_update(&mut self.inner, value); } pub fn get_quantile(&self, quantile: f64) -> f64 { - self.inner.quantile(quantile) + kll_quantile(&self.inner, quantile) } pub fn deserialize_from_bytes_arroyo( @@ -42,8 +45,7 @@ impl DatasketchesKLLAccumulator { buffer.len() ); Ok(Self { - inner: KllSketch::deserialize_msgpack(buffer) - .map_err(|e| -> Box { e.to_string().into() })?, + inner: kll_from_msgpack(buffer)?, }) } @@ -71,15 +73,14 @@ impl DatasketchesKLLAccumulator { kll_accumulators.push(kll_acc); } - let inner_refs: Vec<&KllSketch> = kll_accumulators.iter().map(|acc| &acc.inner).collect(); - let merged_inner = KllSketch::merge_refs(&inner_refs)?; + let inner_refs: Vec<&RuntimeKll> = kll_accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = kll_merge_refs(&inner_refs)?; Ok(Self { inner: merged_inner, }) } } -// Manual trait implementations since the C++ library doesn't provide them impl Clone for DatasketchesKLLAccumulator { fn clone(&self) -> Self { Self { @@ -91,29 +92,27 @@ impl Clone for DatasketchesKLLAccumulator { impl std::fmt::Debug for DatasketchesKLLAccumulator { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DatasketchesKLLAccumulator") - .field("k", &self.inner.k) + .field("k", &self.inner.k()) .field("sketch_n", &self.inner.count()) .finish() } } -// TODO: verify this -// Thread safety: The C++ library is not thread-safe by default, but since we're using it -// in a single-threaded context per accumulator instance and only sharing read-only operations, -// this should be safe. +// Thread safety: each accumulator is used in single-threaded contexts; +// only read-only methods are shared across threads. unsafe impl Send for DatasketchesKLLAccumulator {} unsafe impl Sync for DatasketchesKLLAccumulator {} impl SerializableToSink for DatasketchesKLLAccumulator { fn serialize_to_json(&self) -> Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.sketch_bytes(); + let sketch_bytes = kll_sketch_bytes(&self.inner); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + kll_to_msgpack(&self.inner) } } @@ -139,7 +138,7 @@ impl AggregateCore for DatasketchesKLLAccumulator { #[cfg(feature = "extra_debugging")] debug!( "[PERF] DatasketchesKLLAccumulator::merge_with() started - self.k={}, self.n={}", - self.inner.k, + self.inner.k(), self.inner.count() ); @@ -156,7 +155,7 @@ impl AggregateCore for DatasketchesKLLAccumulator { .downcast_ref::() .ok_or("Failed to downcast to DatasketchesKLLAccumulator")?; - let merged_inner = KllSketch::merge_refs(&[&self.inner, &other_kll.inner])?; + let merged_inner = kll_merge_refs(&[&self.inner, &other_kll.inner])?; let merged = Self { inner: merged_inner, }; @@ -232,12 +231,11 @@ impl MergeableAccumulator for DatasketchesKLLAccumul if accumulators.is_empty() { return Err("No accumulators to merge".into()); } - let mut iter = accumulators.into_iter(); - let mut merged = iter.next().unwrap(); - for acc in iter { - merged.inner.merge(&acc.inner)?; - } - Ok(merged) + let inner_refs: Vec<&RuntimeKll> = accumulators.iter().map(|acc| &acc.inner).collect(); + let merged_inner = kll_merge_refs(&inner_refs)?; + Ok(Self { + inner: merged_inner, + }) } } @@ -249,7 +247,7 @@ mod tests { fn test_datasketches_kll_creation() { let kll = DatasketchesKLLAccumulator::new(200); assert!(kll.inner.count() == 0); - assert_eq!(kll.inner.k, 200); + assert_eq!(kll.inner.k(), 200); } #[test] @@ -269,7 +267,6 @@ mod tests { } assert_eq!(kll.get_quantile(0.0), 1.0); assert_eq!(kll.get_quantile(1.0), 10.0); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. let q50 = kll.get_quantile(0.5); assert!((q50 - 6.0).abs() <= 1.0, "expected median ~6, got {q50}"); } @@ -284,7 +281,6 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. assert!( (result - 6.0).abs() <= 1.0, "expected median ~6, got {result}" @@ -322,7 +318,7 @@ mod tests { let deserialized = DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(&bytes).unwrap(); - assert_eq!(deserialized.inner.k, 200); + assert_eq!(deserialized.inner.k(), 200); assert_eq!(deserialized.inner.count(), 5); assert_eq!(deserialized.get_quantile(0.0), 1.0); assert_eq!(deserialized.get_quantile(1.0), 5.0); @@ -352,7 +348,6 @@ mod tests { let mut query_kwargs = HashMap::new(); query_kwargs.insert("quantile".to_string(), "0.5".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.5 quantile of 1..10 may be 5, 6, or 7. assert!( (result - 6.0).abs() <= 1.0, "expected median ~6, got {result}" @@ -360,7 +355,6 @@ mod tests { query_kwargs.insert("quantile".to_string(), "0.9".to_string()); let result = kll.query(Statistic::Quantile, Some(&query_kwargs)).unwrap(); - // Sketchlib KLL is approximate; 0.9 quantile of 1..10 may be 9 or 10. assert!( (9.0..=10.0).contains(&result), "expected 0.9 quantile in [9,10], got {result}" diff --git a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs index f323426..8261988 100644 --- a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs @@ -2,7 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack}; +use asap_sketchlib::DeltaResult; +use asap_sketchlib::message_pack_format::MessagePackCodec; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -153,7 +154,7 @@ impl DeltaSetAggregatorAccumulator { buffer: &[u8], ) -> Result> { // Delegate to sketch-core canonical DeltaResult msgpack format - let delta = deserialize_msgpack(buffer) + let delta = DeltaResult::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let mut added = HashSet::new(); @@ -203,7 +204,9 @@ impl SerializableToSink for DeltaSetAggregatorAccumulator { .iter() .map(|key| key.to_semicolon_str()) .collect(); - serialize_msgpack(&added, &removed).unwrap_or_default() + DeltaResult { added, removed } + .to_msgpack() + .unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs index f33012d..2c6d440 100644 --- a/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs @@ -5,7 +5,8 @@ use crate::{ }, KeyByLabelValues, }; -use asap_sketchlib::sketches::hydra_kll::HydraKllSketch; +use asap_sketchlib::HydraKllSketch; +use asap_sketchlib::message_pack_format::MessagePackCodec; use base64::{engine::general_purpose, Engine as _}; use std::collections::HashMap; @@ -38,7 +39,7 @@ impl HydraKllSketchAccumulator { buffer: &[u8], ) -> Result> { Ok(Self { - inner: HydraKllSketch::deserialize_msgpack(buffer) + inner: HydraKllSketch::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?, }) } @@ -51,13 +52,13 @@ impl HydraKllSketchAccumulator { impl SerializableToSink for HydraKllSketchAccumulator { fn serialize_to_json(&self) -> serde_json::Value { // Mirror Python implementation: {"sketch": base64_encoded_string} - let sketch_bytes = self.inner.serialize_msgpack().unwrap_or_default(); + let sketch_bytes = self.inner.to_msgpack().unwrap_or_default(); let sketch_b64 = general_purpose::STANDARD.encode(&sketch_bytes); serde_json::json!({ "sketch": sketch_b64 }) } fn serialize_to_bytes(&self) -> Vec { - self.inner.serialize_msgpack().unwrap_or_default() + self.inner.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/mod.rs b/asap-query-engine/src/precompute_operators/mod.rs index fbbff1c..01d0608 100644 --- a/asap-query-engine/src/precompute_operators/mod.rs +++ b/asap-query-engine/src/precompute_operators/mod.rs @@ -9,6 +9,7 @@ pub mod multiple_increase_accumulator; pub mod multiple_min_max_accumulator; pub mod multiple_sum_accumulator; pub mod set_aggregator_accumulator; +pub mod sketchlib_runtime; pub mod sum_accumulator; pub use count_min_sketch_accumulator::*; diff --git a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs index 45b74d5..83f5078 100644 --- a/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/set_aggregator_accumulator.rs @@ -2,7 +2,8 @@ use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, }; -use asap_sketchlib::sketches::set_aggregator::SetAggregator; +use asap_sketchlib::SetAggregator; +use asap_sketchlib::message_pack_format::MessagePackCodec; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -92,7 +93,7 @@ impl SetAggregatorAccumulator { pub fn deserialize_from_bytes_arroyo( buffer: &[u8], ) -> Result> { - let sa = SetAggregator::deserialize_msgpack(buffer) + let sa = SetAggregator::from_msgpack(buffer) .map_err(|e| -> Box { e.to_string().into() })?; let added = sa .values @@ -109,7 +110,7 @@ impl SetAggregatorAccumulator { for key in &self.added { sa.update(&key.to_semicolon_str()); } - sa.serialize_msgpack().unwrap_or_default() + sa.to_msgpack().unwrap_or_default() } } diff --git a/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs new file mode 100644 index 0000000..25c8f4a --- /dev/null +++ b/asap-query-engine/src/precompute_operators/sketchlib_runtime.rs @@ -0,0 +1,181 @@ +//! Thin runtime adapters over `asap_sketchlib::sketches::*`. +//! +//! ASAPQuery accumulators hold the pure-Rust runtime sketch types +//! directly (`sketches::CountMin, FastPath, DefaultXxHasher>`, +//! `sketches::KLL`) and reach for these helpers to translate the +//! accumulator surface (string keys, `Vec>` matrices, +//! Go-compatible msgpack envelopes) onto those underlying types. +//! +//! Cross-language byte parity for the underlying `sketches::*` paths +//! is locked in by +//! `asap_sketchlib::tests::sketches_go_parity_probe`. + +use asap_sketchlib::message_pack_format::portable::countminsketch::CountMinSketchWire; +use asap_sketchlib::message_pack_format::portable::kll::KllSketchData; +use asap_sketchlib::sketches::countminsketch::CountMin; +use asap_sketchlib::sketches::kll::KLL; +use asap_sketchlib::{DataInput, DefaultXxHasher, FastPath, Vector2D}; + +// ============================================================================= +// CountMinSketch — sketches::CountMin, FastPath, DefaultXxHasher> +// ============================================================================= + +/// Concrete runtime CMS type used by `CountMinSketchAccumulator`. Same +/// dimensions + hasher choice as `asap_sketchlib`'s wire-format +/// `CountMinSketch` facade, so the on-the-wire byte shape is identical. +pub type RuntimeCountMin = CountMin, FastPath, DefaultXxHasher>; + +pub fn cms_new(rows: usize, cols: usize) -> RuntimeCountMin { + CountMin::with_dimensions(rows, cols) +} + +pub fn cms_update(sk: &mut RuntimeCountMin, key: &str, value: f64) { + if value <= 0.0 { + return; + } + sk.insert_many(&DataInput::String(key.to_owned()), value); +} + +pub fn cms_estimate(sk: &RuntimeCountMin, key: &str) -> f64 { + sk.estimate(&DataInput::String(key.to_owned())) +} + +/// Snapshot the storage as `Vec>` (used for JSON output + wire DTO). +pub fn cms_matrix(sk: &RuntimeCountMin) -> Vec> { + let storage = sk.as_storage(); + let rows = storage.rows(); + let cols = storage.cols(); + let mut out = vec![vec![0.0f64; cols]; rows]; + for r in 0..rows { + for c in 0..cols { + if let Some(v) = storage.get(r, c) { + out[r][c] = *v; + } + } + } + out +} + +/// Build a CountMin from an existing matrix (used by JSON / legacy +/// byte-format decoders). +pub fn cms_from_matrix(matrix: Vec>, rows: usize, cols: usize) -> RuntimeCountMin { + let storage = Vector2D::from_fn(rows, cols, |r, c| { + matrix + .get(r) + .and_then(|row| row.get(c)) + .copied() + .unwrap_or(0.0) + }); + CountMin::from_storage(storage) +} + +/// Serialize to the Go-compatible MessagePack envelope. +pub fn cms_to_msgpack(sk: &RuntimeCountMin) -> Vec { + let wire = CountMinSketchWire { + sketch: cms_matrix(sk), + rows: sk.rows(), + cols: sk.cols(), + }; + rmp_serde::to_vec(&wire).unwrap_or_default() +} + +/// Deserialize from the Go-compatible MessagePack envelope. +pub fn cms_from_msgpack(bytes: &[u8]) -> Result> { + let wire: CountMinSketchWire = rmp_serde::from_slice(bytes)?; + Ok(cms_from_matrix(wire.sketch, wire.rows, wire.cols)) +} + +/// Merge a slice of CMS references into a single new sketch. +pub fn cms_merge_refs( + sketches: &[&RuntimeCountMin], +) -> Result> { + let first = *sketches + .first() + .ok_or("cms_merge_refs called with empty input")?; + let rows = first.rows(); + let cols = first.cols(); + for s in sketches { + if s.rows() != rows || s.cols() != cols { + return Err(format!( + "CountMin dimension mismatch in merge: expected {rows}x{cols}, got {}x{}", + s.rows(), + s.cols() + ) + .into()); + } + } + let mut merged = cms_new(rows, cols); + for s in sketches { + merged.merge(s); + } + Ok(merged) +} + +// ============================================================================= +// KllSketch — sketches::KLL +// ============================================================================= + +/// Concrete runtime KLL type used by `DatasketchesKLLAccumulator`. +pub type RuntimeKll = KLL; + +pub fn kll_new(k: u16) -> RuntimeKll { + KLL::init_kll(k as i32) +} + +pub fn kll_update(sk: &mut RuntimeKll, value: f64) { + sk.update(&value); +} + +pub fn kll_quantile(sk: &RuntimeKll, q: f64) -> f64 { + if sk.count() == 0 { + return 0.0; + } + sk.quantile(q) +} + +/// Raw msgpack bytes of the KLL backend (sans the `k`-envelope outer +/// wrapper). Used by JSON output (base64-encoded) and the wire codec. +pub fn kll_sketch_bytes(sk: &RuntimeKll) -> Vec { + sk.serialize_to_bytes().unwrap_or_default() +} + +/// Serialize to the Go-compatible `KllSketchData { k, sketch_bytes }` +/// MessagePack envelope. +pub fn kll_to_msgpack(sk: &RuntimeKll) -> Vec { + let wire = KllSketchData { + k: sk.k() as u16, + sketch_bytes: kll_sketch_bytes(sk), + }; + rmp_serde::to_vec(&wire).unwrap_or_default() +} + +/// Deserialize from the Go-compatible `KllSketchData` envelope. +pub fn kll_from_msgpack(bytes: &[u8]) -> Result> { + let wire: KllSketchData = rmp_serde::from_slice(bytes)?; + Ok(KLL::deserialize_from_bytes(&wire.sketch_bytes)?) +} + +/// Merge a slice of KLL references into a single new sketch. All +/// inputs must share the same `k`. +pub fn kll_merge_refs( + sketches: &[&RuntimeKll], +) -> Result> { + let first = *sketches + .first() + .ok_or("kll_merge_refs called with empty input")?; + let k = first.k(); + for s in sketches { + if s.k() != k { + return Err(format!( + "KLL k mismatch in merge: expected {k}, got {}", + s.k() + ) + .into()); + } + } + let mut merged = kll_new(k as u16); + for s in sketches { + merged.merge(s); + } + Ok(merged) +} diff --git a/asap-query-engine/tests/e2e_precompute_equivalence.rs b/asap-query-engine/tests/e2e_precompute_equivalence.rs index b8c6953..6a8299b 100644 --- a/asap-query-engine/tests/e2e_precompute_equivalence.rs +++ b/asap-query-engine/tests/e2e_precompute_equivalence.rs @@ -7,7 +7,7 @@ //! 3. Advances the watermark past the window boundary to close it //! 4. Drains captured outputs and verifies equivalence with wire-format accumulators -use asap_sketchlib::sketches::kll::KllSketch; +use asap_sketchlib::KllSketch; use asap_types::aggregation_config::AggregationConfig; use asap_types::enums::{AggregationType, WindowType}; use flate2::{write::GzEncoder, Compression}; @@ -265,7 +265,7 @@ async fn e2e_kll_output_matches_arroyo() { // Sketch contents assert_eq!( - handcrafted_acc.inner.k, arroyo_acc.inner.k, + handcrafted_acc.inner.k(), arroyo_acc.inner.k(), "KLL k mismatch" ); assert_eq!(