From bdec5e45adcc83546eef1ce72749653af4851e42 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 6 May 2026 22:35:58 -0400 Subject: [PATCH 1/2] refactor(planner): refactored whole asap-planner --- asap-planner-rs/src/controller.rs | 161 ++++++ asap-planner-rs/src/lib.rs | 421 +--------------- asap-planner-rs/src/output/generator.rs | 3 +- asap-planner-rs/src/output/sql_generator.rs | 4 +- asap-planner-rs/src/planner/agg_config.rs | 225 +++++++++ asap-planner-rs/src/planner/cleanup.rs | 229 +++++++++ asap-planner-rs/src/planner/labels.rs | 22 + asap-planner-rs/src/planner/logics.rs | 477 ------------------ asap-planner-rs/src/planner/mod.rs | 13 +- .../planner/{single_query.rs => promql.rs} | 233 +-------- asap-planner-rs/src/planner/sketch.rs | 149 ++++++ .../planner/{sql_single_query.rs => sql.rs} | 8 +- asap-planner-rs/src/planner/window.rs | 85 ++++ asap-planner-rs/src/planner_output.rs | 230 +++++++++ asap-planner-rs/src/sql_controller.rs | 46 ++ 15 files changed, 1176 insertions(+), 1130 deletions(-) create mode 100644 asap-planner-rs/src/controller.rs create mode 100644 asap-planner-rs/src/planner/agg_config.rs create mode 100644 asap-planner-rs/src/planner/cleanup.rs create mode 100644 asap-planner-rs/src/planner/labels.rs delete mode 100644 asap-planner-rs/src/planner/logics.rs rename asap-planner-rs/src/planner/{single_query.rs => promql.rs} (60%) create mode 100644 asap-planner-rs/src/planner/sketch.rs rename asap-planner-rs/src/planner/{sql_single_query.rs => sql.rs} (96%) create mode 100644 asap-planner-rs/src/planner/window.rs create mode 100644 asap-planner-rs/src/planner_output.rs create mode 100644 asap-planner-rs/src/sql_controller.rs diff --git a/asap-planner-rs/src/controller.rs b/asap-planner-rs/src/controller.rs new file mode 100644 index 00000000..d1ebd69c --- /dev/null +++ b/asap-planner-rs/src/controller.rs @@ -0,0 +1,161 @@ +use std::path::Path; +use tracing::debug; + +use asap_types::PromQLSchema; +use promql_utilities::data_model::KeyByLabelNames; + +use crate::config::input::ControllerConfig; +use crate::error::ControllerError; +use crate::planner_output::PlannerOutput; +use crate::RuntimeOptions; +use crate::{output, prometheus_client, query_log}; + +pub struct Controller { + config: ControllerConfig, + schema: PromQLSchema, + options: RuntimeOptions, +} + +impl Controller { + pub fn new(config: ControllerConfig, schema: PromQLSchema, options: RuntimeOptions) -> Self { + Self { + config, + schema, + options, + } + } + + /// Build a `Controller` from a config file, fetching metric labels from Prometheus. + /// + /// `prometheus_url` is queried via `GET /api/v1/series?match[]=` for each metric + /// name found in the config's PromQL queries. + pub fn from_file( + path: &Path, + opts: RuntimeOptions, + prometheus_url: &str, + ) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; + let all_queries: Vec = config + .query_groups + .iter() + .flat_map(|qg| qg.queries.clone()) + .collect(); + let mut schema = + prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; + // For any metric that Prometheus had no series for, fall back to the + // `metrics` hint in the config file (if present). 
+ if let Some(metric_hints) = &config.metrics { + for hint in metric_hints { + if !schema.config.contains_key(&hint.metric) { + debug!( + "Prometheus had no series for '{}'; falling back to config-file hint with labels {:?}", + hint.metric, hint.labels + ); + schema = schema.add_metric( + hint.metric.clone(), + KeyByLabelNames::new(hint.labels.clone()), + ); + } + } + } + Ok(Self { + config, + schema, + options: opts, + }) + } + + /// Build a `Controller` from a config file with a caller-supplied `PromQLSchema`. + /// + /// Use this when the schema is available without querying Prometheus (e.g. in tests + /// or when the schema is constructed in-process by the caller). + pub fn from_file_with_schema( + path: &Path, + schema: PromQLSchema, + opts: RuntimeOptions, + ) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; + Ok(Self { + config, + schema, + options: opts, + }) + } + + /// Build a `Controller` from a YAML string with a caller-supplied `PromQLSchema`. + pub fn from_yaml_with_schema( + yaml: &str, + schema: PromQLSchema, + opts: RuntimeOptions, + ) -> Result { + let config: ControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + schema, + options: opts, + }) + } + + /// Build a `Controller` from a Prometheus query log file, fetching metric labels from + /// Prometheus. + /// + /// - `log_path`: newline-delimited JSON query log (Prometheus `--query.log-file` output) + /// - `prometheus_url`: base URL queried for label discovery + pub fn from_query_log( + log_path: &Path, + opts: RuntimeOptions, + prometheus_url: &str, + ) -> Result { + let entries = query_log::parse_log_file(log_path)?; + let (instants, ranges) = + query_log::infer_queries(&entries, opts.prometheus_scrape_interval); + let config = query_log::to_controller_config(instants, ranges); + let all_queries: Vec = config + .query_groups + .iter() + .flat_map(|qg| qg.queries.clone()) + .collect(); + let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; + Ok(Self { + config, + schema, + options: opts, + }) + } + + /// Build a `Controller` from a Prometheus query log file with a caller-supplied `PromQLSchema`. + /// + /// Use this when the schema is available without querying Prometheus (e.g. in tests). 
+ pub fn from_query_log_with_schema( + log_path: &Path, + schema: PromQLSchema, + opts: RuntimeOptions, + ) -> Result { + let entries = query_log::parse_log_file(log_path)?; + let (instants, ranges) = + query_log::infer_queries(&entries, opts.prometheus_scrape_interval); + let config = query_log::to_controller_config(instants, ranges); + Ok(Self { + config, + schema, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = output::generator::generate_plan(&self.config, &self.schema, &self.options)?; + Ok(PlannerOutput::from_output(output)) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(output.streaming_yaml())?; + let inference_str = serde_yaml::to_string(output.inference_yaml())?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 5aa3196b..8930e310 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -1,30 +1,23 @@ pub mod config; +pub mod controller; pub mod error; pub mod output; pub mod planner; +pub mod planner_output; pub mod prometheus_client; pub mod query_log; - -use asap_types::enums::QueryLanguage; -use asap_types::inference_config::InferenceConfig; -use asap_types::streaming_config::StreamingConfig; -use promql_utilities::data_model::KeyByLabelNames; -use serde_yaml::Value as YamlValue; -use std::path::Path; -use tracing::debug; +pub mod sql_controller; pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; pub use config::input::SQLControllerConfig; +pub use controller::Controller; pub use error::ControllerError; pub use output::generator::{GeneratorOutput, PuntedQuery}; -use output::generator::{ - KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS, KEY_NUM_AGG_TO_RETAIN, - KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME, KEY_VALUE_COLUMN, - KEY_WINDOW_SIZE, -}; pub use output::sql_generator::SQLRuntimeOptions; +pub use planner_output::PlannerOutput; pub use prometheus_client::build_schema_from_prometheus; +pub use sql_controller::SQLController; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamingEngine { @@ -41,405 +34,3 @@ pub struct RuntimeOptions { pub range_duration: u64, pub step: u64, } - -pub struct Controller { - config: ControllerConfig, - schema: PromQLSchema, - options: RuntimeOptions, -} - -/// Output of the planning process — contains the two YAML configs -pub struct PlannerOutput { - pub punted_queries: Vec, - streaming_yaml: YamlValue, - inference_yaml: YamlValue, - aggregation_count: usize, - query_count: usize, -} - -impl PlannerOutput { - pub fn streaming_aggregation_count(&self) -> usize { - self.aggregation_count - } - - pub fn inference_query_count(&self) -> usize { - self.query_count - } - - fn streaming_aggs_slice(&self) -> Option<&[YamlValue]> { - if let YamlValue::Mapping(root) = &self.streaming_yaml { - if let Some(YamlValue::Sequence(aggs)) = root.get(KEY_AGGREGATIONS) { - return Some(aggs.as_slice()); - } - } - None - } - - fn find_aggregation_by_type(&self, agg_type: &str) -> Option<&serde_yaml::Mapping> { - self.streaming_aggs_slice()?.iter().find_map(|agg| { - if let YamlValue::Mapping(m) = agg { - if matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type) { - return Some(m); - } - } - None - }) - } - - pub fn 
has_aggregation_type(&self, t: &str) -> bool { - self.find_aggregation_by_type(t).is_some() - } - - pub fn all_tumbling_window_sizes_eq(&self, s: u64) -> bool { - self.check_tumbling_window_sizes(|size| size == s) - } - - pub fn all_tumbling_window_sizes_leq(&self, s: u64) -> bool { - self.check_tumbling_window_sizes(|size| size <= s) - } - - fn check_tumbling_window_sizes(&self, predicate: impl Fn(u64) -> bool) -> bool { - self.streaming_aggs_slice() - .map(|aggs| { - aggs.iter().all(|agg| { - if let YamlValue::Mapping(m) = agg { - if let Some(val) = m.get(KEY_WINDOW_SIZE) { - let size = match val { - YamlValue::Number(n) => n.as_u64().unwrap_or(0), - _ => 0, - }; - return predicate(size); - } - } - false - }) - }) - .unwrap_or(false) - } - - /// Returns the sorted labels for the first aggregation matching `agg_type`, - /// for the given `label_kind` ("rollup", "grouping", or "aggregated"). - pub fn aggregation_labels(&self, agg_type: &str, label_kind: &str) -> Vec { - let Some(seq) = self - .find_aggregation_by_type(agg_type) - .and_then(|m| m.get(KEY_LABELS)) - .and_then(|v| { - if let YamlValue::Mapping(lm) = v { - Some(lm) - } else { - None - } - }) - .and_then(|lm| lm.get(label_kind)) - .and_then(|v| { - if let YamlValue::Sequence(seq) = v { - Some(seq) - } else { - None - } - }) - else { - return vec![]; - }; - let mut result: Vec = seq - .iter() - .filter_map(|v| { - if let YamlValue::String(s) = v { - Some(s.clone()) - } else { - None - } - }) - .collect(); - result.sort(); - result - } - - /// Returns the cleanup param (read_count_threshold or num_aggregates_to_retain) - /// for the first aggregation entry of the given query string. - pub fn inference_cleanup_param(&self, query: &str) -> Option { - if let YamlValue::Mapping(root) = &self.inference_yaml { - if let Some(YamlValue::Sequence(queries)) = root.get(KEY_QUERIES) { - for q in queries { - if let YamlValue::Mapping(qm) = q { - if let Some(YamlValue::String(qs)) = qm.get(KEY_QUERY) { - if qs == query { - if let Some(YamlValue::Sequence(aggs)) = qm.get(KEY_AGGREGATIONS) { - if let Some(YamlValue::Mapping(agg)) = aggs.first() { - for key in [KEY_READ_COUNT_THRESHOLD, KEY_NUM_AGG_TO_RETAIN] - { - if let Some(YamlValue::Number(n)) = agg.get(key) { - return n.as_u64(); - } - } - } - } - } - } - } - } - } - } - None - } - - pub fn to_streaming_yaml_string(&self) -> Result { - Ok(serde_yaml::to_string(&self.streaming_yaml)?) - } - - pub fn to_inference_yaml_string(&self) -> Result { - Ok(serde_yaml::to_string(&self.inference_yaml)?) - } - - pub fn to_streaming_config( - &self, - query_language: QueryLanguage, - ) -> Result { - let inference_config = self.to_inference_config(query_language)?; - StreamingConfig::from_yaml_data(&self.streaming_yaml, Some(&inference_config)) - } - - pub fn to_inference_config( - &self, - query_language: QueryLanguage, - ) -> Result { - InferenceConfig::from_yaml_data(&self.inference_yaml, query_language) - } - - /// Returns the table_name field of the first aggregation matching agg_type. - pub fn aggregation_table_name(&self, agg_type: &str) -> Option { - self.find_aggregation_by_type(agg_type) - .and_then(|m| m.get(KEY_TABLE_NAME)) - .and_then(|v| { - if let YamlValue::String(s) = v { - Some(s.clone()) - } else { - None - } - }) - } - - /// Returns the value_column field of the first aggregation matching agg_type. 
- pub fn aggregation_value_column(&self, agg_type: &str) -> Option { - self.find_aggregation_by_type(agg_type) - .and_then(|m| m.get(KEY_VALUE_COLUMN)) - .and_then(|v| { - if let YamlValue::String(s) = v { - Some(s.clone()) - } else { - None - } - }) - } - - /// Returns true if any aggregation has the matching type AND sub_type. - pub fn has_aggregation_type_and_sub_type(&self, agg_type: &str, sub_type: &str) -> bool { - self.streaming_aggs_slice() - .map(|aggs| { - aggs.iter().any(|agg| { - if let YamlValue::Mapping(m) = agg { - matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type) - && matches!(m.get(KEY_AGG_SUB_TYPE), Some(YamlValue::String(s)) if s == sub_type) - } else { - false - } - }) - }) - .unwrap_or(false) - } -} - -pub struct SQLController { - config: SQLControllerConfig, - options: SQLRuntimeOptions, -} - -impl SQLController { - pub fn new(config: SQLControllerConfig, options: SQLRuntimeOptions) -> Self { - Self { config, options } - } - - pub fn from_file(path: &Path, opts: SQLRuntimeOptions) -> Result { - let yaml_str = std::fs::read_to_string(path)?; - Self::from_yaml(&yaml_str, opts) - } - - pub fn from_yaml(yaml: &str, opts: SQLRuntimeOptions) -> Result { - let config: SQLControllerConfig = serde_yaml::from_str(yaml)?; - Ok(Self { - config, - options: opts, - }) - } - - pub fn generate(&self) -> Result { - let output = output::sql_generator::generate_sql_plan(&self.config, &self.options)?; - Ok(PlannerOutput { - punted_queries: output.punted_queries, - streaming_yaml: output.streaming_yaml, - inference_yaml: output.inference_yaml, - aggregation_count: output.aggregation_count, - query_count: output.query_count, - }) - } - - pub fn generate_to_dir(&self, dir: &Path) -> Result { - let output = self.generate()?; - std::fs::create_dir_all(dir)?; - let streaming_str = serde_yaml::to_string(&output.streaming_yaml)?; - let inference_str = serde_yaml::to_string(&output.inference_yaml)?; - std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; - std::fs::write(dir.join("inference_config.yaml"), inference_str)?; - Ok(output) - } -} - -impl Controller { - pub fn new(config: ControllerConfig, schema: PromQLSchema, options: RuntimeOptions) -> Self { - Self { - config, - schema, - options, - } - } - - /// Build a `Controller` from a config file, fetching metric labels from Prometheus. - /// - /// `prometheus_url` is queried via `GET /api/v1/series?match[]=` for each metric - /// name found in the config's PromQL queries. - pub fn from_file( - path: &Path, - opts: RuntimeOptions, - prometheus_url: &str, - ) -> Result { - let yaml_str = std::fs::read_to_string(path)?; - let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; - let all_queries: Vec = config - .query_groups - .iter() - .flat_map(|qg| qg.queries.clone()) - .collect(); - let mut schema = - prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; - // For any metric that Prometheus had no series for, fall back to the - // `metrics` hint in the config file (if present). 
- if let Some(metric_hints) = &config.metrics { - for hint in metric_hints { - if !schema.config.contains_key(&hint.metric) { - debug!( - "Prometheus had no series for '{}'; falling back to config-file hint with labels {:?}", - hint.metric, hint.labels - ); - schema = schema.add_metric( - hint.metric.clone(), - KeyByLabelNames::new(hint.labels.clone()), - ); - } - } - } - Ok(Self { - config, - schema, - options: opts, - }) - } - - /// Build a `Controller` from a config file with a caller-supplied `PromQLSchema`. - /// - /// Use this when the schema is available without querying Prometheus (e.g. in tests - /// or when the schema is constructed in-process by the caller). - pub fn from_file_with_schema( - path: &Path, - schema: PromQLSchema, - opts: RuntimeOptions, - ) -> Result { - let yaml_str = std::fs::read_to_string(path)?; - let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; - Ok(Self { - config, - schema, - options: opts, - }) - } - - /// Build a `Controller` from a YAML string with a caller-supplied `PromQLSchema`. - pub fn from_yaml_with_schema( - yaml: &str, - schema: PromQLSchema, - opts: RuntimeOptions, - ) -> Result { - let config: ControllerConfig = serde_yaml::from_str(yaml)?; - Ok(Self { - config, - schema, - options: opts, - }) - } - - /// Build a `Controller` from a Prometheus query log file, fetching metric labels from - /// Prometheus. - /// - /// - `log_path`: newline-delimited JSON query log (Prometheus `--query.log-file` output) - /// - `prometheus_url`: base URL queried for label discovery - pub fn from_query_log( - log_path: &Path, - opts: RuntimeOptions, - prometheus_url: &str, - ) -> Result { - let entries = query_log::parse_log_file(log_path)?; - let (instants, ranges) = - query_log::infer_queries(&entries, opts.prometheus_scrape_interval); - let config = query_log::to_controller_config(instants, ranges); - let all_queries: Vec = config - .query_groups - .iter() - .flat_map(|qg| qg.queries.clone()) - .collect(); - let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; - Ok(Self { - config, - schema, - options: opts, - }) - } - - /// Build a `Controller` from a Prometheus query log file with a caller-supplied `PromQLSchema`. - /// - /// Use this when the schema is available without querying Prometheus (e.g. in tests). 
- pub fn from_query_log_with_schema( - log_path: &Path, - schema: PromQLSchema, - opts: RuntimeOptions, - ) -> Result { - let entries = query_log::parse_log_file(log_path)?; - let (instants, ranges) = - query_log::infer_queries(&entries, opts.prometheus_scrape_interval); - let config = query_log::to_controller_config(instants, ranges); - Ok(Self { - config, - schema, - options: opts, - }) - } - - pub fn generate(&self) -> Result { - let output = output::generator::generate_plan(&self.config, &self.schema, &self.options)?; - Ok(PlannerOutput { - punted_queries: output.punted_queries, - streaming_yaml: output.streaming_yaml, - inference_yaml: output.inference_yaml, - aggregation_count: output.aggregation_count, - query_count: output.query_count, - }) - } - - pub fn generate_to_dir(&self, dir: &Path) -> Result { - let output = self.generate()?; - std::fs::create_dir_all(dir)?; - let streaming_str = serde_yaml::to_string(&output.streaming_yaml)?; - let inference_str = serde_yaml::to_string(&output.inference_yaml)?; - std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; - std::fs::write(dir.join("inference_config.yaml"), inference_str)?; - Ok(output) - } -} diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/output/generator.rs index ce902400..42a32764 100644 --- a/asap-planner-rs/src/output/generator.rs +++ b/asap-planner-rs/src/output/generator.rs @@ -9,7 +9,8 @@ use promql_utilities::data_model::KeyByLabelNames; use crate::config::input::ControllerConfig; use crate::error::ControllerError; -use crate::planner::single_query::{BinaryArm, IntermediateAggConfig, SingleQueryProcessor}; +use crate::planner::agg_config::IntermediateAggConfig; +use crate::planner::promql::{BinaryArm, SingleQueryProcessor}; use crate::RuntimeOptions; // YAML key constants — shared with sql_generator.rs and lib.rs via pub(crate) diff --git a/asap-planner-rs/src/output/sql_generator.rs b/asap-planner-rs/src/output/sql_generator.rs index 23bbda73..881f532b 100644 --- a/asap-planner-rs/src/output/sql_generator.rs +++ b/asap-planner-rs/src/output/sql_generator.rs @@ -11,8 +11,8 @@ use crate::output::generator::{ KEY_CLEANUP_POLICY, KEY_METADATA_COLUMNS, KEY_NAME, KEY_QUERIES, KEY_TABLES, KEY_TIME_COLUMN, KEY_VALUE_COLUMNS, }; -use crate::planner::single_query::IntermediateAggConfig; -use crate::planner::sql_single_query::SQLSingleQueryProcessor; +use crate::planner::agg_config::IntermediateAggConfig; +use crate::planner::sql::SQLSingleQueryProcessor; use crate::StreamingEngine; pub struct SQLRuntimeOptions { diff --git a/asap-planner-rs/src/planner/agg_config.rs b/asap-planner-rs/src/planner/agg_config.rs new file mode 100644 index 00000000..af891f22 --- /dev/null +++ b/asap-planner-rs/src/planner/agg_config.rs @@ -0,0 +1,225 @@ +use asap_types::enums::WindowType; +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{AggregationType, QueryTreatmentType, Statistic}; +use promql_utilities::query_logics::logics::map_statistic_to_precompute_operator; +use serde_json::Value; +use std::collections::HashMap; + +use crate::planner::labels::set_subpopulation_labels; +use crate::planner::window::IntermediateWindowConfig; + +/// Internal representation of an aggregation config before IDs are assigned +#[derive(Debug, Clone)] +pub struct IntermediateAggConfig { + pub aggregation_type: AggregationType, + pub aggregation_sub_type: String, + pub window_type: WindowType, + pub window_size: u64, + pub slide_interval: u64, + pub spatial_filter: String, + pub 
metric: String, + pub table_name: Option, + pub value_column: Option, + pub parameters: HashMap, + pub rollup_labels: KeyByLabelNames, + pub grouping_labels: KeyByLabelNames, + pub aggregated_labels: KeyByLabelNames, +} + +impl IntermediateAggConfig { + /// Canonical deduplication key matching Python's get_identifying_key() + pub fn identifying_key(&self) -> String { + // Build a canonical string representation matching Python's tuple + let mut params_vec: Vec<(String, String)> = self + .parameters + .iter() + .map(|(k, v)| (k.clone(), v.to_string())) + .collect(); + params_vec.sort_by_key(|(k, _)| k.clone()); + + let mut label_parts = String::new(); + // sorted label keys: aggregated, grouping, rollup + let mut label_keys = vec!["aggregated", "grouping", "rollup"]; + label_keys.sort(); + for k in label_keys { + let labels = match k { + "aggregated" => &self.aggregated_labels, + "grouping" => &self.grouping_labels, + "rollup" => &self.rollup_labels, + _ => unreachable!(), + }; + label_parts.push_str(&format!("{}:{:?};", k, labels.labels)); + } + + format!( + "{}|{}|{}|{}|{}|{}|{}|{:?}|{:?}|{:?}|{}", + self.aggregation_type, + self.aggregation_sub_type, + self.window_type, + self.window_size, + self.slide_interval, + self.spatial_filter, + self.metric, + self.table_name, + self.value_column, + params_vec, + label_parts, + ) + } +} + +/// Shared per-statistic config builder used by both PromQL and SQL paths. +/// +/// `get_params(agg_type, agg_sub_type)` is a closure supplied by the caller +/// that resolves sketch parameters; it is the only thing that differs between +/// the two paths. +#[allow(clippy::too_many_arguments)] +pub fn build_agg_configs_for_statistics( + statistics: &[Statistic], + treatment_type: QueryTreatmentType, + subpopulation_labels: &KeyByLabelNames, + rollup: &KeyByLabelNames, + window_cfg: &IntermediateWindowConfig, + metric: &str, + table_name: Option<&str>, + value_column: Option<&str>, + spatial_filter: &str, + get_params: impl Fn(AggregationType, &str) -> Result, String>, +) -> Result, String> { + let mut configs = Vec::new(); + + for statistic in statistics.iter().copied() { + let (agg_type, agg_sub_type) = + map_statistic_to_precompute_operator(statistic, treatment_type)?; + + let mut grouping = KeyByLabelNames::empty(); + let mut aggregated = KeyByLabelNames::empty(); + set_subpopulation_labels( + statistic, + agg_type, + subpopulation_labels, + &mut rollup.clone(), + &mut grouping, + &mut aggregated, + ); + + if matches!( + agg_type, + AggregationType::CountMinSketch | AggregationType::HydraKLL + ) { + let delta_params = get_params(AggregationType::DeltaSetAggregator, "")?; + configs.push(IntermediateAggConfig { + aggregation_type: AggregationType::DeltaSetAggregator, + aggregation_sub_type: String::new(), + window_type: window_cfg.window_type, + window_size: window_cfg.window_size, + slide_interval: window_cfg.slide_interval, + spatial_filter: spatial_filter.to_string(), + metric: metric.to_string(), + table_name: table_name.map(str::to_string), + value_column: value_column.map(str::to_string), + parameters: delta_params, + rollup_labels: rollup.clone(), + grouping_labels: grouping.clone(), + aggregated_labels: aggregated.clone(), + }); + } + + let parameters = get_params(agg_type, &agg_sub_type)?; + configs.push(IntermediateAggConfig { + aggregation_type: agg_type, + aggregation_sub_type: agg_sub_type, + window_type: window_cfg.window_type, + window_size: window_cfg.window_size, + slide_interval: window_cfg.slide_interval, + spatial_filter: 
spatial_filter.to_string(), + metric: metric.to_string(), + table_name: table_name.map(str::to_string), + value_column: value_column.map(str::to_string), + parameters, + rollup_labels: rollup.clone(), + grouping_labels: grouping, + aggregated_labels: aggregated, + }); + } + + Ok(configs) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::Value; + use std::collections::HashMap; + + fn base_config() -> IntermediateAggConfig { + IntermediateAggConfig { + aggregation_type: AggregationType::MultipleIncrease, + aggregation_sub_type: "rate".to_string(), + window_type: WindowType::Tumbling, + window_size: 300, + slide_interval: 300, + spatial_filter: String::new(), + metric: "http_requests_total".to_string(), + table_name: None, + value_column: None, + parameters: HashMap::new(), + rollup_labels: KeyByLabelNames::new(vec!["instance".to_string()]), + grouping_labels: KeyByLabelNames::empty(), + aggregated_labels: KeyByLabelNames::empty(), + } + } + + #[test] + fn identifying_key_is_stable() { + let cfg = base_config(); + assert_eq!(cfg.identifying_key(), cfg.identifying_key()); + } + + #[test] + fn identical_configs_have_same_key() { + assert_eq!( + base_config().identifying_key(), + base_config().identifying_key() + ); + } + + #[test] + fn different_aggregation_type_produces_different_key() { + let cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg2.aggregation_type = AggregationType::DatasketchesKLL; + assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); + } + + #[test] + fn different_window_size_produces_different_key() { + let cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg2.window_size = 60; + assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); + } + + #[test] + fn different_rollup_labels_produce_different_key() { + let cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg2.rollup_labels = KeyByLabelNames::new(vec!["job".to_string()]); + assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); + } + + #[test] + fn parameter_insertion_order_does_not_affect_key() { + let mut cfg1 = base_config(); + let mut cfg2 = base_config(); + cfg1.parameters + .insert("depth".to_string(), Value::Number(3.into())); + cfg1.parameters + .insert("width".to_string(), Value::Number(1024.into())); + cfg2.parameters + .insert("width".to_string(), Value::Number(1024.into())); + cfg2.parameters + .insert("depth".to_string(), Value::Number(3.into())); + assert_eq!(cfg1.identifying_key(), cfg2.identifying_key()); + } +} diff --git a/asap-planner-rs/src/planner/cleanup.rs b/asap-planner-rs/src/planner/cleanup.rs new file mode 100644 index 00000000..838e084a --- /dev/null +++ b/asap-planner-rs/src/planner/cleanup.rs @@ -0,0 +1,229 @@ +use asap_types::enums::{CleanupPolicy, WindowType}; +use promql_utilities::ast_matching::PromQLMatchResult; +use promql_utilities::query_logics::enums::QueryPatternType; + +use super::window::get_effective_repeat; + +pub fn get_cleanup_param( + cleanup_policy: CleanupPolicy, + query_pattern_type: QueryPatternType, + match_result: &PromQLMatchResult, + t_repeat: u64, + window_type: WindowType, + range_duration: u64, + step: u64, +) -> Result { + // Validation + if (range_duration == 0) != (step == 0) { + return Err(format!( + "range_duration and step must both be 0 or both > 0. 
Got range_duration={}, step={}", + range_duration, step + )); + } + + let is_range_query = step > 0; + + let t_lookback: u64 = if query_pattern_type == QueryPatternType::OnlySpatial { + t_repeat + } else { + match_result + .get_range_duration() + .map(|d| d.num_seconds() as u64) + .ok_or_else(|| "No range_vector token found".to_string())? + }; + + if window_type == WindowType::Sliding { + let result = if is_range_query { + range_duration / step + 1 + } else { + 1 + }; + return Ok(result); + } + + // Tumbling + let effective_repeat = get_effective_repeat(t_repeat, step); + + let result = match cleanup_policy { + CleanupPolicy::CircularBuffer => { + // ceil((t_lookback + range_duration) / effective_repeat) + let numerator = t_lookback + range_duration; + numerator.div_ceil(effective_repeat) + } + CleanupPolicy::ReadBased => { + // ceil(t_lookback / effective_repeat) * (range_duration / step + 1) + let lookback_buckets = t_lookback.div_ceil(effective_repeat); + let num_steps = if is_range_query { + range_duration / step + 1 + } else { + 1 + }; + lookback_buckets * num_steps + } + CleanupPolicy::NoCleanup => { + return Err("NoCleanup policy should not call get_cleanup_param".to_string()); + } + }; + + Ok(result) +} + +/// SQL cleanup param — SQL queries are always instant (no range_duration/step). +pub fn get_sql_cleanup_param( + cleanup_policy: CleanupPolicy, + t_lookback: u64, + t_repeat: u64, +) -> Result { + match cleanup_policy { + CleanupPolicy::CircularBuffer | CleanupPolicy::ReadBased => { + Ok(t_lookback.div_ceil(t_repeat)) + } + CleanupPolicy::NoCleanup => { + Err("NoCleanup policy should not call get_sql_cleanup_param".to_string()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::planner::patterns::build_patterns; + + use promql_utilities::ast_matching::PromQLMatchResult; + use promql_utilities::query_logics::enums::QueryPatternType; + + fn match_query(query: &str) -> (QueryPatternType, PromQLMatchResult) { + let ast = promql_parser::parser::parse(query).unwrap(); + let patterns = build_patterns(); + for (pt, pattern) in &patterns { + let result = pattern.matches(&ast); + if result.matches { + return (*pt, result); + } + } + panic!("no pattern matched query: {}", query); + } + + #[test] + fn cleanup_param_circular_buffer_spatial_instant_query() { + let (pt, mr) = match_query("sum(some_metric)"); + assert_eq!(pt, QueryPatternType::OnlySpatial); + // t_lookback = t_repeat = 300 (OnlySpatial path) + // effective_repeat = 300 (step=0) + // ceil((300 + 0) / 300) = 1 + let result = get_cleanup_param( + CleanupPolicy::CircularBuffer, + pt, + &mr, + 300, + WindowType::Tumbling, + 0, + 0, + ) + .unwrap(); + assert_eq!(result, 1); + } + + #[test] + fn cleanup_param_circular_buffer_spatial_range_query() { + let (pt, mr) = match_query("sum(some_metric)"); + // t_lookback = t_repeat = 300, effective_repeat = min(300, 30) = 30 + // ceil((300 + 3600) / 30) = ceil(130) = 130 + let result = get_cleanup_param( + CleanupPolicy::CircularBuffer, + pt, + &mr, + 300, + WindowType::Tumbling, + 3600, + 30, + ) + .unwrap(); + assert_eq!(result, 130); + } + + #[test] + fn cleanup_param_read_based_spatial_instant_query() { + let (pt, mr) = match_query("sum(some_metric)"); + // lookback_buckets = ceil(300/300) = 1, num_steps = 1 → result = 1 + let result = get_cleanup_param( + CleanupPolicy::ReadBased, + pt, + &mr, + 300, + WindowType::Tumbling, + 0, + 0, + ) + .unwrap(); + assert_eq!(result, 1); + } + + #[test] + fn cleanup_param_read_based_spatial_range_query() { + let (pt, mr) = 
match_query("sum(some_metric)"); + // lookback_buckets = ceil(300/30) = 10, num_steps = 3600/30 + 1 = 121 + // result = 10 * 121 = 1210 + let result = get_cleanup_param( + CleanupPolicy::ReadBased, + pt, + &mr, + 300, + WindowType::Tumbling, + 3600, + 30, + ) + .unwrap(); + assert_eq!(result, 1210); + } + + #[test] + fn cleanup_param_circular_buffer_temporal_instant_query() { + let (pt, mr) = match_query("rate(some_metric[5m])"); + assert_eq!(pt, QueryPatternType::OnlyTemporal); + // t_lookback = 5m = 300s (from [5m] range vector), range_duration=0, step=0 + // effective_repeat = 60, ceil((300 + 0) / 60) = 5 + let result = get_cleanup_param( + CleanupPolicy::CircularBuffer, + pt, + &mr, + 60, + WindowType::Tumbling, + 0, + 0, + ) + .unwrap(); + assert_eq!(result, 5); + } + + #[test] + fn cleanup_param_no_cleanup_returns_error() { + let (pt, mr) = match_query("sum(some_metric)"); + let result = get_cleanup_param( + CleanupPolicy::NoCleanup, + pt, + &mr, + 300, + WindowType::Tumbling, + 0, + 0, + ); + assert!(result.is_err()); + } + + #[test] + fn cleanup_param_mismatched_range_and_step_returns_error() { + let (pt, mr) = match_query("sum(some_metric)"); + // range_duration > 0 but step == 0 is invalid + let result = get_cleanup_param( + CleanupPolicy::CircularBuffer, + pt, + &mr, + 300, + WindowType::Tumbling, + 3600, + 0, + ); + assert!(result.is_err()); + } +} diff --git a/asap-planner-rs/src/planner/labels.rs b/asap-planner-rs/src/planner/labels.rs new file mode 100644 index 00000000..5ab35715 --- /dev/null +++ b/asap-planner-rs/src/planner/labels.rs @@ -0,0 +1,22 @@ +use promql_utilities::data_model::KeyByLabelNames; +use promql_utilities::query_logics::enums::{AggregationType, Statistic}; +use promql_utilities::query_logics::logics::does_precompute_operator_support_subpopulations; + +pub fn set_subpopulation_labels( + statistic: Statistic, + aggregation_type: AggregationType, + subpopulation_labels: &KeyByLabelNames, + rollup_labels: &mut KeyByLabelNames, + grouping_labels: &mut KeyByLabelNames, + aggregated_labels: &mut KeyByLabelNames, +) { + // rollup is set by caller before calling this function + let _ = rollup_labels; // not modified here + if does_precompute_operator_support_subpopulations(statistic, aggregation_type) { + *grouping_labels = KeyByLabelNames::empty(); + *aggregated_labels = subpopulation_labels.clone(); + } else { + *grouping_labels = subpopulation_labels.clone(); + *aggregated_labels = KeyByLabelNames::empty(); + } +} diff --git a/asap-planner-rs/src/planner/logics.rs b/asap-planner-rs/src/planner/logics.rs deleted file mode 100644 index 3b261937..00000000 --- a/asap-planner-rs/src/planner/logics.rs +++ /dev/null @@ -1,477 +0,0 @@ -use crate::config::input::SketchParameterOverrides; -use asap_types::enums::{CleanupPolicy, WindowType}; -use promql_utilities::ast_matching::PromQLMatchResult; -use promql_utilities::data_model::KeyByLabelNames; -use promql_utilities::query_logics::enums::{AggregationType, QueryPatternType, Statistic}; -use promql_utilities::query_logics::logics::does_precompute_operator_support_subpopulations; -use std::collections::HashMap; - -// Default sketch parameters -const DEFAULT_CMS_DEPTH: u64 = 3; -const DEFAULT_CMS_WIDTH: u64 = 1024; -const DEFAULT_CMS_HEAP_MULT: u64 = 4; -const DEFAULT_KLL_K: u64 = 500; -const DEFAULT_HYDRA_ROW: u64 = 3; -const DEFAULT_HYDRA_COL: u64 = 1024; -const DEFAULT_HYDRA_K: u64 = 20; - -pub fn get_effective_repeat(t_repeat: u64, step: u64) -> u64 { - if step > 0 { - t_repeat.min(step) - } else { - t_repeat - } -} - 
-pub fn should_use_sliding_window( - _query_pattern_type: QueryPatternType, - _aggregation_type: &str, -) -> bool { - // HARDCODED: sliding windows crash Arroyo - false -} - -pub fn set_window_parameters( - query_pattern_type: QueryPatternType, - t_repeat: u64, - prometheus_scrape_interval: u64, - aggregation_type: &str, - step: u64, - config: &mut IntermediateWindowConfig, -) { - let effective_repeat = get_effective_repeat(t_repeat, step); - let _use_sliding = should_use_sliding_window(query_pattern_type, aggregation_type); - // use_sliding is always false, so always tumbling - set_tumbling_window_parameters( - query_pattern_type, - effective_repeat, - prometheus_scrape_interval, - config, - ); -} - -fn set_tumbling_window_parameters( - query_pattern_type: QueryPatternType, - effective_repeat: u64, - prometheus_scrape_interval: u64, - config: &mut IntermediateWindowConfig, -) { - match query_pattern_type { - QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { - config.window_size = effective_repeat; - config.slide_interval = effective_repeat; - config.window_type = WindowType::Tumbling; - } - QueryPatternType::OnlySpatial => { - config.window_size = prometheus_scrape_interval; - config.slide_interval = prometheus_scrape_interval; - config.window_type = WindowType::Tumbling; - } - } -} - -/// A mutable window config holder used during planning -#[derive(Debug, Clone, Default)] -pub struct IntermediateWindowConfig { - pub window_size: u64, - pub slide_interval: u64, - pub window_type: WindowType, -} - -/// Shared sketch parameter builder used by both PromQL and SQL paths. -/// -/// `topk_k` is only required for `CountMinSketchWithHeap`: PromQL supplies it -/// from the `topk(k, …)` query argument; SQL passes `None` (SQL never produces -/// this operator today, so the `None` branch is unreachable in practice). 
-pub fn build_sketch_parameters( - aggregation_type: AggregationType, - aggregation_sub_type: &str, - topk_k: Option, - sketch_params: Option<&SketchParameterOverrides>, -) -> Result, String> { - match aggregation_type { - AggregationType::Increase - | AggregationType::MinMax - | AggregationType::Sum - | AggregationType::MultipleIncrease - | AggregationType::MultipleMinMax - | AggregationType::MultipleSum - | AggregationType::DeltaSetAggregator - | AggregationType::SetAggregator => Ok(HashMap::new()), - - AggregationType::CountMinSketch => { - let depth = sketch_params - .and_then(|p| p.count_min_sketch.as_ref()) - .map(|p| p.depth) - .unwrap_or(DEFAULT_CMS_DEPTH); - let width = sketch_params - .and_then(|p| p.count_min_sketch.as_ref()) - .map(|p| p.width) - .unwrap_or(DEFAULT_CMS_WIDTH); - let mut m = HashMap::new(); - m.insert("depth".to_string(), serde_json::Value::Number(depth.into())); - m.insert("width".to_string(), serde_json::Value::Number(width.into())); - Ok(m) - } - - AggregationType::CountMinSketchWithHeap => { - if aggregation_sub_type != "topk" { - return Err(format!( - "Aggregation sub-type {} for CountMinSketchWithHeap not supported", - aggregation_sub_type - )); - } - let k = topk_k - .ok_or_else(|| "CountMinSketchWithHeap requires a topk k value".to_string())?; - let depth = sketch_params - .and_then(|p| p.count_min_sketch_with_heap.as_ref()) - .map(|p| p.depth) - .unwrap_or(DEFAULT_CMS_DEPTH); - let width = sketch_params - .and_then(|p| p.count_min_sketch_with_heap.as_ref()) - .map(|p| p.width) - .unwrap_or(DEFAULT_CMS_WIDTH); - let heap_mult = sketch_params - .and_then(|p| p.count_min_sketch_with_heap.as_ref()) - .and_then(|p| p.heap_multiplier) - .unwrap_or(DEFAULT_CMS_HEAP_MULT); - let mut m = HashMap::new(); - m.insert("depth".to_string(), serde_json::Value::Number(depth.into())); - m.insert("width".to_string(), serde_json::Value::Number(width.into())); - m.insert( - "heapsize".to_string(), - serde_json::Value::Number((k * heap_mult).into()), - ); - Ok(m) - } - - AggregationType::DatasketchesKLL => { - let k = sketch_params - .and_then(|p| p.datasketches_kll.as_ref()) - .map(|p| p.k) - .unwrap_or(DEFAULT_KLL_K); - let mut m = HashMap::new(); - m.insert("K".to_string(), serde_json::Value::Number(k.into())); - Ok(m) - } - - AggregationType::HydraKLL => { - let row_num = sketch_params - .and_then(|p| p.hydra_kll.as_ref()) - .map(|p| p.row_num) - .unwrap_or(DEFAULT_HYDRA_ROW); - let col_num = sketch_params - .and_then(|p| p.hydra_kll.as_ref()) - .map(|p| p.col_num) - .unwrap_or(DEFAULT_HYDRA_COL); - let k = sketch_params - .and_then(|p| p.hydra_kll.as_ref()) - .map(|p| p.k) - .unwrap_or(DEFAULT_HYDRA_K); - let mut m = HashMap::new(); - m.insert( - "row_num".to_string(), - serde_json::Value::Number(row_num.into()), - ); - m.insert( - "col_num".to_string(), - serde_json::Value::Number(col_num.into()), - ); - m.insert("k".to_string(), serde_json::Value::Number(k.into())); - Ok(m) - } - - other => Err(format!("Aggregation type {} not supported", other)), - } -} - -/// PromQL wrapper: extracts the topk `k` from the match result when needed, -/// then delegates to `build_sketch_parameters`. 
-pub fn build_sketch_parameters_from_promql( - aggregation_type: AggregationType, - aggregation_sub_type: &str, - match_result: &PromQLMatchResult, - sketch_params: Option<&SketchParameterOverrides>, -) -> Result, String> { - let topk_k = if aggregation_type == AggregationType::CountMinSketchWithHeap { - let k: u64 = match_result - .tokens - .get("aggregation") - .and_then(|t| t.aggregation.as_ref()) - .and_then(|a| a.param.as_ref()) - .and_then(|p| p.parse::().ok()) - .map(|f| f as u64) - .ok_or_else(|| "topk query missing required 'k' parameter".to_string())?; - Some(k) - } else { - None - }; - build_sketch_parameters( - aggregation_type, - aggregation_sub_type, - topk_k, - sketch_params, - ) -} - -pub fn get_cleanup_param( - cleanup_policy: CleanupPolicy, - query_pattern_type: QueryPatternType, - match_result: &PromQLMatchResult, - t_repeat: u64, - window_type: WindowType, - range_duration: u64, - step: u64, -) -> Result { - // Validation - if (range_duration == 0) != (step == 0) { - return Err(format!( - "range_duration and step must both be 0 or both > 0. Got range_duration={}, step={}", - range_duration, step - )); - } - - let is_range_query = step > 0; - - let t_lookback: u64 = if query_pattern_type == QueryPatternType::OnlySpatial { - t_repeat - } else { - match_result - .get_range_duration() - .map(|d| d.num_seconds() as u64) - .ok_or_else(|| "No range_vector token found".to_string())? - }; - - if window_type == WindowType::Sliding { - let result = if is_range_query { - range_duration / step + 1 - } else { - 1 - }; - return Ok(result); - } - - // Tumbling - let effective_repeat = get_effective_repeat(t_repeat, step); - - let result = match cleanup_policy { - CleanupPolicy::CircularBuffer => { - // ceil((t_lookback + range_duration) / effective_repeat) - let numerator = t_lookback + range_duration; - numerator.div_ceil(effective_repeat) - } - CleanupPolicy::ReadBased => { - // ceil(t_lookback / effective_repeat) * (range_duration / step + 1) - let lookback_buckets = t_lookback.div_ceil(effective_repeat); - let num_steps = if is_range_query { - range_duration / step + 1 - } else { - 1 - }; - lookback_buckets * num_steps - } - CleanupPolicy::NoCleanup => { - return Err("NoCleanup policy should not call get_cleanup_param".to_string()); - } - }; - - Ok(result) -} - -pub fn set_subpopulation_labels( - statistic: Statistic, - aggregation_type: AggregationType, - subpopulation_labels: &KeyByLabelNames, - rollup_labels: &mut KeyByLabelNames, - grouping_labels: &mut KeyByLabelNames, - aggregated_labels: &mut KeyByLabelNames, -) { - // rollup is set by caller before calling this function - let _ = rollup_labels; // not modified here - if does_precompute_operator_support_subpopulations(statistic, aggregation_type) { - *grouping_labels = KeyByLabelNames::empty(); - *aggregated_labels = subpopulation_labels.clone(); - } else { - *grouping_labels = subpopulation_labels.clone(); - *aggregated_labels = KeyByLabelNames::empty(); - } -} - -/// SQL cleanup param — SQL queries are always instant (no range_duration/step). 
-pub fn get_sql_cleanup_param( - cleanup_policy: CleanupPolicy, - t_lookback: u64, - t_repeat: u64, -) -> Result { - match cleanup_policy { - CleanupPolicy::CircularBuffer | CleanupPolicy::ReadBased => { - Ok(t_lookback.div_ceil(t_repeat)) - } - CleanupPolicy::NoCleanup => { - Err("NoCleanup policy should not call get_sql_cleanup_param".to_string()) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::planner::patterns::build_patterns; - - use promql_utilities::ast_matching::PromQLMatchResult; - use promql_utilities::query_logics::enums::QueryPatternType; - - fn match_query(query: &str) -> (QueryPatternType, PromQLMatchResult) { - let ast = promql_parser::parser::parse(query).unwrap(); - let patterns = build_patterns(); - for (pt, pattern) in &patterns { - let result = pattern.matches(&ast); - if result.matches { - return (*pt, result); - } - } - panic!("no pattern matched query: {}", query); - } - - // --- get_effective_repeat --- - - #[test] - fn effective_repeat_no_step() { - assert_eq!(get_effective_repeat(300, 0), 300); - } - - #[test] - fn effective_repeat_step_smaller_than_t_repeat() { - assert_eq!(get_effective_repeat(300, 30), 30); - } - - #[test] - fn effective_repeat_step_larger_than_t_repeat() { - assert_eq!(get_effective_repeat(30, 300), 30); - } - - // --- get_cleanup_param --- - - #[test] - fn cleanup_param_circular_buffer_spatial_instant_query() { - let (pt, mr) = match_query("sum(some_metric)"); - assert_eq!(pt, QueryPatternType::OnlySpatial); - // t_lookback = t_repeat = 300 (OnlySpatial path) - // effective_repeat = 300 (step=0) - // ceil((300 + 0) / 300) = 1 - let result = get_cleanup_param( - CleanupPolicy::CircularBuffer, - pt, - &mr, - 300, - WindowType::Tumbling, - 0, - 0, - ) - .unwrap(); - assert_eq!(result, 1); - } - - #[test] - fn cleanup_param_circular_buffer_spatial_range_query() { - let (pt, mr) = match_query("sum(some_metric)"); - // t_lookback = t_repeat = 300, effective_repeat = min(300, 30) = 30 - // ceil((300 + 3600) / 30) = ceil(130) = 130 - let result = get_cleanup_param( - CleanupPolicy::CircularBuffer, - pt, - &mr, - 300, - WindowType::Tumbling, - 3600, - 30, - ) - .unwrap(); - assert_eq!(result, 130); - } - - #[test] - fn cleanup_param_read_based_spatial_instant_query() { - let (pt, mr) = match_query("sum(some_metric)"); - // lookback_buckets = ceil(300/300) = 1, num_steps = 1 → result = 1 - let result = get_cleanup_param( - CleanupPolicy::ReadBased, - pt, - &mr, - 300, - WindowType::Tumbling, - 0, - 0, - ) - .unwrap(); - assert_eq!(result, 1); - } - - #[test] - fn cleanup_param_read_based_spatial_range_query() { - let (pt, mr) = match_query("sum(some_metric)"); - // lookback_buckets = ceil(300/30) = 10, num_steps = 3600/30 + 1 = 121 - // result = 10 * 121 = 1210 - let result = get_cleanup_param( - CleanupPolicy::ReadBased, - pt, - &mr, - 300, - WindowType::Tumbling, - 3600, - 30, - ) - .unwrap(); - assert_eq!(result, 1210); - } - - #[test] - fn cleanup_param_circular_buffer_temporal_instant_query() { - let (pt, mr) = match_query("rate(some_metric[5m])"); - assert_eq!(pt, QueryPatternType::OnlyTemporal); - // t_lookback = 5m = 300s (from [5m] range vector), range_duration=0, step=0 - // effective_repeat = 60, ceil((300 + 0) / 60) = 5 - let result = get_cleanup_param( - CleanupPolicy::CircularBuffer, - pt, - &mr, - 60, - WindowType::Tumbling, - 0, - 0, - ) - .unwrap(); - assert_eq!(result, 5); - } - - #[test] - fn cleanup_param_no_cleanup_returns_error() { - let (pt, mr) = match_query("sum(some_metric)"); - let result = 
get_cleanup_param( - CleanupPolicy::NoCleanup, - pt, - &mr, - 300, - WindowType::Tumbling, - 0, - 0, - ); - assert!(result.is_err()); - } - - #[test] - fn cleanup_param_mismatched_range_and_step_returns_error() { - let (pt, mr) = match_query("sum(some_metric)"); - // range_duration > 0 but step == 0 is invalid - let result = get_cleanup_param( - CleanupPolicy::CircularBuffer, - pt, - &mr, - 300, - WindowType::Tumbling, - 3600, - 0, - ); - assert!(result.is_err()); - } -} diff --git a/asap-planner-rs/src/planner/mod.rs b/asap-planner-rs/src/planner/mod.rs index 44475bec..ce9b172c 100644 --- a/asap-planner-rs/src/planner/mod.rs +++ b/asap-planner-rs/src/planner/mod.rs @@ -1,5 +1,10 @@ -pub mod logics; +pub mod agg_config; +pub mod cleanup; +pub mod labels; pub mod patterns; -pub mod single_query; -pub mod sql_single_query; -pub use single_query::*; +pub mod promql; +pub mod sketch; +pub mod sql; +pub mod window; +pub use agg_config::*; +pub use promql::*; diff --git a/asap-planner-rs/src/planner/single_query.rs b/asap-planner-rs/src/planner/promql.rs similarity index 60% rename from asap-planner-rs/src/planner/single_query.rs rename to asap-planner-rs/src/planner/promql.rs index 9bdb2255..84ff5c79 100644 --- a/asap-planner-rs/src/planner/single_query.rs +++ b/asap-planner-rs/src/planner/promql.rs @@ -1,27 +1,22 @@ -use asap_types::enums::{CleanupPolicy, WindowType}; +use asap_types::enums::CleanupPolicy; use asap_types::PromQLSchema; use promql_utilities::ast_matching::PromQLMatchResult; use promql_utilities::data_model::KeyByLabelNames; use promql_utilities::query_logics::enums::{ AggregationOperator, AggregationType, PromQLFunction, QueryPatternType, QueryTreatmentType, - Statistic, -}; -use promql_utilities::query_logics::logics::{ - get_is_collapsable, map_statistic_to_precompute_operator, }; +use promql_utilities::query_logics::logics::get_is_collapsable; use promql_utilities::query_logics::parsing::{ get_metric_and_spatial_filter, get_spatial_aggregation_output_labels, get_statistics_to_compute, }; -use serde_json::Value; -use std::collections::HashMap; use crate::config::input::SketchParameterOverrides; use crate::error::ControllerError; -use crate::planner::logics::{ - build_sketch_parameters_from_promql, get_cleanup_param, set_subpopulation_labels, - set_window_parameters, IntermediateWindowConfig, -}; +use crate::planner::agg_config::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::cleanup::get_cleanup_param; use crate::planner::patterns::build_patterns; +use crate::planner::sketch::build_sketch_parameters_from_promql; +use crate::planner::window::{set_window_parameters, IntermediateWindowConfig}; use crate::StreamingEngine; /// Represents one arm of a binary arithmetic expression in the planner. 
@@ -55,66 +50,6 @@ fn strip_parens(expr: &promql_parser::parser::Expr) -> &promql_parser::parser::E } } -/// Internal representation of an aggregation config before IDs are assigned -#[derive(Debug, Clone)] -pub struct IntermediateAggConfig { - pub aggregation_type: AggregationType, - pub aggregation_sub_type: String, - pub window_type: WindowType, - pub window_size: u64, - pub slide_interval: u64, - pub spatial_filter: String, - pub metric: String, - pub table_name: Option, - pub value_column: Option, - pub parameters: HashMap, - pub rollup_labels: KeyByLabelNames, - pub grouping_labels: KeyByLabelNames, - pub aggregated_labels: KeyByLabelNames, -} - -impl IntermediateAggConfig { - /// Canonical deduplication key matching Python's get_identifying_key() - pub fn identifying_key(&self) -> String { - // Build a canonical string representation matching Python's tuple - let mut params_vec: Vec<(String, String)> = self - .parameters - .iter() - .map(|(k, v)| (k.clone(), v.to_string())) - .collect(); - params_vec.sort_by_key(|(k, _)| k.clone()); - - let mut label_parts = String::new(); - // sorted label keys: aggregated, grouping, rollup - let mut label_keys = vec!["aggregated", "grouping", "rollup"]; - label_keys.sort(); - for k in label_keys { - let labels = match k { - "aggregated" => &self.aggregated_labels, - "grouping" => &self.grouping_labels, - "rollup" => &self.rollup_labels, - _ => unreachable!(), - }; - label_parts.push_str(&format!("{}:{:?};", k, labels.labels)); - } - - format!( - "{}|{}|{}|{}|{}|{}|{}|{:?}|{:?}|{:?}|{}", - self.aggregation_type, - self.aggregation_sub_type, - self.window_type, - self.window_size, - self.slide_interval, - self.spatial_filter, - self.metric, - self.table_name, - self.value_column, - params_vec, - label_parts, - ) - } -} - pub struct SingleQueryProcessor { query: String, t_repeat: u64, @@ -394,159 +329,3 @@ fn get_label_routing( } } } - -/// Shared per-statistic config builder used by both PromQL and SQL paths. -/// -/// `get_params(agg_type, agg_sub_type)` is a closure supplied by the caller -/// that resolves sketch parameters; it is the only thing that differs between -/// the two paths. 
-#[allow(clippy::too_many_arguments)] -pub fn build_agg_configs_for_statistics( - statistics: &[Statistic], - treatment_type: QueryTreatmentType, - subpopulation_labels: &KeyByLabelNames, - rollup: &KeyByLabelNames, - window_cfg: &IntermediateWindowConfig, - metric: &str, - table_name: Option<&str>, - value_column: Option<&str>, - spatial_filter: &str, - get_params: impl Fn(AggregationType, &str) -> Result, String>, -) -> Result, String> { - let mut configs = Vec::new(); - - for statistic in statistics.iter().copied() { - let (agg_type, agg_sub_type) = - map_statistic_to_precompute_operator(statistic, treatment_type)?; - - let mut grouping = KeyByLabelNames::empty(); - let mut aggregated = KeyByLabelNames::empty(); - set_subpopulation_labels( - statistic, - agg_type, - subpopulation_labels, - &mut rollup.clone(), - &mut grouping, - &mut aggregated, - ); - - if matches!( - agg_type, - AggregationType::CountMinSketch | AggregationType::HydraKLL - ) { - let delta_params = get_params(AggregationType::DeltaSetAggregator, "")?; - configs.push(IntermediateAggConfig { - aggregation_type: AggregationType::DeltaSetAggregator, - aggregation_sub_type: String::new(), - window_type: window_cfg.window_type, - window_size: window_cfg.window_size, - slide_interval: window_cfg.slide_interval, - spatial_filter: spatial_filter.to_string(), - metric: metric.to_string(), - table_name: table_name.map(str::to_string), - value_column: value_column.map(str::to_string), - parameters: delta_params, - rollup_labels: rollup.clone(), - grouping_labels: grouping.clone(), - aggregated_labels: aggregated.clone(), - }); - } - - let parameters = get_params(agg_type, &agg_sub_type)?; - configs.push(IntermediateAggConfig { - aggregation_type: agg_type, - aggregation_sub_type: agg_sub_type, - window_type: window_cfg.window_type, - window_size: window_cfg.window_size, - slide_interval: window_cfg.slide_interval, - spatial_filter: spatial_filter.to_string(), - metric: metric.to_string(), - table_name: table_name.map(str::to_string), - value_column: value_column.map(str::to_string), - parameters, - rollup_labels: rollup.clone(), - grouping_labels: grouping, - aggregated_labels: aggregated, - }); - } - - Ok(configs) -} - -#[cfg(test)] -mod tests { - use super::*; - use serde_json::Value; - use std::collections::HashMap; - - fn base_config() -> IntermediateAggConfig { - IntermediateAggConfig { - aggregation_type: AggregationType::MultipleIncrease, - aggregation_sub_type: "rate".to_string(), - window_type: WindowType::Tumbling, - window_size: 300, - slide_interval: 300, - spatial_filter: String::new(), - metric: "http_requests_total".to_string(), - table_name: None, - value_column: None, - parameters: HashMap::new(), - rollup_labels: KeyByLabelNames::new(vec!["instance".to_string()]), - grouping_labels: KeyByLabelNames::empty(), - aggregated_labels: KeyByLabelNames::empty(), - } - } - - #[test] - fn identifying_key_is_stable() { - let cfg = base_config(); - assert_eq!(cfg.identifying_key(), cfg.identifying_key()); - } - - #[test] - fn identical_configs_have_same_key() { - assert_eq!( - base_config().identifying_key(), - base_config().identifying_key() - ); - } - - #[test] - fn different_aggregation_type_produces_different_key() { - let cfg1 = base_config(); - let mut cfg2 = base_config(); - cfg2.aggregation_type = AggregationType::DatasketchesKLL; - assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); - } - - #[test] - fn different_window_size_produces_different_key() { - let cfg1 = base_config(); - let mut cfg2 = 
base_config(); - cfg2.window_size = 60; - assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); - } - - #[test] - fn different_rollup_labels_produce_different_key() { - let cfg1 = base_config(); - let mut cfg2 = base_config(); - cfg2.rollup_labels = KeyByLabelNames::new(vec!["job".to_string()]); - assert_ne!(cfg1.identifying_key(), cfg2.identifying_key()); - } - - #[test] - fn parameter_insertion_order_does_not_affect_key() { - let mut cfg1 = base_config(); - let mut cfg2 = base_config(); - cfg1.parameters - .insert("depth".to_string(), Value::Number(3.into())); - cfg1.parameters - .insert("width".to_string(), Value::Number(1024.into())); - cfg2.parameters - .insert("width".to_string(), Value::Number(1024.into())); - cfg2.parameters - .insert("depth".to_string(), Value::Number(3.into())); - assert_eq!(cfg1.identifying_key(), cfg2.identifying_key()); - } -} diff --git a/asap-planner-rs/src/planner/sketch.rs b/asap-planner-rs/src/planner/sketch.rs new file mode 100644 index 00000000..40310bc7 --- /dev/null +++ b/asap-planner-rs/src/planner/sketch.rs @@ -0,0 +1,149 @@ +use crate::config::input::SketchParameterOverrides; +use promql_utilities::ast_matching::PromQLMatchResult; +use promql_utilities::query_logics::enums::AggregationType; +use std::collections::HashMap; + +// Default sketch parameters +const DEFAULT_CMS_DEPTH: u64 = 3; +const DEFAULT_CMS_WIDTH: u64 = 1024; +const DEFAULT_CMS_HEAP_MULT: u64 = 4; +const DEFAULT_KLL_K: u64 = 500; +const DEFAULT_HYDRA_ROW: u64 = 3; +const DEFAULT_HYDRA_COL: u64 = 1024; +const DEFAULT_HYDRA_K: u64 = 20; + +/// Shared sketch parameter builder used by both PromQL and SQL paths. +/// +/// `topk_k` is only required for `CountMinSketchWithHeap`: PromQL supplies it +/// from the `topk(k, …)` query argument; SQL passes `None` (SQL never produces +/// this operator today, so the `None` branch is unreachable in practice). 
+pub fn build_sketch_parameters( + aggregation_type: AggregationType, + aggregation_sub_type: &str, + topk_k: Option, + sketch_params: Option<&SketchParameterOverrides>, +) -> Result, String> { + match aggregation_type { + AggregationType::Increase + | AggregationType::MinMax + | AggregationType::Sum + | AggregationType::MultipleIncrease + | AggregationType::MultipleMinMax + | AggregationType::MultipleSum + | AggregationType::DeltaSetAggregator + | AggregationType::SetAggregator => Ok(HashMap::new()), + + AggregationType::CountMinSketch => { + let depth = sketch_params + .and_then(|p| p.count_min_sketch.as_ref()) + .map(|p| p.depth) + .unwrap_or(DEFAULT_CMS_DEPTH); + let width = sketch_params + .and_then(|p| p.count_min_sketch.as_ref()) + .map(|p| p.width) + .unwrap_or(DEFAULT_CMS_WIDTH); + let mut m = HashMap::new(); + m.insert("depth".to_string(), serde_json::Value::Number(depth.into())); + m.insert("width".to_string(), serde_json::Value::Number(width.into())); + Ok(m) + } + + AggregationType::CountMinSketchWithHeap => { + if aggregation_sub_type != "topk" { + return Err(format!( + "Aggregation sub-type {} for CountMinSketchWithHeap not supported", + aggregation_sub_type + )); + } + let k = topk_k + .ok_or_else(|| "CountMinSketchWithHeap requires a topk k value".to_string())?; + let depth = sketch_params + .and_then(|p| p.count_min_sketch_with_heap.as_ref()) + .map(|p| p.depth) + .unwrap_or(DEFAULT_CMS_DEPTH); + let width = sketch_params + .and_then(|p| p.count_min_sketch_with_heap.as_ref()) + .map(|p| p.width) + .unwrap_or(DEFAULT_CMS_WIDTH); + let heap_mult = sketch_params + .and_then(|p| p.count_min_sketch_with_heap.as_ref()) + .and_then(|p| p.heap_multiplier) + .unwrap_or(DEFAULT_CMS_HEAP_MULT); + let mut m = HashMap::new(); + m.insert("depth".to_string(), serde_json::Value::Number(depth.into())); + m.insert("width".to_string(), serde_json::Value::Number(width.into())); + m.insert( + "heapsize".to_string(), + serde_json::Value::Number((k * heap_mult).into()), + ); + Ok(m) + } + + AggregationType::DatasketchesKLL => { + let k = sketch_params + .and_then(|p| p.datasketches_kll.as_ref()) + .map(|p| p.k) + .unwrap_or(DEFAULT_KLL_K); + let mut m = HashMap::new(); + m.insert("K".to_string(), serde_json::Value::Number(k.into())); + Ok(m) + } + + AggregationType::HydraKLL => { + let row_num = sketch_params + .and_then(|p| p.hydra_kll.as_ref()) + .map(|p| p.row_num) + .unwrap_or(DEFAULT_HYDRA_ROW); + let col_num = sketch_params + .and_then(|p| p.hydra_kll.as_ref()) + .map(|p| p.col_num) + .unwrap_or(DEFAULT_HYDRA_COL); + let k = sketch_params + .and_then(|p| p.hydra_kll.as_ref()) + .map(|p| p.k) + .unwrap_or(DEFAULT_HYDRA_K); + let mut m = HashMap::new(); + m.insert( + "row_num".to_string(), + serde_json::Value::Number(row_num.into()), + ); + m.insert( + "col_num".to_string(), + serde_json::Value::Number(col_num.into()), + ); + m.insert("k".to_string(), serde_json::Value::Number(k.into())); + Ok(m) + } + + other => Err(format!("Aggregation type {} not supported", other)), + } +} + +/// PromQL wrapper: extracts the topk `k` from the match result when needed, +/// then delegates to `build_sketch_parameters`. 
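The heap-backed variant is the only branch that derives a value: heapsize = k * heap_multiplier, so a `topk(5, …)` query with the default multiplier of 4 gets a heap of 20. A hedged sketch of that arithmetic (editor's example, not part of the patch):

    // k supplied explicitly; no overrides, so depth 3, width 1024,
    // and heapsize = 5 * DEFAULT_CMS_HEAP_MULT = 20.
    let params = build_sketch_parameters(
        AggregationType::CountMinSketchWithHeap,
        "topk",
        Some(5),
        None,
    )?;
    assert_eq!(params["heapsize"], serde_json::json!(20));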
+pub fn build_sketch_parameters_from_promql( + aggregation_type: AggregationType, + aggregation_sub_type: &str, + match_result: &PromQLMatchResult, + sketch_params: Option<&SketchParameterOverrides>, +) -> Result, String> { + let topk_k = if aggregation_type == AggregationType::CountMinSketchWithHeap { + let k: u64 = match_result + .tokens + .get("aggregation") + .and_then(|t| t.aggregation.as_ref()) + .and_then(|a| a.param.as_ref()) + .and_then(|p| p.parse::().ok()) + .map(|f| f as u64) + .ok_or_else(|| "topk query missing required 'k' parameter".to_string())?; + Some(k) + } else { + None + }; + build_sketch_parameters( + aggregation_type, + aggregation_sub_type, + topk_k, + sketch_params, + ) +} diff --git a/asap-planner-rs/src/planner/sql_single_query.rs b/asap-planner-rs/src/planner/sql.rs similarity index 96% rename from asap-planner-rs/src/planner/sql_single_query.rs rename to asap-planner-rs/src/planner/sql.rs index 057b3209..4bcb2b95 100644 --- a/asap-planner-rs/src/planner/sql_single_query.rs +++ b/asap-planner-rs/src/planner/sql.rs @@ -12,10 +12,10 @@ use sqlparser::parser::Parser as SqlParser; use crate::config::input::{SketchParameterOverrides, TableDefinition}; use crate::error::ControllerError; -use crate::planner::logics::{ - build_sketch_parameters, get_sql_cleanup_param, IntermediateWindowConfig, -}; -use crate::planner::single_query::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::agg_config::{build_agg_configs_for_statistics, IntermediateAggConfig}; +use crate::planner::cleanup::get_sql_cleanup_param; +use crate::planner::sketch::build_sketch_parameters; +use crate::planner::window::IntermediateWindowConfig; use crate::StreamingEngine; pub struct SQLSingleQueryProcessor { diff --git a/asap-planner-rs/src/planner/window.rs b/asap-planner-rs/src/planner/window.rs new file mode 100644 index 00000000..171b09af --- /dev/null +++ b/asap-planner-rs/src/planner/window.rs @@ -0,0 +1,85 @@ +use asap_types::enums::WindowType; +use promql_utilities::query_logics::enums::QueryPatternType; + +pub fn get_effective_repeat(t_repeat: u64, step: u64) -> u64 { + if step > 0 { + t_repeat.min(step) + } else { + t_repeat + } +} + +pub fn should_use_sliding_window( + _query_pattern_type: QueryPatternType, + _aggregation_type: &str, +) -> bool { + // HARDCODED: sliding windows crash Arroyo + false +} + +pub fn set_window_parameters( + query_pattern_type: QueryPatternType, + t_repeat: u64, + prometheus_scrape_interval: u64, + aggregation_type: &str, + step: u64, + config: &mut IntermediateWindowConfig, +) { + let effective_repeat = get_effective_repeat(t_repeat, step); + let _use_sliding = should_use_sliding_window(query_pattern_type, aggregation_type); + // use_sliding is always false, so always tumbling + set_tumbling_window_parameters( + query_pattern_type, + effective_repeat, + prometheus_scrape_interval, + config, + ); +} + +fn set_tumbling_window_parameters( + query_pattern_type: QueryPatternType, + effective_repeat: u64, + prometheus_scrape_interval: u64, + config: &mut IntermediateWindowConfig, +) { + match query_pattern_type { + QueryPatternType::OnlyTemporal | QueryPatternType::OneTemporalOneSpatial => { + config.window_size = effective_repeat; + config.slide_interval = effective_repeat; + config.window_type = WindowType::Tumbling; + } + QueryPatternType::OnlySpatial => { + config.window_size = prometheus_scrape_interval; + config.slide_interval = prometheus_scrape_interval; + config.window_type = WindowType::Tumbling; + } + } +} + +/// A mutable window 
config holder used during planning +#[derive(Debug, Clone, Default)] +pub struct IntermediateWindowConfig { + pub window_size: u64, + pub slide_interval: u64, + pub window_type: WindowType, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn effective_repeat_no_step() { + assert_eq!(get_effective_repeat(300, 0), 300); + } + + #[test] + fn effective_repeat_step_smaller_than_t_repeat() { + assert_eq!(get_effective_repeat(300, 30), 30); + } + + #[test] + fn effective_repeat_step_larger_than_t_repeat() { + assert_eq!(get_effective_repeat(30, 300), 30); + } +} diff --git a/asap-planner-rs/src/planner_output.rs b/asap-planner-rs/src/planner_output.rs new file mode 100644 index 00000000..f0b98c1d --- /dev/null +++ b/asap-planner-rs/src/planner_output.rs @@ -0,0 +1,230 @@ +use serde_yaml::Value as YamlValue; + +use asap_types::enums::QueryLanguage; +use asap_types::inference_config::InferenceConfig; +use asap_types::streaming_config::StreamingConfig; + +use crate::output::generator::{ + GeneratorOutput, PuntedQuery, KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS, + KEY_NUM_AGG_TO_RETAIN, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME, + KEY_VALUE_COLUMN, KEY_WINDOW_SIZE, +}; + +/// Output of the planning process — contains the two YAML configs +pub struct PlannerOutput { + pub punted_queries: Vec, + streaming_yaml: YamlValue, + inference_yaml: YamlValue, + aggregation_count: usize, + query_count: usize, +} + +impl PlannerOutput { + pub(crate) fn from_output(output: GeneratorOutput) -> Self { + Self { + punted_queries: output.punted_queries, + streaming_yaml: output.streaming_yaml, + inference_yaml: output.inference_yaml, + aggregation_count: output.aggregation_count, + query_count: output.query_count, + } + } + + pub(crate) fn streaming_yaml(&self) -> &YamlValue { + &self.streaming_yaml + } + + pub(crate) fn inference_yaml(&self) -> &YamlValue { + &self.inference_yaml + } + + pub fn streaming_aggregation_count(&self) -> usize { + self.aggregation_count + } + + pub fn inference_query_count(&self) -> usize { + self.query_count + } + + fn streaming_aggs_slice(&self) -> Option<&[YamlValue]> { + if let YamlValue::Mapping(root) = &self.streaming_yaml { + if let Some(YamlValue::Sequence(aggs)) = root.get(KEY_AGGREGATIONS) { + return Some(aggs.as_slice()); + } + } + None + } + + fn find_aggregation_by_type(&self, agg_type: &str) -> Option<&serde_yaml::Mapping> { + self.streaming_aggs_slice()?.iter().find_map(|agg| { + if let YamlValue::Mapping(m) = agg { + if matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type) { + return Some(m); + } + } + None + }) + } + + pub fn has_aggregation_type(&self, t: &str) -> bool { + self.find_aggregation_by_type(t).is_some() + } + + pub fn all_tumbling_window_sizes_eq(&self, s: u64) -> bool { + self.check_tumbling_window_sizes(|size| size == s) + } + + pub fn all_tumbling_window_sizes_leq(&self, s: u64) -> bool { + self.check_tumbling_window_sizes(|size| size <= s) + } + + fn check_tumbling_window_sizes(&self, predicate: impl Fn(u64) -> bool) -> bool { + self.streaming_aggs_slice() + .map(|aggs| { + aggs.iter().all(|agg| { + if let YamlValue::Mapping(m) = agg { + if let Some(val) = m.get(KEY_WINDOW_SIZE) { + let size = match val { + YamlValue::Number(n) => n.as_u64().unwrap_or(0), + _ => 0, + }; + return predicate(size); + } + } + false + }) + }) + .unwrap_or(false) + } + + /// Returns the sorted labels for the first aggregation matching `agg_type`, + /// for the given `label_kind` ("rollup", "grouping", or 
"aggregated"). + pub fn aggregation_labels(&self, agg_type: &str, label_kind: &str) -> Vec { + let Some(seq) = self + .find_aggregation_by_type(agg_type) + .and_then(|m| m.get(KEY_LABELS)) + .and_then(|v| { + if let YamlValue::Mapping(lm) = v { + Some(lm) + } else { + None + } + }) + .and_then(|lm| lm.get(label_kind)) + .and_then(|v| { + if let YamlValue::Sequence(seq) = v { + Some(seq) + } else { + None + } + }) + else { + return vec![]; + }; + let mut result: Vec = seq + .iter() + .filter_map(|v| { + if let YamlValue::String(s) = v { + Some(s.clone()) + } else { + None + } + }) + .collect(); + result.sort(); + result + } + + /// Returns the cleanup param (read_count_threshold or num_aggregates_to_retain) + /// for the first aggregation entry of the given query string. + pub fn inference_cleanup_param(&self, query: &str) -> Option { + if let YamlValue::Mapping(root) = &self.inference_yaml { + if let Some(YamlValue::Sequence(queries)) = root.get(KEY_QUERIES) { + for q in queries { + if let YamlValue::Mapping(qm) = q { + if let Some(YamlValue::String(qs)) = qm.get(KEY_QUERY) { + if qs == query { + if let Some(YamlValue::Sequence(aggs)) = qm.get(KEY_AGGREGATIONS) { + if let Some(YamlValue::Mapping(agg)) = aggs.first() { + for key in [KEY_READ_COUNT_THRESHOLD, KEY_NUM_AGG_TO_RETAIN] + { + if let Some(YamlValue::Number(n)) = agg.get(key) { + return n.as_u64(); + } + } + } + } + } + } + } + } + } + } + None + } + + pub fn to_streaming_yaml_string(&self) -> Result { + Ok(serde_yaml::to_string(&self.streaming_yaml)?) + } + + pub fn to_inference_yaml_string(&self) -> Result { + Ok(serde_yaml::to_string(&self.inference_yaml)?) + } + + pub fn to_streaming_config( + &self, + query_language: QueryLanguage, + ) -> Result { + let inference_config = self.to_inference_config(query_language)?; + StreamingConfig::from_yaml_data(&self.streaming_yaml, Some(&inference_config)) + } + + pub fn to_inference_config( + &self, + query_language: QueryLanguage, + ) -> Result { + InferenceConfig::from_yaml_data(&self.inference_yaml, query_language) + } + + /// Returns the table_name field of the first aggregation matching agg_type. + pub fn aggregation_table_name(&self, agg_type: &str) -> Option { + self.find_aggregation_by_type(agg_type) + .and_then(|m| m.get(KEY_TABLE_NAME)) + .and_then(|v| { + if let YamlValue::String(s) = v { + Some(s.clone()) + } else { + None + } + }) + } + + /// Returns the value_column field of the first aggregation matching agg_type. + pub fn aggregation_value_column(&self, agg_type: &str) -> Option { + self.find_aggregation_by_type(agg_type) + .and_then(|m| m.get(KEY_VALUE_COLUMN)) + .and_then(|v| { + if let YamlValue::String(s) = v { + Some(s.clone()) + } else { + None + } + }) + } + + /// Returns true if any aggregation has the matching type AND sub_type. 
+ pub fn has_aggregation_type_and_sub_type(&self, agg_type: &str, sub_type: &str) -> bool { + self.streaming_aggs_slice() + .map(|aggs| { + aggs.iter().any(|agg| { + if let YamlValue::Mapping(m) = agg { + matches!(m.get(KEY_AGG_TYPE), Some(YamlValue::String(s)) if s == agg_type) + && matches!(m.get(KEY_AGG_SUB_TYPE), Some(YamlValue::String(s)) if s == sub_type) + } else { + false + } + }) + }) + .unwrap_or(false) + } +} diff --git a/asap-planner-rs/src/sql_controller.rs b/asap-planner-rs/src/sql_controller.rs new file mode 100644 index 00000000..8df9546c --- /dev/null +++ b/asap-planner-rs/src/sql_controller.rs @@ -0,0 +1,46 @@ +use std::path::Path; + +use crate::config::input::SQLControllerConfig; +use crate::error::ControllerError; +use crate::output; +use crate::output::sql_generator::SQLRuntimeOptions; +use crate::planner_output::PlannerOutput; + +pub struct SQLController { + config: SQLControllerConfig, + options: SQLRuntimeOptions, +} + +impl SQLController { + pub fn new(config: SQLControllerConfig, options: SQLRuntimeOptions) -> Self { + Self { config, options } + } + + pub fn from_file(path: &Path, opts: SQLRuntimeOptions) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + Self::from_yaml(&yaml_str, opts) + } + + pub fn from_yaml(yaml: &str, opts: SQLRuntimeOptions) -> Result { + let config: SQLControllerConfig = serde_yaml::from_str(yaml)?; + Ok(Self { + config, + options: opts, + }) + } + + pub fn generate(&self) -> Result { + let output = output::sql_generator::generate_sql_plan(&self.config, &self.options)?; + Ok(PlannerOutput::from_output(output)) + } + + pub fn generate_to_dir(&self, dir: &Path) -> Result { + let output = self.generate()?; + std::fs::create_dir_all(dir)?; + let streaming_str = serde_yaml::to_string(output.streaming_yaml())?; + let inference_str = serde_yaml::to_string(output.inference_yaml())?; + std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?; + std::fs::write(dir.join("inference_config.yaml"), inference_str)?; + Ok(output) + } +} From 007369156991a3dfa616b19c88b32f263207bc4a Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Thu, 7 May 2026 09:30:01 -0400 Subject: [PATCH 2/2] more refactor --- asap-planner-rs/src/lib.rs | 13 ++++++------- asap-planner-rs/src/output/mod.rs | 3 --- asap-planner-rs/src/planner_output.rs | 2 +- asap-planner-rs/src/{ => promql}/controller.rs | 5 +++-- asap-planner-rs/src/{output => promql}/generator.rs | 0 asap-planner-rs/src/promql/mod.rs | 4 ++++ .../src/{sql_controller.rs => sql/controller.rs} | 6 +++--- .../{output/sql_generator.rs => sql/generator.rs} | 7 ++++--- asap-planner-rs/src/sql/mod.rs | 4 ++++ 9 files changed, 25 insertions(+), 19 deletions(-) delete mode 100644 asap-planner-rs/src/output/mod.rs rename asap-planner-rs/src/{ => promql}/controller.rs (97%) rename asap-planner-rs/src/{output => promql}/generator.rs (100%) create mode 100644 asap-planner-rs/src/promql/mod.rs rename asap-planner-rs/src/{sql_controller.rs => sql/controller.rs} (89%) rename asap-planner-rs/src/{output/sql_generator.rs => sql/generator.rs} (98%) create mode 100644 asap-planner-rs/src/sql/mod.rs diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 8930e310..573702dd 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -1,23 +1,22 @@ pub mod config; -pub mod controller; pub mod error; -pub mod output; pub mod planner; pub mod planner_output; pub mod prometheus_client; +pub mod promql; pub mod query_log; -pub mod sql_controller; +pub mod sql; pub use 
asap_types::PromQLSchema; pub use config::input::ControllerConfig; pub use config::input::SQLControllerConfig; -pub use controller::Controller; pub use error::ControllerError; -pub use output::generator::{GeneratorOutput, PuntedQuery}; -pub use output::sql_generator::SQLRuntimeOptions; pub use planner_output::PlannerOutput; pub use prometheus_client::build_schema_from_prometheus; -pub use sql_controller::SQLController; +pub use promql::generator::{GeneratorOutput, PuntedQuery}; +pub use promql::Controller; +pub use sql::SQLController; +pub use sql::SQLRuntimeOptions; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamingEngine { diff --git a/asap-planner-rs/src/output/mod.rs b/asap-planner-rs/src/output/mod.rs deleted file mode 100644 index 63c14cbb..00000000 --- a/asap-planner-rs/src/output/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod generator; -pub mod sql_generator; -pub use generator::*; diff --git a/asap-planner-rs/src/planner_output.rs b/asap-planner-rs/src/planner_output.rs index f0b98c1d..7eefea81 100644 --- a/asap-planner-rs/src/planner_output.rs +++ b/asap-planner-rs/src/planner_output.rs @@ -4,7 +4,7 @@ use asap_types::enums::QueryLanguage; use asap_types::inference_config::InferenceConfig; use asap_types::streaming_config::StreamingConfig; -use crate::output::generator::{ +use crate::promql::generator::{ GeneratorOutput, PuntedQuery, KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS, KEY_NUM_AGG_TO_RETAIN, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME, KEY_VALUE_COLUMN, KEY_WINDOW_SIZE, diff --git a/asap-planner-rs/src/controller.rs b/asap-planner-rs/src/promql/controller.rs similarity index 97% rename from asap-planner-rs/src/controller.rs rename to asap-planner-rs/src/promql/controller.rs index d1ebd69c..24b4e6ba 100644 --- a/asap-planner-rs/src/controller.rs +++ b/asap-planner-rs/src/promql/controller.rs @@ -4,11 +4,12 @@ use tracing::debug; use asap_types::PromQLSchema; use promql_utilities::data_model::KeyByLabelNames; +use super::generator; use crate::config::input::ControllerConfig; use crate::error::ControllerError; use crate::planner_output::PlannerOutput; use crate::RuntimeOptions; -use crate::{output, prometheus_client, query_log}; +use crate::{prometheus_client, query_log}; pub struct Controller { config: ControllerConfig, @@ -145,7 +146,7 @@ impl Controller { } pub fn generate(&self) -> Result { - let output = output::generator::generate_plan(&self.config, &self.schema, &self.options)?; + let output = generator::generate_plan(&self.config, &self.schema, &self.options)?; Ok(PlannerOutput::from_output(output)) } diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/promql/generator.rs similarity index 100% rename from asap-planner-rs/src/output/generator.rs rename to asap-planner-rs/src/promql/generator.rs diff --git a/asap-planner-rs/src/promql/mod.rs b/asap-planner-rs/src/promql/mod.rs new file mode 100644 index 00000000..a08205cc --- /dev/null +++ b/asap-planner-rs/src/promql/mod.rs @@ -0,0 +1,4 @@ +pub mod controller; +pub mod generator; +pub use controller::Controller; +pub use generator::{GeneratorOutput, PuntedQuery}; diff --git a/asap-planner-rs/src/sql_controller.rs b/asap-planner-rs/src/sql/controller.rs similarity index 89% rename from asap-planner-rs/src/sql_controller.rs rename to asap-planner-rs/src/sql/controller.rs index 8df9546c..fb4f5aaa 100644 --- a/asap-planner-rs/src/sql_controller.rs +++ b/asap-planner-rs/src/sql/controller.rs @@ -1,10 +1,10 @@ use std::path::Path; +use 
super::generator; use crate::config::input::SQLControllerConfig; use crate::error::ControllerError; -use crate::output; -use crate::output::sql_generator::SQLRuntimeOptions; use crate::planner_output::PlannerOutput; +use crate::sql::generator::SQLRuntimeOptions; pub struct SQLController { config: SQLControllerConfig, @@ -30,7 +30,7 @@ impl SQLController { } pub fn generate(&self) -> Result { - let output = output::sql_generator::generate_sql_plan(&self.config, &self.options)?; + let output = generator::generate_sql_plan(&self.config, &self.options)?; Ok(PlannerOutput::from_output(output)) } diff --git a/asap-planner-rs/src/output/sql_generator.rs b/asap-planner-rs/src/sql/generator.rs similarity index 98% rename from asap-planner-rs/src/output/sql_generator.rs rename to asap-planner-rs/src/sql/generator.rs index 881f532b..1e36b3b8 100644 --- a/asap-planner-rs/src/output/sql_generator.rs +++ b/asap-planner-rs/src/sql/generator.rs @@ -6,13 +6,14 @@ use std::time::{SystemTime, UNIX_EPOCH}; use crate::config::input::SQLControllerConfig; use crate::error::ControllerError; -use crate::output::generator::{ +// Shared YAML helpers and types live in promql::generator until PR #235 extracts them. +use crate::planner::agg_config::IntermediateAggConfig; +use crate::planner::sql::SQLSingleQueryProcessor; +use crate::promql::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_METADATA_COLUMNS, KEY_NAME, KEY_QUERIES, KEY_TABLES, KEY_TIME_COLUMN, KEY_VALUE_COLUMNS, }; -use crate::planner::agg_config::IntermediateAggConfig; -use crate::planner::sql::SQLSingleQueryProcessor; use crate::StreamingEngine; pub struct SQLRuntimeOptions { diff --git a/asap-planner-rs/src/sql/mod.rs b/asap-planner-rs/src/sql/mod.rs new file mode 100644 index 00000000..a610b39c --- /dev/null +++ b/asap-planner-rs/src/sql/mod.rs @@ -0,0 +1,4 @@ +pub mod controller; +pub mod generator; +pub use controller::SQLController; +pub use generator::SQLRuntimeOptions;
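With the PATCH 2/2 layout, both planners are reachable through the crate-root re-exports. A hedged end-to-end sketch of the SQL path (editor's example; the crate name `asap_planner` is assumed from the asap-planner-rs directory, and `SQLRuntimeOptions` is taken as a parameter because its fields are not shown in this patch):

    use std::path::Path;
    use asap_planner::{ControllerError, SQLController, SQLRuntimeOptions};

    fn plan_sql(config_yaml: &str, opts: SQLRuntimeOptions) -> Result<(), ControllerError> {
        let controller = SQLController::from_yaml(config_yaml, opts)?;
        // Writes streaming_config.yaml and inference_config.yaml into ./plan/.
        let output = controller.generate_to_dir(Path::new("plan"))?;
        println!(
            "planned {} aggregations for {} queries",
            output.streaming_aggregation_count(),
            output.inference_query_count()
        );
        Ok(())
    }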