Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 0 additions & 38 deletions asap-summary-ingest/run_arroyosketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ def create_pipeline(
template_path = os.path.join(udf_dir, f"{udf_name}.rs.j2")
regular_path = os.path.join(udf_dir, f"{udf_name}.rs")

# Get parameters for this UDF (impl_mode injected in main() for sketch UDFs)
params = dict(agg_function_params.get(udf_name, {}))

if len(params) > 0 and not os.path.exists(template_path):
Expand Down Expand Up @@ -943,20 +942,6 @@ def main(args):
filter_metric_name,
)

parameters = dict(parameters)
if agg_function in ("countminsketch_count", "countminsketch_sum"):
parameters["impl_mode"] = getattr(
args, "sketch_cms_impl", "legacy"
).capitalize()
elif agg_function == "countminsketchwithheap_topk":
parameters["impl_mode"] = getattr(
args, "sketch_cmwh_impl", "legacy"
).capitalize()
elif agg_function in ("datasketcheskll_", "hydrakll_"):
parameters["impl_mode"] = getattr(
args, "sketch_kll_impl", "sketchlib"
).capitalize()

sql_queries.append(sql_query)
# if not is_labels_accumulator:
agg_functions_with_params.append((agg_function, parameters))
Expand Down Expand Up @@ -1111,29 +1096,6 @@ def main(args):
help="Query language for schema interpretation (default: promql)",
)

# Sketch implementation mode - must match QueryEngine (--sketch-cms-impl etc.)
parser.add_argument(
"--sketch_cms_impl",
type=str,
choices=["legacy", "sketchlib"],
default="sketchlib",
help="Count-Min Sketch backend (legacy | sketchlib). Must match QueryEngine.",
)
parser.add_argument(
"--sketch_kll_impl",
type=str,
choices=["legacy", "sketchlib"],
default="sketchlib",
help="KLL Sketch backend (legacy | sketchlib). Must match QueryEngine.",
)
parser.add_argument(
"--sketch_cmwh_impl",
type=str,
choices=["legacy", "sketchlib"],
default="sketchlib",
help="Count-Min-With-Heap backend (legacy | sketchlib). Must match QueryEngine.",
)

args = parser.parse_args()
check_args(args)
main(args)
104 changes: 25 additions & 79 deletions asap-summary-ingest/templates/udfs/countminsketch_count.rs.j2
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,18 @@
[dependencies]
rmp-serde = "1.1"
serde = { version = "1.0", features = ["derive"] }
twox-hash = "2.1.0"
asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" }
*/

use arroyo_udf_plugin::udf;
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
use twox_hash::XxHash32;

use asap_sketchlib::{CountMin as SketchlibCountMin, RegularPath, DataInput, Vector2D};

// Count-Min Sketch parameters
const DEPTH: usize = {{ depth }}; // Number of hash functions
const WIDTH: usize = {{ width }}; // Number of buckets per hash function

// Implementation mode for Count-Min Sketch. Set at compile time; no env vars.
// The active variant is baked into the IMPL_MODE constant when this template
// is rendered, and the UDF branches on it to pick a backend.
enum ImplMode {
// Hand-rolled sketch whose buckets are chosen with twox-hash.
Legacy,
// Sketch built by asap_sketchlib (integer counters, internal hashing).
Sketchlib,
}

{% set _impl_mode = impl_mode | default("Sketchlib") %}
const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %};

// Reports whether the compile-time IMPL_MODE selects the asap_sketchlib backend.
fn use_sketchlib_for_cms() -> bool {
    match IMPL_MODE {
        ImplMode::Sketchlib => true,
        ImplMode::Legacy => false,
    }
}

type SketchlibCms = SketchlibCountMin<Vector2D<i64>, RegularPath>;

#[derive(Serialize, Deserialize, Clone)]
Expand All @@ -39,75 +23,37 @@ struct CountMinSketch {
col_num: usize,
}

impl CountMinSketch {
    // Builds an empty sketch: DEPTH rows, each holding WIDTH zeroed counters.
    fn new() -> Self {
        let counters = vec![vec![0.0; WIDTH]; DEPTH];
        Self {
            sketch: counters,
            row_num: DEPTH,
            col_num: WIDTH,
        }
    }

    // Legacy path: adds `value` to one counter per row, with the counter
    // picked by a twox-hash of the key.
    fn update(&mut self, key: &str, value: f64) {
        // A &str is already UTF-8, so its bytes are hashed as-is.
        let key_bytes = key.as_bytes();
        for row in 0..self.row_num {
            // The row index is used as the hash seed for that row.
            let digest = XxHash32::oneshot(row as u32, key_bytes);
            let column = (digest as usize) % self.col_num;
            self.sketch[row][column] += value;
        }
    }
}

#[udf]
fn countminsketch_count(keys: Vec<&str>, values: Vec<f64>) -> Option<Vec<u8>> {
if use_sketchlib_for_cms() {
// asap_sketchlib backed implementation: integer counters + internal hashing.
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);

for &key in keys.iter() {
let input = DataInput::String(key.to_owned());
inner.insert_many(&input, 1);
}
for &key in keys.iter() {
let input = DataInput::String(key.to_owned());
inner.insert_many(&input, 1);
}

// Convert sketchlib storage to legacy matrix wire format.
let storage: &Vector2D<i64> = inner.as_storage();
let rows = storage.rows();
let cols = storage.cols();
let mut sketch = vec![vec![0.0; cols]; rows];
let storage: &Vector2D<i64> = inner.as_storage();
let rows = storage.rows();
let cols = storage.cols();
let mut sketch = vec![vec![0.0; cols]; rows];

for r in 0..rows {
for c in 0..cols {
if let Some(v) = storage.get(r, c) {
sketch[r][c] = *v as f64;
}
for r in 0..rows {
for c in 0..cols {
if let Some(v) = storage.get(r, c) {
sketch[r][c] = *v as f64;
}
}

let countminsketch = CountMinSketch {
sketch,
row_num: rows,
col_num: cols,
};

let mut buf = Vec::new();
countminsketch
.serialize(&mut Serializer::new(&mut buf))
.ok()?;
Some(buf)
} else {
// Legacy twox-hash backed implementation (unchanged).
let mut countminsketch = CountMinSketch::new();

// Iterate through the keys and update the sketch for each entry
for &key in keys.iter() {
countminsketch.update(key, 1.0);
}

let mut buf = Vec::new();
countminsketch
.serialize(&mut Serializer::new(&mut buf))
.ok()?;
Some(buf)
}

let countminsketch = CountMinSketch {
sketch,
row_num: rows,
col_num: cols,
};

let mut buf = Vec::new();
countminsketch
.serialize(&mut Serializer::new(&mut buf))
.ok()?;
Some(buf)
}
114 changes: 29 additions & 85 deletions asap-summary-ingest/templates/udfs/countminsketch_sum.rs.j2
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,18 @@
[dependencies]
rmp-serde = "1.1"
serde = { version = "1.0", features = ["derive"] }
twox-hash = "2.1.0"
asap_sketchlib = { git = "https://github.com/ProjectASAP/asap_sketchlib" }
*/

use arroyo_udf_plugin::udf;
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
use twox_hash::XxHash32;

use asap_sketchlib::{CountMin as SketchlibCountMin, RegularPath, DataInput, Vector2D};

// Count-Min Sketch parameters
const DEPTH: usize = {{ depth }}; // Number of hash functions
const WIDTH: usize = {{ width }}; // Number of buckets per hash function

// Implementation mode for Count-Min Sketch. Set at compile time; no env vars.
// The active variant is baked into the IMPL_MODE constant when this template
// is rendered, and the UDF branches on it to pick a backend.
enum ImplMode {
// Hand-rolled sketch whose buckets are chosen with twox-hash.
Legacy,
// Sketch built by asap_sketchlib (integer counters, internal hashing).
Sketchlib,
}

{% set _impl_mode = impl_mode | default("Sketchlib") %}
const IMPL_MODE: ImplMode = ImplMode::{% if _impl_mode == "Legacy" or _impl_mode == "Sketchlib" %}{{ _impl_mode }}{% else %}Sketchlib{% endif %};

// Reports whether the compile-time IMPL_MODE selects the asap_sketchlib backend.
fn use_sketchlib_for_cms() -> bool {
    match IMPL_MODE {
        ImplMode::Sketchlib => true,
        ImplMode::Legacy => false,
    }
}

type SketchlibCms = SketchlibCountMin<Vector2D<i64>, RegularPath>;

#[derive(Serialize, Deserialize, Clone)]
Expand All @@ -39,86 +23,46 @@ struct CountMinSketch {
col_num: usize,
}

impl CountMinSketch {
    // Builds an empty sketch: DEPTH rows, each holding WIDTH zeroed counters.
    fn new() -> Self {
        let counters = vec![vec![0.0; WIDTH]; DEPTH];
        Self {
            sketch: counters,
            row_num: DEPTH,
            col_num: WIDTH,
        }
    }

    // Legacy path: adds `value` to one counter per row, with the counter
    // picked by a twox-hash of the key.
    fn update(&mut self, key: &str, value: f64) {
        // A &str is already UTF-8, so its bytes are hashed as-is.
        let key_bytes = key.as_bytes();
        for row in 0..self.row_num {
            // The row index is used as the hash seed for that row.
            let digest = XxHash32::oneshot(row as u32, key_bytes);
            let column = (digest as usize) % self.col_num;
            self.sketch[row][column] += value;
        }
    }
}

#[udf]
fn countminsketch_sum(keys: Vec<&str>, values: Vec<f64>) -> Option<Vec<u8>> {
// Check that keys and values have equal length
if keys.len() != values.len() {
return None;
}

if use_sketchlib_for_cms() {
// asap_sketchlib backed implementation: integer counters + internal hashing.
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);
let mut inner = SketchlibCms::with_dimensions(DEPTH, WIDTH);

for (i, &key) in keys.iter().enumerate() {
let value = values[i];
// Values arrive as f64; Count-Min counters are integers.
let many = value.round() as i64;
if many <= 0 {
continue;
}
let input = DataInput::String(key.to_owned());
inner.insert_many(&input, many);
for (i, &key) in keys.iter().enumerate() {
let value = values[i];
let many = value.round() as i64;
if many <= 0 {
continue;
}
let input = DataInput::String(key.to_owned());
inner.insert_many(&input, many);
}

// Convert sketchlib storage to legacy matrix wire format.
let storage: &Vector2D<i64> = inner.as_storage();
let rows = storage.rows();
let cols = storage.cols();
let mut sketch = vec![vec![0.0; cols]; rows];
let storage: &Vector2D<i64> = inner.as_storage();
let rows = storage.rows();
let cols = storage.cols();
let mut sketch = vec![vec![0.0; cols]; rows];

for r in 0..rows {
for c in 0..cols {
if let Some(v) = storage.get(r, c) {
sketch[r][c] = *v as f64;
}
for r in 0..rows {
for c in 0..cols {
if let Some(v) = storage.get(r, c) {
sketch[r][c] = *v as f64;
}
}

let countminsketch = CountMinSketch {
sketch,
row_num: rows,
col_num: cols,
};

let mut buf = Vec::new();
countminsketch
.serialize(&mut Serializer::new(&mut buf))
.ok()?;
Some(buf)
} else {
// Legacy twox-hash backed implementation (unchanged).
let mut countminsketch = CountMinSketch::new();

// Iterate through the keys and values and update the sketch for each entry
for (i, &key) in keys.iter().enumerate() {
countminsketch.update(key, values[i]);
}

let mut buf = Vec::new();
countminsketch
.serialize(&mut Serializer::new(&mut buf))
.ok()?;
Some(buf)
}

let countminsketch = CountMinSketch {
sketch,
row_num: rows,
col_num: cols,
};

let mut buf = Vec::new();
countminsketch
.serialize(&mut Serializer::new(&mut buf))
.ok()?;
Some(buf)
}
Loading
Loading