diff --git a/asap-planner-rs/src/generator.rs b/asap-planner-rs/src/generator.rs new file mode 100644 index 0000000..62bc29d --- /dev/null +++ b/asap-planner-rs/src/generator.rs @@ -0,0 +1,217 @@ +use indexmap::IndexMap; +use serde_json::Value as JsonValue; +use serde_yaml::Value as YamlValue; +use std::collections::HashMap; + +use asap_types::enums::CleanupPolicy; +use promql_utilities::data_model::KeyByLabelNames; + +use crate::planner::agg_config::IntermediateAggConfig; + +// YAML key constants — shared by both SQL and PromQL generators +pub(crate) const KEY_AGG_ID: &str = "aggregationId"; +pub(crate) const KEY_AGG_SUB_TYPE: &str = "aggregationSubType"; +pub(crate) const KEY_AGG_TYPE: &str = "aggregationType"; +pub(crate) const KEY_AGGREGATION_ID: &str = "aggregation_id"; +pub(crate) const KEY_AGGREGATIONS: &str = "aggregations"; +pub(crate) const KEY_CLEANUP_POLICY: &str = "cleanup_policy"; +pub(crate) const KEY_LABELS: &str = "labels"; +pub(crate) const KEY_LABELS_AGGREGATED: &str = "aggregated"; +pub(crate) const KEY_LABELS_GROUPING: &str = "grouping"; +pub(crate) const KEY_LABELS_ROLLUP: &str = "rollup"; +pub(crate) const KEY_METADATA_COLUMNS: &str = "metadata_columns"; +pub(crate) const KEY_METRIC: &str = "metric"; +pub(crate) const KEY_METRICS: &str = "metrics"; +pub(crate) const KEY_NAME: &str = "name"; +pub(crate) const KEY_NUM_AGG_TO_RETAIN: &str = "num_aggregates_to_retain"; +pub(crate) const KEY_PARAMETERS: &str = "parameters"; +pub(crate) const KEY_QUERIES: &str = "queries"; +pub(crate) const KEY_QUERY: &str = "query"; +pub(crate) const KEY_READ_COUNT_THRESHOLD: &str = "read_count_threshold"; +pub(crate) const KEY_SLIDE_INTERVAL: &str = "slideInterval"; +pub(crate) const KEY_SPATIAL_FILTER: &str = "spatialFilter"; +pub(crate) const KEY_TABLE_NAME: &str = "table_name"; +pub(crate) const KEY_TABLES: &str = "tables"; +pub(crate) const KEY_TIME_COLUMN: &str = "time_column"; +pub(crate) const KEY_VALUE_COLUMN: &str = "value_column"; +pub(crate) const KEY_VALUE_COLUMNS: &str = "value_columns"; +pub(crate) const KEY_WINDOW_SIZE: &str = "windowSize"; +pub(crate) const KEY_WINDOW_TYPE: &str = "windowType"; + +pub fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue { + YamlValue::Sequence( + labels + .labels + .iter() + .map(|l| YamlValue::String(l.clone())) + .collect(), + ) +} + +pub fn build_aggregation_entry(id: u32, cfg: &IntermediateAggConfig) -> YamlValue { + let mut map = serde_yaml::Mapping::new(); + map.insert( + YamlValue::String(KEY_AGG_ID.to_string()), + YamlValue::Number(id.into()), + ); + map.insert( + YamlValue::String(KEY_AGG_SUB_TYPE.to_string()), + YamlValue::String(cfg.aggregation_sub_type.clone()), + ); + map.insert( + YamlValue::String(KEY_AGG_TYPE.to_string()), + YamlValue::String(cfg.aggregation_type.to_string()), + ); + + let mut labels_map = serde_yaml::Mapping::new(); + labels_map.insert( + YamlValue::String(KEY_LABELS_AGGREGATED.to_string()), + key_by_labels_to_yaml(&cfg.aggregated_labels), + ); + labels_map.insert( + YamlValue::String(KEY_LABELS_GROUPING.to_string()), + key_by_labels_to_yaml(&cfg.grouping_labels), + ); + labels_map.insert( + YamlValue::String(KEY_LABELS_ROLLUP.to_string()), + key_by_labels_to_yaml(&cfg.rollup_labels), + ); + map.insert( + YamlValue::String(KEY_LABELS.to_string()), + YamlValue::Mapping(labels_map), + ); + + map.insert( + YamlValue::String(KEY_METRIC.to_string()), + YamlValue::String(cfg.metric.clone()), + ); + map.insert( + YamlValue::String(KEY_PARAMETERS.to_string()), + params_to_yaml(&cfg.parameters), + ); + map.insert( + YamlValue::String(KEY_SLIDE_INTERVAL.to_string()), + YamlValue::Number(cfg.slide_interval.into()), + ); + map.insert( + YamlValue::String(KEY_SPATIAL_FILTER.to_string()), + YamlValue::String(cfg.spatial_filter.clone()), + ); + map.insert( + YamlValue::String(KEY_TABLE_NAME.to_string()), + match &cfg.table_name { + Some(t) => YamlValue::String(t.clone()), + None => YamlValue::Null, + }, + ); + map.insert( + YamlValue::String(KEY_VALUE_COLUMN.to_string()), + match &cfg.value_column { + Some(v) => YamlValue::String(v.clone()), + None => YamlValue::Null, + }, + ); + map.insert( + YamlValue::String(KEY_WINDOW_SIZE.to_string()), + YamlValue::Number(cfg.window_size.into()), + ); + map.insert( + YamlValue::String(KEY_WINDOW_TYPE.to_string()), + YamlValue::String(cfg.window_type.to_string()), + ); + + YamlValue::Mapping(map) +} + +pub fn build_queries_yaml( + cleanup_policy: CleanupPolicy, + query_keys_map: &IndexMap)>>, + id_map: &HashMap, +) -> Vec { + query_keys_map + .iter() + .map(|(query_str, keys)| { + let aggregations: Vec = keys + .iter() + .map(|(key, cleanup_param)| { + let agg_id = id_map[key]; + let mut agg_map = serde_yaml::Mapping::new(); + agg_map.insert( + YamlValue::String(KEY_AGGREGATION_ID.to_string()), + YamlValue::Number(agg_id.into()), + ); + if let Some(param) = cleanup_param { + match cleanup_policy { + CleanupPolicy::CircularBuffer => { + agg_map.insert( + YamlValue::String(KEY_NUM_AGG_TO_RETAIN.to_string()), + YamlValue::Number((*param).into()), + ); + } + CleanupPolicy::ReadBased => { + agg_map.insert( + YamlValue::String(KEY_READ_COUNT_THRESHOLD.to_string()), + YamlValue::Number((*param).into()), + ); + } + CleanupPolicy::NoCleanup => {} + } + } + YamlValue::Mapping(agg_map) + }) + .collect(); + + let mut q_map = serde_yaml::Mapping::new(); + q_map.insert( + YamlValue::String(KEY_AGGREGATIONS.to_string()), + YamlValue::Sequence(aggregations), + ); + q_map.insert( + YamlValue::String(KEY_QUERY.to_string()), + YamlValue::String(query_str.clone()), + ); + YamlValue::Mapping(q_map) + }) + .collect() +} + +pub fn params_to_yaml(params: &HashMap) -> YamlValue { + if params.is_empty() { + return YamlValue::Mapping(serde_yaml::Mapping::new()); + } + let mut map = serde_yaml::Mapping::new(); + // Sort for determinism + let mut sorted: Vec<_> = params.iter().collect(); + sorted.sort_by_key(|(k, _)| k.as_str()); + for (k, v) in sorted { + let yaml_val = match v { + JsonValue::Number(n) => { + if let Some(i) = n.as_u64() { + YamlValue::Number(serde_yaml::Number::from(i)) + } else if let Some(f) = n.as_f64() { + YamlValue::Number(serde_yaml::Number::from(f)) + } else { + YamlValue::String(n.to_string()) + } + } + JsonValue::String(s) => YamlValue::String(s.clone()), + JsonValue::Bool(b) => YamlValue::Bool(*b), + other => YamlValue::String(other.to_string()), + }; + map.insert(YamlValue::String(k.clone()), yaml_val); + } + YamlValue::Mapping(map) +} + +#[derive(Debug, Clone)] +pub struct PuntedQuery { + pub query: String, +} + +pub struct GeneratorOutput { + pub punted_queries: Vec, + pub streaming_yaml: YamlValue, + pub inference_yaml: YamlValue, + pub aggregation_count: usize, + pub query_count: usize, +} diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 573702d..c73dfbd 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -1,5 +1,6 @@ pub mod config; pub mod error; +pub mod generator; pub mod planner; pub mod planner_output; pub mod prometheus_client; @@ -11,9 +12,9 @@ pub use asap_types::PromQLSchema; pub use config::input::ControllerConfig; pub use config::input::SQLControllerConfig; pub use error::ControllerError; +pub use generator::{GeneratorOutput, PuntedQuery}; pub use planner_output::PlannerOutput; pub use prometheus_client::build_schema_from_prometheus; -pub use promql::generator::{GeneratorOutput, PuntedQuery}; pub use promql::Controller; pub use sql::SQLController; pub use sql::SQLRuntimeOptions; diff --git a/asap-planner-rs/src/planner_output.rs b/asap-planner-rs/src/planner_output.rs index 7eefea8..71b3ad1 100644 --- a/asap-planner-rs/src/planner_output.rs +++ b/asap-planner-rs/src/planner_output.rs @@ -4,7 +4,7 @@ use asap_types::enums::QueryLanguage; use asap_types::inference_config::InferenceConfig; use asap_types::streaming_config::StreamingConfig; -use crate::promql::generator::{ +use crate::generator::{ GeneratorOutput, PuntedQuery, KEY_AGGREGATIONS, KEY_AGG_SUB_TYPE, KEY_AGG_TYPE, KEY_LABELS, KEY_NUM_AGG_TO_RETAIN, KEY_QUERIES, KEY_QUERY, KEY_READ_COUNT_THRESHOLD, KEY_TABLE_NAME, KEY_VALUE_COLUMN, KEY_WINDOW_SIZE, diff --git a/asap-planner-rs/src/promql/generator.rs b/asap-planner-rs/src/promql/generator.rs index 42a3276..486d9c3 100644 --- a/asap-planner-rs/src/promql/generator.rs +++ b/asap-planner-rs/src/promql/generator.rs @@ -1,48 +1,20 @@ use indexmap::IndexMap; -use serde_json::Value as JsonValue; use serde_yaml::Value as YamlValue; use std::collections::HashMap; use asap_types::enums::CleanupPolicy; use asap_types::PromQLSchema; -use promql_utilities::data_model::KeyByLabelNames; use crate::config::input::ControllerConfig; use crate::error::ControllerError; +use crate::generator::{ + build_aggregation_entry, build_queries_yaml, key_by_labels_to_yaml, GeneratorOutput, + PuntedQuery, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_METRICS, KEY_NAME, KEY_QUERIES, +}; use crate::planner::agg_config::IntermediateAggConfig; use crate::planner::promql::{BinaryArm, SingleQueryProcessor}; use crate::RuntimeOptions; -// YAML key constants — shared with sql_generator.rs and lib.rs via pub(crate) -pub(crate) const KEY_AGG_ID: &str = "aggregationId"; -pub(crate) const KEY_AGG_SUB_TYPE: &str = "aggregationSubType"; -pub(crate) const KEY_AGG_TYPE: &str = "aggregationType"; -pub(crate) const KEY_AGGREGATION_ID: &str = "aggregation_id"; -pub(crate) const KEY_AGGREGATIONS: &str = "aggregations"; -pub(crate) const KEY_CLEANUP_POLICY: &str = "cleanup_policy"; -pub(crate) const KEY_LABELS: &str = "labels"; -pub(crate) const KEY_LABELS_AGGREGATED: &str = "aggregated"; -pub(crate) const KEY_LABELS_GROUPING: &str = "grouping"; -pub(crate) const KEY_LABELS_ROLLUP: &str = "rollup"; -pub(crate) const KEY_METADATA_COLUMNS: &str = "metadata_columns"; -pub(crate) const KEY_METRIC: &str = "metric"; -pub(crate) const KEY_METRICS: &str = "metrics"; -pub(crate) const KEY_NAME: &str = "name"; -pub(crate) const KEY_NUM_AGG_TO_RETAIN: &str = "num_aggregates_to_retain"; -pub(crate) const KEY_PARAMETERS: &str = "parameters"; -pub(crate) const KEY_QUERIES: &str = "queries"; -pub(crate) const KEY_QUERY: &str = "query"; -pub(crate) const KEY_READ_COUNT_THRESHOLD: &str = "read_count_threshold"; -pub(crate) const KEY_SLIDE_INTERVAL: &str = "slideInterval"; -pub(crate) const KEY_SPATIAL_FILTER: &str = "spatialFilter"; -pub(crate) const KEY_TABLE_NAME: &str = "table_name"; -pub(crate) const KEY_TABLES: &str = "tables"; -pub(crate) const KEY_TIME_COLUMN: &str = "time_column"; -pub(crate) const KEY_VALUE_COLUMN: &str = "value_column"; -pub(crate) const KEY_VALUE_COLUMNS: &str = "value_columns"; -pub(crate) const KEY_WINDOW_SIZE: &str = "windowSize"; -pub(crate) const KEY_WINDOW_TYPE: &str = "windowType"; - /// `(query_string, Vec<(identifying_key, cleanup_param)>)` pairs produced by binary leaf decomposition. type LeafEntries = Vec<(String, Vec<(String, Option)>)>; @@ -212,171 +184,6 @@ fn collect_binary_leaf_entries( Ok(Some(all_entries)) } -pub fn key_by_labels_to_yaml(labels: &KeyByLabelNames) -> YamlValue { - YamlValue::Sequence( - labels - .labels - .iter() - .map(|l| YamlValue::String(l.clone())) - .collect(), - ) -} - -pub fn build_aggregation_entry(id: u32, cfg: &IntermediateAggConfig) -> YamlValue { - let mut map = serde_yaml::Mapping::new(); - map.insert( - YamlValue::String(KEY_AGG_ID.to_string()), - YamlValue::Number(id.into()), - ); - map.insert( - YamlValue::String(KEY_AGG_SUB_TYPE.to_string()), - YamlValue::String(cfg.aggregation_sub_type.clone()), - ); - map.insert( - YamlValue::String(KEY_AGG_TYPE.to_string()), - YamlValue::String(cfg.aggregation_type.to_string()), - ); - - let mut labels_map = serde_yaml::Mapping::new(); - labels_map.insert( - YamlValue::String(KEY_LABELS_AGGREGATED.to_string()), - key_by_labels_to_yaml(&cfg.aggregated_labels), - ); - labels_map.insert( - YamlValue::String(KEY_LABELS_GROUPING.to_string()), - key_by_labels_to_yaml(&cfg.grouping_labels), - ); - labels_map.insert( - YamlValue::String(KEY_LABELS_ROLLUP.to_string()), - key_by_labels_to_yaml(&cfg.rollup_labels), - ); - map.insert( - YamlValue::String(KEY_LABELS.to_string()), - YamlValue::Mapping(labels_map), - ); - - map.insert( - YamlValue::String(KEY_METRIC.to_string()), - YamlValue::String(cfg.metric.clone()), - ); - map.insert( - YamlValue::String(KEY_PARAMETERS.to_string()), - params_to_yaml(&cfg.parameters), - ); - map.insert( - YamlValue::String(KEY_SLIDE_INTERVAL.to_string()), - YamlValue::Number(cfg.slide_interval.into()), - ); - map.insert( - YamlValue::String(KEY_SPATIAL_FILTER.to_string()), - YamlValue::String(cfg.spatial_filter.clone()), - ); - map.insert( - YamlValue::String(KEY_TABLE_NAME.to_string()), - match &cfg.table_name { - Some(t) => YamlValue::String(t.clone()), - None => YamlValue::Null, - }, - ); - map.insert( - YamlValue::String(KEY_VALUE_COLUMN.to_string()), - match &cfg.value_column { - Some(v) => YamlValue::String(v.clone()), - None => YamlValue::Null, - }, - ); - map.insert( - YamlValue::String(KEY_WINDOW_SIZE.to_string()), - YamlValue::Number(cfg.window_size.into()), - ); - map.insert( - YamlValue::String(KEY_WINDOW_TYPE.to_string()), - YamlValue::String(cfg.window_type.to_string()), - ); - - YamlValue::Mapping(map) -} - -pub fn build_queries_yaml( - cleanup_policy: CleanupPolicy, - query_keys_map: &IndexMap)>>, - id_map: &HashMap, -) -> Vec { - query_keys_map - .iter() - .map(|(query_str, keys)| { - let aggregations: Vec = keys - .iter() - .map(|(key, cleanup_param)| { - let agg_id = id_map[key]; - let mut agg_map = serde_yaml::Mapping::new(); - agg_map.insert( - YamlValue::String(KEY_AGGREGATION_ID.to_string()), - YamlValue::Number(agg_id.into()), - ); - if let Some(param) = cleanup_param { - match cleanup_policy { - CleanupPolicy::CircularBuffer => { - agg_map.insert( - YamlValue::String(KEY_NUM_AGG_TO_RETAIN.to_string()), - YamlValue::Number((*param).into()), - ); - } - CleanupPolicy::ReadBased => { - agg_map.insert( - YamlValue::String(KEY_READ_COUNT_THRESHOLD.to_string()), - YamlValue::Number((*param).into()), - ); - } - CleanupPolicy::NoCleanup => {} - } - } - YamlValue::Mapping(agg_map) - }) - .collect(); - - let mut q_map = serde_yaml::Mapping::new(); - q_map.insert( - YamlValue::String(KEY_AGGREGATIONS.to_string()), - YamlValue::Sequence(aggregations), - ); - q_map.insert( - YamlValue::String(KEY_QUERY.to_string()), - YamlValue::String(query_str.clone()), - ); - YamlValue::Mapping(q_map) - }) - .collect() -} - -pub fn params_to_yaml(params: &HashMap) -> YamlValue { - if params.is_empty() { - return YamlValue::Mapping(serde_yaml::Mapping::new()); - } - let mut map = serde_yaml::Mapping::new(); - // Sort for determinism - let mut sorted: Vec<_> = params.iter().collect(); - sorted.sort_by_key(|(k, _)| k.as_str()); - for (k, v) in sorted { - let yaml_val = match v { - JsonValue::Number(n) => { - if let Some(i) = n.as_u64() { - YamlValue::Number(serde_yaml::Number::from(i)) - } else if let Some(f) = n.as_f64() { - YamlValue::Number(serde_yaml::Number::from(f)) - } else { - YamlValue::String(n.to_string()) - } - } - JsonValue::String(s) => YamlValue::String(s.clone()), - JsonValue::Bool(b) => YamlValue::Bool(*b), - other => YamlValue::String(other.to_string()), - }; - map.insert(YamlValue::String(k.clone()), yaml_val); - } - YamlValue::Mapping(map) -} - fn build_streaming_yaml( dedup_map: &IndexMap, id_map: &HashMap, @@ -448,16 +255,3 @@ fn build_inference_yaml( Ok(YamlValue::Mapping(root)) } - -#[derive(Debug, Clone)] -pub struct PuntedQuery { - pub query: String, -} - -pub struct GeneratorOutput { - pub punted_queries: Vec, - pub streaming_yaml: YamlValue, - pub inference_yaml: YamlValue, - pub aggregation_count: usize, - pub query_count: usize, -} diff --git a/asap-planner-rs/src/promql/mod.rs b/asap-planner-rs/src/promql/mod.rs index a08205c..4d5e3f4 100644 --- a/asap-planner-rs/src/promql/mod.rs +++ b/asap-planner-rs/src/promql/mod.rs @@ -1,4 +1,4 @@ pub mod controller; pub mod generator; +pub use crate::generator::{GeneratorOutput, PuntedQuery}; pub use controller::Controller; -pub use generator::{GeneratorOutput, PuntedQuery}; diff --git a/asap-planner-rs/src/sql/generator.rs b/asap-planner-rs/src/sql/generator.rs index 1e36b3b..16e1307 100644 --- a/asap-planner-rs/src/sql/generator.rs +++ b/asap-planner-rs/src/sql/generator.rs @@ -6,14 +6,13 @@ use std::time::{SystemTime, UNIX_EPOCH}; use crate::config::input::SQLControllerConfig; use crate::error::ControllerError; -// Shared YAML helpers and types live in promql::generator until PR #235 extracts them. -use crate::planner::agg_config::IntermediateAggConfig; -use crate::planner::sql::SQLSingleQueryProcessor; -use crate::promql::generator::{ +use crate::generator::{ build_aggregation_entry, build_queries_yaml, GeneratorOutput, KEY_AGGREGATIONS, KEY_CLEANUP_POLICY, KEY_METADATA_COLUMNS, KEY_NAME, KEY_QUERIES, KEY_TABLES, KEY_TIME_COLUMN, KEY_VALUE_COLUMNS, }; +use crate::planner::agg_config::IntermediateAggConfig; +use crate::planner::sql::SQLSingleQueryProcessor; use crate::StreamingEngine; pub struct SQLRuntimeOptions {