From 6dd9be2453f363ec4765b55ee4ea65f9def19a3e Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Thu, 2 Apr 2026 17:58:11 -0400 Subject: [PATCH 1/9] Added logic to automatically infer list of metrics and labels for each metric from Prometheus --- Cargo.lock | 1 + asap-planner-rs/Cargo.toml | 1 + asap-planner-rs/src/config/input.rs | 2 +- asap-planner-rs/src/error.rs | 2 + asap-planner-rs/src/lib.rs | 93 +++++++++-- asap-planner-rs/src/main.rs | 21 ++- asap-planner-rs/src/output/generator.rs | 9 +- asap-planner-rs/src/prometheus_client.rs | 154 ++++++++++++++++++ asap-planner-rs/src/query_log/converter.rs | 16 +- asap-planner-rs/src/query_log/mod.rs | 2 +- .../test_data/configs/cleanup_circular.yaml | 3 - .../test_data/configs/cleanup_read_based.yaml | 3 - .../test_data/configs/deduplicated.yaml | 3 - .../test_data/configs/increase.yaml | 3 - .../test_data/configs/mixed_workload.yaml | 3 - .../test_data/configs/quantile_over_time.yaml | 3 - .../test_data/configs/range_query.yaml | 3 - .../test_data/configs/rate_increase.yaml | 3 - .../test_data/configs/spatial_quantile.yaml | 3 - .../comparison/test_data/configs/sum_by.yaml | 3 - .../test_data/configs/sum_by_overlapping.yaml | 3 - .../test_data/configs/sum_over_time.yaml | 3 - .../configs/temporal_overlapping.yaml | 3 - .../comparison/test_data/configs/topk.yaml | 3 - asap-planner-rs/tests/integration.rs | 151 ++++++++++------- 25 files changed, 354 insertions(+), 140 deletions(-) create mode 100644 asap-planner-rs/src/prometheus_client.rs diff --git a/Cargo.lock b/Cargo.lock index cc56da4c..74f8fcfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,6 +387,7 @@ dependencies = [ "pretty_assertions", "promql-parser", "promql_utilities", + "reqwest", "serde", "serde_json", "serde_yaml", diff --git a/asap-planner-rs/Cargo.toml b/asap-planner-rs/Cargo.toml index 8e7288bb..bb7499bb 100644 --- a/asap-planner-rs/Cargo.toml +++ b/asap-planner-rs/Cargo.toml @@ -27,6 +27,7 @@ clap.workspace = true indexmap.workspace = true 
chrono.workspace = true promql-parser = "0.5.0" +reqwest = { version = "0.11", features = ["blocking", "json"] } [dev-dependencies] tempfile = "3.20" diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 431bc772..090e20de 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -1,9 +1,9 @@ use serde::Deserialize; #[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] pub struct ControllerConfig { pub query_groups: Vec, - pub metrics: Vec, pub sketch_parameters: Option, pub aggregate_cleanup: Option, } diff --git a/asap-planner-rs/src/error.rs b/asap-planner-rs/src/error.rs index 6806a5e5..02748805 100644 --- a/asap-planner-rs/src/error.rs +++ b/asap-planner-rs/src/error.rs @@ -18,4 +18,6 @@ pub enum ControllerError { SqlParse(String), #[error("Unknown table: {0}")] UnknownTable(String), + #[error("Prometheus client error: {0}")] + PrometheusClient(String), } diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 744e7548..77adcbb4 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod error; pub mod output; pub mod planner; +pub mod prometheus_client; pub mod query_log; use serde_yaml::Value as YamlValue; @@ -12,6 +13,8 @@ pub use config::input::SQLControllerConfig; pub use error::ControllerError; pub use output::generator::{GeneratorOutput, PuntedQuery}; pub use output::sql_generator::SQLRuntimeOptions; +pub use prometheus_client::build_schema_from_prometheus; +pub use sketch_db_common::PromQLSchema; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamingEngine { @@ -30,6 +33,7 @@ pub struct RuntimeOptions { pub struct Controller { config: ControllerConfig, + schema: PromQLSchema, options: RuntimeOptions, } @@ -279,45 +283,110 @@ impl SQLController { } impl Controller { - pub fn from_file(path: &Path, opts: RuntimeOptions) -> Result { + /// Build a `Controller` from a config file, fetching 
metric labels from Prometheus. + /// + /// `prometheus_url` is queried via `GET /api/v1/series?match[]=` for each metric + /// name found in the config's PromQL queries. + pub fn from_file( + path: &Path, + opts: RuntimeOptions, + prometheus_url: &str, + ) -> Result { let yaml_str = std::fs::read_to_string(path)?; - Self::from_yaml(&yaml_str, opts) + let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; + let all_queries: Vec = config + .query_groups + .iter() + .flat_map(|qg| qg.queries.clone()) + .collect(); + let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; + Ok(Self { + config, + schema, + options: opts, + }) + } + + /// Build a `Controller` from a config file with a caller-supplied `PromQLSchema`. + /// + /// Use this when the schema is available without querying Prometheus (e.g. in tests + /// or when the schema is constructed in-process by the caller). + pub fn from_file_with_schema( + path: &Path, + schema: PromQLSchema, + opts: RuntimeOptions, + ) -> Result { + let yaml_str = std::fs::read_to_string(path)?; + let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; + Ok(Self { + config, + schema, + options: opts, + }) } - pub fn from_yaml(yaml: &str, opts: RuntimeOptions) -> Result { + /// Build a `Controller` from a YAML string with a caller-supplied `PromQLSchema`. + pub fn from_yaml_with_schema( + yaml: &str, + schema: PromQLSchema, + opts: RuntimeOptions, + ) -> Result { let config: ControllerConfig = serde_yaml::from_str(yaml)?; Ok(Self { config, + schema, options: opts, }) } - /// Build a `Controller` from a Prometheus query log file and a metrics config YAML. + /// Build a `Controller` from a Prometheus query log file, fetching metric labels from + /// Prometheus. 
/// /// - `log_path`: newline-delimited JSON query log (Prometheus `--query.log-file` output) - /// - `metrics_path`: YAML file with a `metrics:` section listing metric names and labels + /// - `prometheus_url`: base URL queried for label discovery pub fn from_query_log( log_path: &Path, - metrics_path: &Path, opts: RuntimeOptions, + prometheus_url: &str, ) -> Result { let entries = query_log::parse_log_file(log_path)?; let (instants, ranges) = query_log::infer_queries(&entries, opts.prometheus_scrape_interval); + let config = query_log::to_controller_config(instants, ranges); + let all_queries: Vec = config + .query_groups + .iter() + .flat_map(|qg| qg.queries.clone()) + .collect(); + let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; + Ok(Self { + config, + schema, + options: opts, + }) + } - let metrics_yaml = std::fs::read_to_string(metrics_path)?; - let metrics_config: query_log::MetricsConfig = serde_yaml::from_str(&metrics_yaml)?; - - let config = query_log::to_controller_config(instants, ranges, metrics_config.metrics); - + /// Build a `Controller` from a Prometheus query log file with a caller-supplied `PromQLSchema`. + /// + /// Use this when the schema is available without querying Prometheus (e.g. in tests). 
+ pub fn from_query_log_with_schema( + log_path: &Path, + schema: PromQLSchema, + opts: RuntimeOptions, + ) -> Result { + let entries = query_log::parse_log_file(log_path)?; + let (instants, ranges) = + query_log::infer_queries(&entries, opts.prometheus_scrape_interval); + let config = query_log::to_controller_config(instants, ranges); Ok(Self { config, + schema, options: opts, }) } pub fn generate(&self) -> Result { - let output = output::generator::generate_plan(&self.config, &self.options)?; + let output = output::generator::generate_plan(&self.config, &self.schema, &self.options)?; Ok(PlannerOutput { punted_queries: output.punted_queries, streaming_yaml: output.streaming_yaml, diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 79a54c0f..c274a266 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -14,16 +14,17 @@ struct Args { #[arg(long = "query-log", conflicts_with = "input_config")] query_log: Option, - /// Path to a metrics config YAML (required when using --query-log). - #[arg(long = "metrics-config", requires = "query_log")] - metrics_config: Option, - #[arg(long = "output_dir")] output_dir: PathBuf, #[arg(long = "prometheus_scrape_interval", required = false)] prometheus_scrape_interval: Option, + /// Base URL of the Prometheus instance used to auto-infer metric label sets. + /// Required for PromQL mode. 
Example: http://localhost:9090 + #[arg(long = "prometheus-url", required = false)] + prometheus_url: Option, + #[arg(long = "streaming_engine", value_enum)] streaming_engine: EngineArg, @@ -73,6 +74,9 @@ fn main() -> anyhow::Result<()> { let scrape_interval = args.prometheus_scrape_interval.ok_or_else(|| { anyhow::anyhow!("--prometheus_scrape_interval is required for PromQL mode") })?; + let prometheus_url = args + .prometheus_url + .ok_or_else(|| anyhow::anyhow!("--prometheus-url is required for PromQL mode"))?; let opts = RuntimeOptions { prometheus_scrape_interval: scrape_interval, streaming_engine: engine, @@ -81,12 +85,11 @@ fn main() -> anyhow::Result<()> { step: args.step, }; let controller = match (args.input_config, args.query_log) { - (Some(config_path), None) => Controller::from_file(&config_path, opts)?, + (Some(config_path), None) => { + Controller::from_file(&config_path, opts, &prometheus_url)? + } (None, Some(log_path)) => { - let metrics_path = args - .metrics_config - .expect("--metrics-config is required when using --query-log"); - Controller::from_query_log(&log_path, &metrics_path, opts)? + Controller::from_query_log(&log_path, opts, &prometheus_url)? 
} _ => anyhow::bail!( "exactly one of --input_config or --query-log must be provided for PromQL mode" diff --git a/asap-planner-rs/src/output/generator.rs b/asap-planner-rs/src/output/generator.rs index efc51227..7f263afe 100644 --- a/asap-planner-rs/src/output/generator.rs +++ b/asap-planner-rs/src/output/generator.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use promql_utilities::data_model::KeyByLabelNames; use sketch_db_common::enums::CleanupPolicy; +use sketch_db_common::PromQLSchema; use crate::config::input::ControllerConfig; use crate::error::ControllerError; @@ -14,14 +15,10 @@ use crate::RuntimeOptions; /// Run the full planning pipeline and produce YAML outputs pub fn generate_plan( controller_config: &ControllerConfig, + schema: &PromQLSchema, opts: &RuntimeOptions, ) -> Result { - // Build metric schema - let mut metric_schema = sketch_db_common::PromQLSchema::new(); - for md in &controller_config.metrics { - metric_schema = - metric_schema.add_metric(md.metric.clone(), KeyByLabelNames::new(md.labels.clone())); - } + let metric_schema = schema.clone(); // Determine cleanup policy let cleanup_policy_str = controller_config diff --git a/asap-planner-rs/src/prometheus_client.rs b/asap-planner-rs/src/prometheus_client.rs new file mode 100644 index 00000000..d0634525 --- /dev/null +++ b/asap-planner-rs/src/prometheus_client.rs @@ -0,0 +1,154 @@ +use std::collections::HashSet; + +use promql_parser::parser::Expr; +use promql_utilities::data_model::KeyByLabelNames; +use sketch_db_common::PromQLSchema; +use tracing::warn; + +use crate::error::ControllerError; + +/// Walk a PromQL AST and collect all metric names referenced by VectorSelectors. 
+fn collect_metric_names(expr: &Expr, names: &mut HashSet) { + match expr { + Expr::VectorSelector(vs) => { + if let Some(name) = &vs.name { + names.insert(name.clone()); + } + } + Expr::MatrixSelector(ms) => { + if let Some(name) = &ms.vs.name { + names.insert(name.clone()); + } + } + Expr::Call(call) => { + for arg in &call.args.args { + collect_metric_names(arg, names); + } + } + Expr::Aggregate(agg) => { + collect_metric_names(&agg.expr, names); + } + Expr::Binary(bin) => { + collect_metric_names(&bin.lhs, names); + collect_metric_names(&bin.rhs, names); + } + Expr::Subquery(sq) => { + collect_metric_names(&sq.expr, names); + } + _ => {} + } +} + +/// Extract all unique metric names referenced in a slice of PromQL query strings. +/// Queries that fail to parse are skipped with a warning. +pub fn extract_metric_names(queries: &[String]) -> HashSet { + let mut names = HashSet::new(); + for query in queries { + match promql_parser::parser::parse(query) { + Ok(expr) => collect_metric_names(&expr, &mut names), + Err(e) => warn!( + "Could not parse query {:?} for metric name extraction: {}", + query, e + ), + } + } + names +} + +/// Query Prometheus `GET /api/v1/series?match[]=` and return the set of label key names +/// for that metric, or `None` if no series were found. +/// +/// Internal `__*__` labels (e.g. `__name__`) are excluded from the result. +/// +/// TODO: This queries only the last 5 minutes of series data (Prometheus default when no +/// `start`/`end` parameters are provided). Expand to a configurable lookback window to capture +/// metrics that have not been seen recently. 
+fn fetch_labels_for_metric( + prometheus_url: &str, + metric_name: &str, +) -> Result>, ControllerError> { + let url = format!("{}/api/v1/series", prometheus_url.trim_end_matches('/')); + let client = reqwest::blocking::Client::new(); + let response = client + .get(&url) + .query(&[("match[]", metric_name)]) + .send() + .map_err(|e| { + ControllerError::PrometheusClient(format!( + "HTTP request failed for metric '{}': {}", + metric_name, e + )) + })?; + + if !response.status().is_success() { + return Err(ControllerError::PrometheusClient(format!( + "Prometheus returned HTTP {} for metric '{}'", + response.status(), + metric_name + ))); + } + + let body: serde_json::Value = response.json().map_err(|e| { + ControllerError::PrometheusClient(format!( + "Failed to parse Prometheus response for metric '{}': {}", + metric_name, e + )) + })?; + + let data = match body.get("data").and_then(|d| d.as_array()) { + Some(arr) => arr, + None => { + warn!( + "Prometheus returned no 'data' array for metric '{}'; skipping", + metric_name + ); + return Ok(None); + } + }; + + if data.is_empty() { + warn!( + "Prometheus returned no series for metric '{}' in the last 5 minutes; skipping", + metric_name + ); + return Ok(None); + } + + // Collect all unique label key names across all returned series, + // filtering out internal __*__ labels. + let mut label_keys: HashSet = HashSet::new(); + for series in data { + if let Some(labels) = series.as_object() { + for key in labels.keys() { + if !key.starts_with("__") { + label_keys.insert(key.clone()); + } + } + } + } + + Ok(Some(label_keys.into_iter().collect())) +} + +/// Build a `PromQLSchema` by querying Prometheus for each metric name found in the given +/// PromQL queries. Metrics with no series in Prometheus are skipped with a warning. 
+pub fn build_schema_from_prometheus( + prometheus_url: &str, + queries: &[String], +) -> Result { + let metric_names = extract_metric_names(queries); + let mut schema = PromQLSchema::new(); + + for metric_name in metric_names { + match fetch_labels_for_metric(prometheus_url, &metric_name)? { + Some(labels) => { + schema = schema.add_metric(metric_name, KeyByLabelNames::new(labels)); + } + None => { + // Warning already emitted inside fetch_labels_for_metric. + } + } + } + + Ok(schema) +} diff --git a/asap-planner-rs/src/query_log/converter.rs b/asap-planner-rs/src/query_log/converter.rs index 5703d042..86e68c1e 100644 --- a/asap-planner-rs/src/query_log/converter.rs +++ b/asap-planner-rs/src/query_log/converter.rs @@ -1,24 +1,13 @@ -use serde::Deserialize; - -use crate::config::input::{ - AggregateCleanupConfig, ControllerConfig, MetricDefinition, QueryGroup, -}; +use crate::config::input::{AggregateCleanupConfig, ControllerConfig, QueryGroup}; use super::frequency::{InstantQueryInfo, RangeQueryInfo}; -/// Subset of ControllerConfig used when loading from a metrics-only YAML file. -#[derive(Deserialize)] -pub struct MetricsConfig { - pub metrics: Vec, -} - -/// Build a `ControllerConfig` from extracted instant and range queries plus a metrics definition. +/// Build a `ControllerConfig` from extracted instant and range queries. /// /// Each query becomes its own `QueryGroup` (one query per group, no SLA fields needed). 
pub fn to_controller_config( instants: Vec, ranges: Vec, - metrics: Vec, ) -> ControllerConfig { let mut query_groups: Vec = Vec::new(); @@ -46,7 +35,6 @@ pub fn to_controller_config( ControllerConfig { query_groups, - metrics, sketch_parameters: None, aggregate_cleanup: Some(AggregateCleanupConfig { policy: Some("read_based".to_string()), diff --git a/asap-planner-rs/src/query_log/mod.rs b/asap-planner-rs/src/query_log/mod.rs index d1987419..135e3fe0 100644 --- a/asap-planner-rs/src/query_log/mod.rs +++ b/asap-planner-rs/src/query_log/mod.rs @@ -2,6 +2,6 @@ pub mod converter; pub mod frequency; pub mod parser; -pub use converter::{to_controller_config, MetricsConfig}; +pub use converter::to_controller_config; pub use frequency::{infer_queries, InstantQueryInfo, RangeQueryInfo}; pub use parser::{parse_log_file, LogEntry}; diff --git a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml index 54f1ea4f..40d6c1f0 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_circular.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "circular_buffer" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml index b3b332ee..2fcd7996 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/cleanup_read_based.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git 
a/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml b/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml index 1af476ff..4fe88975 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/deduplicated.yaml @@ -15,8 +15,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml b/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml index 2a018e95..d7235571 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/increase.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml b/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml index 105be712..c4cac069 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/mixed_workload.yaml @@ -7,8 +7,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml b/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml index eb4e8bcd..0d37afb6 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml +++ 
b/asap-planner-rs/tests/comparison/test_data/configs/quantile_over_time.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml b/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml index b3b332ee..2fcd7996 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/range_query.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml b/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml index b3b332ee..2fcd7996 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/rate_increase.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml b/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml index fc1e1ebb..91b9ca99 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/spatial_quantile.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git 
a/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml index b1c8ddd4..7c68d86e 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_by.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml index 6e777e67..1324c781 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_by_overlapping.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml b/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml index ca7969cd..c560bddc 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/sum_over_time.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml b/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml index 0049e63f..4e74cb84 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml +++ 
b/asap-planner-rs/tests/comparison/test_data/configs/temporal_overlapping.yaml @@ -9,8 +9,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml b/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml index 4aa2b6e3..3d996d8c 100644 --- a/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml +++ b/asap-planner-rs/tests/comparison/test_data/configs/topk.yaml @@ -6,8 +6,5 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance", "job", "method", "status"] aggregate_cleanup: policy: "read_based" diff --git a/asap-planner-rs/tests/integration.rs b/asap-planner-rs/tests/integration.rs index a7f39ac6..d00cc845 100644 --- a/asap-planner-rs/tests/integration.rs +++ b/asap-planner-rs/tests/integration.rs @@ -1,13 +1,39 @@ -use asap_planner::{Controller, ControllerError, RuntimeOptions, StreamingEngine}; +use asap_planner::{Controller, ControllerError, PromQLSchema, RuntimeOptions, StreamingEngine}; +use promql_utilities::data_model::KeyByLabelNames; use std::path::Path; +// ─── helpers ───────────────────────────────────────────────────────────────── + +fn arroyo_opts() -> RuntimeOptions { + RuntimeOptions { + prometheus_scrape_interval: 15, + streaming_engine: StreamingEngine::Arroyo, + enable_punting: false, + range_duration: 0, + step: 0, + } +} + +/// Standard test schema: http_requests_total with [instance, job, method, status]. 
+fn http_requests_schema() -> PromQLSchema { + PromQLSchema::new().add_metric( + "http_requests_total".to_string(), + KeyByLabelNames::new(vec![ + "instance".to_string(), + "job".to_string(), + "method".to_string(), + "status".to_string(), + ]), + ) +} + // ─── query_log integration tests ───────────────────────────────────────────── #[test] fn query_log_instant_produces_valid_configs() { - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/instant_only.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -18,9 +44,9 @@ fn query_log_instant_produces_valid_configs() { #[test] fn query_log_range_produces_valid_configs() { - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/range_only.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -31,9 +57,9 @@ fn query_log_range_produces_valid_configs() { #[test] fn query_log_single_occurrence_excluded() { - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/single_occurrence.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -44,9 +70,9 @@ fn query_log_single_occurrence_excluded() { #[test] fn query_log_malformed_lines_skipped() { // with_malformed.log has 5 valid entries for rate() interspersed with bad lines - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/with_malformed.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -57,9 +83,9 @@ fn 
query_log_malformed_lines_skipped() { #[test] fn query_log_output_files_written() { let dir = tempfile::tempdir().unwrap(); - let c = Controller::from_query_log( + let c = Controller::from_query_log_with_schema( Path::new("tests/comparison/test_data/query_logs/instant_only.log"), - Path::new("tests/comparison/test_data/metrics/http_requests.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -68,22 +94,13 @@ fn query_log_output_files_written() { assert!(dir.path().join("inference_config.yaml").exists()); } -fn arroyo_opts() -> RuntimeOptions { - RuntimeOptions { - prometheus_scrape_interval: 15, - streaming_engine: StreamingEngine::Arroyo, - enable_punting: false, - range_duration: 0, - step: 0, - } -} - #[test] fn quantile_over_time_produces_kll() { // quantile_over_time groups by all labels → 1 DatasketchesKLL config // Arroyo/Flink maintains one sketch per unique label-value combination at runtime - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/quantile_over_time.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -95,8 +112,9 @@ fn quantile_over_time_produces_kll() { #[test] fn rate_produces_multiple_increase_only() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/rate_increase.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -107,8 +125,9 @@ fn rate_produces_multiple_increase_only() { #[test] fn only_spatial_window_equals_scrape_interval() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/spatial_quantile.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -118,8 +137,9 @@ fn only_spatial_window_equals_scrape_interval() { #[test] fn duplicate_aggregation_configs_are_deduped() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( 
Path::new("tests/comparison/test_data/configs/deduplicated.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -130,8 +150,9 @@ fn duplicate_aggregation_configs_are_deduped() { #[test] fn topk_produces_count_min_sketch_with_heap() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/topk.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -146,8 +167,9 @@ fn range_query_uses_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/range_query.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -158,8 +180,9 @@ fn range_query_uses_effective_repeat() { #[test] fn output_files_written_to_dir() { let dir = tempfile::tempdir().unwrap(); - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/mixed_workload.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -177,8 +200,9 @@ fn rate_tumbling_window_size_equals_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/rate_increase.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -194,8 +218,9 @@ fn increase_tumbling_window_size_equals_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/increase.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -212,8 +237,9 @@ fn quantile_over_time_tumbling_window_size_equals_effective_repeat() { step: 30, ..arroyo_opts() }; - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/quantile_over_time.yaml"), + http_requests_schema(), opts, ) .unwrap(); @@ -225,8 +251,9 @@ fn 
quantile_over_time_tumbling_window_size_equals_effective_repeat() { #[test] fn sum_over_time_produces_count_min_sketch_with_delta_set() { // sum_over_time is Approximate → CountMinSketch + DeltaSetAggregator pairing - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/sum_over_time.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -238,8 +265,9 @@ fn sum_over_time_produces_count_min_sketch_with_delta_set() { #[test] fn sum_by_produces_count_min_sketch_with_delta_set() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/sum_by.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -252,8 +280,9 @@ fn sum_by_produces_count_min_sketch_with_delta_set() { #[test] fn sum_by_rollup_excludes_groupby_labels() { // sum by (job, method) → rollup gets labels NOT in by-clause - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/sum_by.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -277,13 +306,10 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] aggregate_cleanup: policy: "not_a_real_policy" "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); assert!(matches!( c.generate(), Err(ControllerError::PlannerError(_)) @@ -302,11 +328,8 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); assert!(matches!( c.generate(), Err(ControllerError::DuplicateQuery(_)) @@ -331,11 +354,8 @@ 
query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); assert!(matches!( c.generate(), Err(ControllerError::DuplicateQuery(_)) @@ -354,11 +374,9 @@ query_groups: controller_options: accuracy_sla: 0.99 latency_sla: 1.0 -metrics: - - metric: "http_requests_total" - labels: ["instance"] "#; - let c = Controller::from_yaml(yaml, arroyo_opts()).unwrap(); + // Schema only knows about http_requests_total, not unknown_metric. + let c = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()).unwrap(); let out = c.generate().unwrap(); assert_eq!(out.inference_query_count(), 0); assert_eq!(out.streaming_aggregation_count(), 0); @@ -366,7 +384,26 @@ metrics: #[test] fn malformed_yaml_returns_parse_error() { - let result = Controller::from_yaml("{ invalid yaml :", arroyo_opts()); + let result = + Controller::from_yaml_with_schema("{ invalid yaml :", PromQLSchema::new(), arroyo_opts()); + assert!(matches!(result, Err(ControllerError::YamlParse(_)))); +} + +#[test] +fn stale_metrics_field_in_yaml_returns_parse_error() { + // Configs that still contain a top-level `metrics:` key must fail loudly + // (deny_unknown_fields is set on ControllerConfig). 
+ let yaml = r#" +query_groups: + - id: 1 + queries: + - "rate(http_requests_total[5m])" + repetition_delay: 300 +metrics: + - metric: "http_requests_total" + labels: ["instance"] +"#; + let result = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()); assert!(matches!(result, Err(ControllerError::YamlParse(_)))); } @@ -378,8 +415,9 @@ fn malformed_yaml_returns_parse_error() { #[test] fn temporal_overlapping_window_size_equals_t_repeat() { // [5m] range repeated every 60s → windowSize = 60, not 300 - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -391,8 +429,9 @@ fn temporal_overlapping_window_size_equals_t_repeat() { fn temporal_overlapping_all_function_types_present() { // rate+increase → MultipleIncrease (deduped to 1), sum_over_time → CountMinSketch+DeltaSet, // quantile_over_time → DatasketchesKLL; 4 unique streaming aggregation configs total - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -406,8 +445,9 @@ fn temporal_overlapping_all_function_types_present() { #[test] fn temporal_overlapping_cleanup_param_equals_range_over_repeat() { // t_lookback = 5m = 300s, effective_repeat = 60s → ceil(300/60) = 5 - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); @@ -434,8 +474,9 @@ fn temporal_overlapping_cleanup_param_equals_range_over_repeat() { fn temporal_overlapping_rate_increase_deduped() { // rate and increase produce identical MultipleIncrease configs → 1 streaming entry shared, // but inference config still tracks 4 queries separately - let c = 
Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/temporal_overlapping.yaml"), + http_requests_schema(), arroyo_opts(), ) .unwrap(); From 86c6b997c267ea24214958140739cbe593a6ae56 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Mon, 6 Apr 2026 10:57:36 -0400 Subject: [PATCH 2/9] Modified asap-quickstart and asap-tools/experiments pipeline to account for cmd line changes to asap-planner-rs --- asap-planner-rs/docker-compose.yml.j2 | 3 +- asap-quickstart/docker-compose.yml | 8 +++--- asap-tools/experiments/experiment_run_e2e.py | 28 +++++++++++++++++++ .../experiment_utils/services/misc.py | 10 ++++++- .../generate_controller_compose.py | 8 ++++++ 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/asap-planner-rs/docker-compose.yml.j2 b/asap-planner-rs/docker-compose.yml.j2 index d36368f2..06d42d2d 100644 --- a/asap-planner-rs/docker-compose.yml.j2 +++ b/asap-planner-rs/docker-compose.yml.j2 @@ -9,7 +9,8 @@ services: "--input_config", "/app/input/config.yaml", "--output_dir", "/app/output", "--prometheus_scrape_interval", "{{ prometheus_scrape_interval }}", - "--streaming_engine", "{{ streaming_engine }}"{% if punting %}, + "--streaming_engine", "{{ streaming_engine }}", + "--prometheus-url", "{{ prometheus_url }}"{% if punting %}, "--enable-punting"{% endif %} ] restart: no diff --git a/asap-quickstart/docker-compose.yml b/asap-quickstart/docker-compose.yml index 8c943528..299b5e85 100644 --- a/asap-quickstart/docker-compose.yml +++ b/asap-quickstart/docker-compose.yml @@ -136,9 +136,6 @@ services: interval: 10s timeout: 5s retries: 5 - depends_on: - asap-summary-ingest: - condition: service_completed_successfully restart: no grafana: @@ -179,12 +176,16 @@ services: - "--input_config=/config/controller-config.yaml" - "--output_dir=/asap-planner-output" - "--prometheus_scrape_interval=1" + - "--prometheus-url=http://prometheus:9090" - "--streaming_engine=arroyo" - "--range-duration=300" 
- "--step=10" volumes: - ./config/controller-config.yaml:/config/controller-config.yaml:ro - asap-planner-output:/asap-planner-output + depends_on: + prometheus: + condition: service_healthy restart: "no" asap-summary-ingest: @@ -348,7 +349,6 @@ services: - "--dataset=sine" - "--num-labels=3" - "--num-values-per-label=30,30,30" - - "--num-values-per-label=30,30,30" - "--metric-type=gauge" - "--metric-name=sensor_reading" - "--label-names=region,service,host" diff --git a/asap-tools/experiments/experiment_run_e2e.py b/asap-tools/experiments/experiment_run_e2e.py index 446014af..0b2e66a3 100644 --- a/asap-tools/experiments/experiment_run_e2e.py +++ b/asap-tools/experiments/experiment_run_e2e.py @@ -318,12 +318,40 @@ def main(cfg: DictConfig): # copy_controller_client_config(args.controller_client_config, local_experiment_dir) if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: + # The controller queries Prometheus to auto-infer metric labels, so + # exporters and Prometheus must be up and have at least one scrape + # worth of data before the controller runs. + if config.check_exporter_and_queries_exist( + "fake_exporter", cfg.experiment_params + ): + exporter_service.start( + config=exporter_config["exporter_list"]["fake_exporter"], + experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, + ) + prometheus_service.start( + experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, + experiment_mode=experiment_mode, + ) + # Wait for two scrape intervals so Prometheus has series to return. + label_discovery_wait = prometheus_scrape_interval * 2 + print( + f"Waiting {label_discovery_wait}s for Prometheus to scrape initial data " + f"before running controller label inference..." 
+ ) + time.sleep(label_discovery_wait) + + prometheus_url = ( + f"http://localhost:{prometheus_service.get_query_endpoint_port()}" + ) controller_service.start( controller_input_file=controller_client_config, prometheus_scrape_interval=prometheus_scrape_interval, streaming_engine=args.streaming_engine, controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, punting=args.controller_punting, + prometheus_url=prometheus_url, ) sync.rsync_controller_config_remote_to_local( provider, diff --git a/asap-tools/experiments/experiment_utils/services/misc.py b/asap-tools/experiments/experiment_utils/services/misc.py index 798e63d2..5afaf678 100644 --- a/asap-tools/experiments/experiment_utils/services/misc.py +++ b/asap-tools/experiments/experiment_utils/services/misc.py @@ -198,6 +198,7 @@ def start( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + prometheus_url: str, **kwargs, ) -> None: """ @@ -209,6 +210,7 @@ def start( streaming_engine: Type of streaming engine controller_remote_output_dir: Controller output directory punting: Enable query punting based on performance heuristics + prometheus_url: Base URL of the Prometheus instance for metric label inference **kwargs: Additional configuration """ if self.use_container: @@ -218,6 +220,7 @@ def start( streaming_engine, controller_remote_output_dir, punting, + prometheus_url, ) else: return self._start_bare_metal( @@ -226,6 +229,7 @@ def start( streaming_engine, controller_remote_output_dir, punting, + prometheus_url, ) def _start_bare_metal( @@ -235,12 +239,14 @@ def _start_bare_metal( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + prometheus_url: str, ) -> None: - cmd = "./target/release/asap-planner --input_config {} --prometheus_scrape_interval {} --output_dir {} --streaming_engine {}".format( + cmd = "./target/release/asap-planner --input_config {} --prometheus_scrape_interval {} --output_dir {} --streaming_engine {} --prometheus-url {}".format( 
controller_input_file, prometheus_scrape_interval, controller_remote_output_dir, streaming_engine, + prometheus_url, ) if punting: cmd += " --enable-punting" @@ -261,6 +267,7 @@ def _start_containerized( streaming_engine: str, controller_remote_output_dir: str, punting: bool, + prometheus_url: str, ): controller_dir = os.path.join( self.provider.get_home_dir(), "code", "asap-planner-rs" @@ -288,6 +295,7 @@ def _start_containerized( generate_cmd += f" --controller-output-dir {controller_remote_output_dir}" generate_cmd += f" --prometheus-scrape-interval {prometheus_scrape_interval}" generate_cmd += f" --streaming-engine {streaming_engine}" + generate_cmd += f" --prometheus-url {prometheus_url}" if punting: generate_cmd += " --punting" diff --git a/asap-tools/experiments/generate_controller_compose.py b/asap-tools/experiments/generate_controller_compose.py index 439185d3..63ed78e1 100644 --- a/asap-tools/experiments/generate_controller_compose.py +++ b/asap-tools/experiments/generate_controller_compose.py @@ -18,6 +18,7 @@ def generate_compose_file( prometheus_scrape_interval: int, streaming_engine: str, punting: bool, + prometheus_url: str, ): """Generate docker-compose.yml from template with provided variables.""" @@ -41,6 +42,7 @@ def generate_compose_file( "prometheus_scrape_interval": prometheus_scrape_interval, "streaming_engine": streaming_engine, "punting": punting, + "prometheus_url": prometheus_url, } # Render the template @@ -117,6 +119,11 @@ def main(): action="store_true", help="Enable query punting based on performance heuristics", ) + parser.add_argument( + "--prometheus-url", + required=True, + help="Base URL of the Prometheus instance for metric label inference (e.g. 
http://localhost:9090)", + ) args = parser.parse_args() @@ -130,6 +137,7 @@ def main(): prometheus_scrape_interval=args.prometheus_scrape_interval, streaming_engine=args.streaming_engine, punting=args.punting, + prometheus_url=args.prometheus_url, ) From f3b0ac6c3a47ca467043122d74590d9b0e93b2b4 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Tue, 7 Apr 2026 10:01:38 -0400 Subject: [PATCH 3/9] added debug statements, added option to fallback to metrics if auto-infer fails, updated tests --- asap-planner-rs/src/config/input.rs | 4 ++ asap-planner-rs/src/lib.rs | 21 ++++++++- asap-planner-rs/src/prometheus_client.rs | 10 ++-- asap-planner-rs/src/query_log/converter.rs | 1 + asap-planner-rs/tests/integration.rs | 53 ++++++++++++++++++---- 5 files changed, 75 insertions(+), 14 deletions(-) diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 090e20de..699fc884 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -6,6 +6,10 @@ pub struct ControllerConfig { pub query_groups: Vec, pub sketch_parameters: Option, pub aggregate_cleanup: Option, + /// Optional hint: per-metric label sets used as a fallback when Prometheus + /// returns no series for a metric. Prometheus-inferred labels take priority. 
+ #[serde(default)] + pub metrics: Option>, } #[derive(Debug, Clone, Deserialize)] diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 77adcbb4..6d71c4e8 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -5,8 +5,10 @@ pub mod planner; pub mod prometheus_client; pub mod query_log; +use promql_utilities::data_model::KeyByLabelNames; use serde_yaml::Value as YamlValue; use std::path::Path; +use tracing::debug; pub use config::input::ControllerConfig; pub use config::input::SQLControllerConfig; @@ -299,7 +301,24 @@ impl Controller { .iter() .flat_map(|qg| qg.queries.clone()) .collect(); - let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; + let mut schema = + prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?; + // For any metric that Prometheus had no series for, fall back to the + // `metrics` hint in the config file (if present). + if let Some(metric_hints) = &config.metrics { + for hint in metric_hints { + if !schema.config.contains_key(&hint.metric) { + debug!( + "Prometheus had no series for '{}'; falling back to config-file hint with labels {:?}", + hint.metric, hint.labels + ); + schema = schema.add_metric( + hint.metric.clone(), + KeyByLabelNames::new(hint.labels.clone()), + ); + } + } + } Ok(Self { config, schema, diff --git a/asap-planner-rs/src/prometheus_client.rs b/asap-planner-rs/src/prometheus_client.rs index d0634525..fff6d416 100644 --- a/asap-planner-rs/src/prometheus_client.rs +++ b/asap-planner-rs/src/prometheus_client.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use promql_parser::parser::Expr; use promql_utilities::data_model::KeyByLabelNames; use sketch_db_common::PromQLSchema; -use tracing::warn; +use tracing::{debug, warn}; use crate::error::ControllerError; @@ -137,12 +137,14 @@ pub fn build_schema_from_prometheus( queries: &[String], ) -> Result { let metric_names = extract_metric_names(queries); + debug!("Inferred 
metric names from queries: {:?}", metric_names); let mut schema = PromQLSchema::new(); - for metric_name in metric_names { - match fetch_labels_for_metric(prometheus_url, &metric_name)? { + for metric_name in &metric_names { + match fetch_labels_for_metric(prometheus_url, metric_name)? { Some(labels) => { - schema = schema.add_metric(metric_name, KeyByLabelNames::new(labels)); + debug!("Inferred labels for metric '{}': {:?}", metric_name, labels); + schema = schema.add_metric(metric_name.clone(), KeyByLabelNames::new(labels)); } None => { // Warning already emitted inside fetch_labels_for_metric. diff --git a/asap-planner-rs/src/query_log/converter.rs b/asap-planner-rs/src/query_log/converter.rs index 86e68c1e..22babb86 100644 --- a/asap-planner-rs/src/query_log/converter.rs +++ b/asap-planner-rs/src/query_log/converter.rs @@ -39,5 +39,6 @@ pub fn to_controller_config( aggregate_cleanup: Some(AggregateCleanupConfig { policy: Some("read_based".to_string()), }), + metrics: None, } } diff --git a/asap-planner-rs/tests/integration.rs b/asap-planner-rs/tests/integration.rs index 337a57e1..3bae948e 100644 --- a/asap-planner-rs/tests/integration.rs +++ b/asap-planner-rs/tests/integration.rs @@ -27,6 +27,36 @@ fn http_requests_schema() -> PromQLSchema { ) } +/// Schema for binary arithmetic tests: errors_total and requests_total. +fn binary_arithmetic_schema() -> PromQLSchema { + PromQLSchema::new() + .add_metric( + "errors_total".to_string(), + KeyByLabelNames::new(vec!["instance".to_string(), "job".to_string()]), + ) + .add_metric( + "requests_total".to_string(), + KeyByLabelNames::new(vec!["instance".to_string(), "job".to_string()]), + ) +} + +/// Schema for nested binary arithmetic test: a_total, b_total, c_total. 
+fn nested_binary_arithmetic_schema() -> PromQLSchema { + PromQLSchema::new() + .add_metric( + "a_total".to_string(), + KeyByLabelNames::new(vec!["instance".to_string(), "job".to_string()]), + ) + .add_metric( + "b_total".to_string(), + KeyByLabelNames::new(vec!["instance".to_string(), "job".to_string()]), + ) + .add_metric( + "c_total".to_string(), + KeyByLabelNames::new(vec!["instance".to_string(), "job".to_string()]), + ) +} + // ─── query_log integration tests ───────────────────────────────────────────── #[test] @@ -390,9 +420,9 @@ fn malformed_yaml_returns_parse_error() { } #[test] -fn stale_metrics_field_in_yaml_returns_parse_error() { - // Configs that still contain a top-level `metrics:` key must fail loudly - // (deny_unknown_fields is set on ControllerConfig). +fn metrics_field_in_yaml_is_accepted_as_hint() { + // The `metrics` section is a backwards-compatible label hint used as a + // fallback when Prometheus has no series for a metric. It must parse cleanly. let yaml = r#" query_groups: - id: 1 @@ -404,7 +434,7 @@ metrics: labels: ["instance"] "#; let result = Controller::from_yaml_with_schema(yaml, http_requests_schema(), arroyo_opts()); - assert!(matches!(result, Err(ControllerError::YamlParse(_)))); + assert!(result.is_ok()); } // --- Overlapping window tests --- @@ -474,8 +504,9 @@ fn temporal_overlapping_cleanup_param_equals_range_over_repeat() { #[test] fn binary_arithmetic_produces_two_leaf_configs() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/binary_arithmetic.yaml"), + binary_arithmetic_schema(), arroyo_opts(), ) .unwrap(); @@ -488,8 +519,9 @@ fn binary_arithmetic_produces_two_leaf_configs() { #[test] fn binary_arithmetic_deduplicates_shared_arm() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/binary_arithmetic_dedup.yaml"), + binary_arithmetic_schema(), arroyo_opts(), ) .unwrap(); @@ 
-502,8 +534,9 @@ fn binary_arithmetic_deduplicates_shared_arm() { #[test] fn nested_binary_arithmetic_produces_three_leaf_configs() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/binary_arithmetic_nested.yaml"), + nested_binary_arithmetic_schema(), arroyo_opts(), ) .unwrap(); @@ -514,8 +547,9 @@ fn nested_binary_arithmetic_produces_three_leaf_configs() { #[test] fn binary_arithmetic_scalar_constant_produces_one_leaf_config() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/binary_arithmetic_scalar.yaml"), + binary_arithmetic_schema(), arroyo_opts(), ) .unwrap(); @@ -527,8 +561,9 @@ fn binary_arithmetic_scalar_constant_produces_one_leaf_config() { #[test] fn binary_arithmetic_with_non_acceleratable_arm_produces_no_configs() { - let c = Controller::from_file( + let c = Controller::from_file_with_schema( Path::new("tests/comparison/test_data/configs/binary_arithmetic_non_acceleratable.yaml"), + binary_arithmetic_schema(), arroyo_opts(), ) .unwrap(); From 0b93469b0b5d9e1bd9b9cf1643b0bb3a7ec9d07c Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Tue, 7 Apr 2026 10:35:49 -0400 Subject: [PATCH 4/9] fixed bug in experiment scripts --- asap-tools/experiments/experiment_run_e2e.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/asap-tools/experiments/experiment_run_e2e.py b/asap-tools/experiments/experiment_run_e2e.py index 0b2e66a3..738fbbdf 100644 --- a/asap-tools/experiments/experiment_run_e2e.py +++ b/asap-tools/experiments/experiment_run_e2e.py @@ -364,10 +364,14 @@ def main(cfg: DictConfig): kafka_service.delete_topics() kafka_service.create_topics() - if config.check_exporter_and_queries_exist( - "fake_exporter", cfg.experiment_params + if ( + config.check_exporter_and_queries_exist( + "fake_exporter", cfg.experiment_params + ) + and experiment_mode != 
constants.SKETCHDB_EXPERIMENT_NAME ): # this DOES NOT block + # (SKETCHDB mode already started the exporter early for label discovery) exporter_service.start( config=exporter_config["exporter_list"]["fake_exporter"], experiment_output_dir=experiment_output_dir, @@ -495,9 +499,12 @@ def main(cfg: DictConfig): system_exporters_service.start(cfg.experiment_params) # Start Prometheus service based on deployment mode + # (SKETCHDB mode already started Prometheus early for label discovery) monitoring = cfg.experiment_params.monitoring - if monitoring.deployment_mode == "containerized": + if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: + pass # already started before the controller + elif monitoring.deployment_mode == "containerized": # Containerized deployment (DockerPrometheusService or DockerVictoriaMetricsService) assert isinstance( prometheus_service, From 9952c17679278e53ddff020df9752edbfccf3362 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 8 Apr 2026 08:58:11 -0400 Subject: [PATCH 5/9] Added retry logic to asap-planner for getting metrics/labels from Prometheus --- asap-planner-rs/src/prometheus_client.rs | 112 ++++++++++++++--------- 1 file changed, 70 insertions(+), 42 deletions(-) diff --git a/asap-planner-rs/src/prometheus_client.rs b/asap-planner-rs/src/prometheus_client.rs index fff6d416..b8e8d288 100644 --- a/asap-planner-rs/src/prometheus_client.rs +++ b/asap-planner-rs/src/prometheus_client.rs @@ -1,4 +1,6 @@ use std::collections::HashSet; +use std::thread; +use std::time::Duration; use promql_parser::parser::Expr; use promql_utilities::data_model::KeyByLabelNames; @@ -7,6 +9,11 @@ use tracing::{debug, warn}; use crate::error::ControllerError; +/// Number of times to retry a Prometheus request on 503 (service not yet ready). +const MAX_RETRIES: u32 = 15; +/// Delay between retries. +const RETRY_DELAY: Duration = Duration::from_secs(2); + /// Walk a PromQL AST and collect all metric names referenced by VectorSelectors. 
fn collect_metric_names(expr: &Expr, names: &mut HashSet) { match expr { @@ -69,65 +76,86 @@ fn fetch_labels_for_metric( ) -> Result>, ControllerError> { let url = format!("{}/api/v1/series", prometheus_url.trim_end_matches('/')); let client = reqwest::blocking::Client::new(); - let response = client - .get(&url) - .query(&[("match[]", metric_name)]) - .send() - .map_err(|e| { + + for attempt in 1..=MAX_RETRIES { + let response = client + .get(&url) + .query(&[("match[]", metric_name)]) + .send() + .map_err(|e| { + ControllerError::PrometheusClient(format!( + "HTTP request failed for metric '{}': {}", + metric_name, e + )) + })?; + + let status = response.status(); + + if status == reqwest::StatusCode::SERVICE_UNAVAILABLE { + warn!( + "Prometheus returned 503 for metric '{}' (attempt {}/{}); retrying in {}s", + metric_name, + attempt, + MAX_RETRIES, + RETRY_DELAY.as_secs(), + ); + thread::sleep(RETRY_DELAY); + continue; + } + + if !status.is_success() { + return Err(ControllerError::PrometheusClient(format!( + "Prometheus returned HTTP {} for metric '{}'", + status, metric_name + ))); + } + + let body: serde_json::Value = response.json().map_err(|e| { ControllerError::PrometheusClient(format!( - "HTTP request failed for metric '{}': {}", + "Failed to parse Prometheus response for metric '{}': {}", metric_name, e )) })?; - if !response.status().is_success() { - return Err(ControllerError::PrometheusClient(format!( - "Prometheus returned HTTP {} for metric '{}'", - response.status(), - metric_name - ))); - } - - let body: serde_json::Value = response.json().map_err(|e| { - ControllerError::PrometheusClient(format!( - "Failed to parse Prometheus response for metric '{}': {}", - metric_name, e - )) - })?; + let data = match body.get("data").and_then(|d| d.as_array()) { + Some(arr) => arr, + None => { + warn!( + "Prometheus returned no 'data' array for metric '{}'; skipping", + metric_name + ); + return Ok(None); + } + }; - let data = match body.get("data").and_then(|d| 
d.as_array()) { - Some(arr) => arr, - None => { + if data.is_empty() { warn!( - "Prometheus returned no 'data' array for metric '{}'; skipping", + "Prometheus returned no series for metric '{}' in the last 5 minutes; skipping", metric_name ); return Ok(None); } - }; - - if data.is_empty() { - warn!( - "Prometheus returned no series for metric '{}' in the last 5 minutes; skipping", - metric_name - ); - return Ok(None); - } - // Collect all unique label key names across all returned series, - // filtering out internal __*__ labels. - let mut label_keys: HashSet = HashSet::new(); - for series in data { - if let Some(labels) = series.as_object() { - for key in labels.keys() { - if !key.starts_with("__") { - label_keys.insert(key.clone()); + // Collect all unique label key names across all returned series, + // filtering out internal __*__ labels. + let mut label_keys: HashSet = HashSet::new(); + for series in data { + if let Some(labels) = series.as_object() { + for key in labels.keys() { + if !key.starts_with("__") { + label_keys.insert(key.clone()); + } } } } + + return Ok(Some(label_keys.into_iter().collect())); } - Ok(Some(label_keys.into_iter().collect())) + Err(ControllerError::PrometheusClient(format!( + "Prometheus returned 503 for metric '{}' after {} attempts; giving up", + metric_name, MAX_RETRIES + ))) } /// Build a `PromQLSchema` by querying Prometheus for each metric name found in the given From 483e15a36b7a8fc609796e9914030af5f227549e Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 8 Apr 2026 13:46:49 -0400 Subject: [PATCH 6/9] Added some delay to Prometheus and modified remote write config to have a limit --- asap-quickstart/config/prometheus.yml | 3 +++ asap-quickstart/docker-compose.yml | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/asap-quickstart/config/prometheus.yml b/asap-quickstart/config/prometheus.yml index eb853f23..034a1a6e 100644 --- a/asap-quickstart/config/prometheus.yml +++ 
b/asap-quickstart/config/prometheus.yml @@ -9,6 +9,9 @@ remote_write: - url: http://arroyo:9091/receive queue_config: batch_send_deadline: 1s + # Drop samples older than 5 minutes before enqueuing — prevents WAL replay + # on restart from flooding Arroyo with historical data. + sample_age_limit: 5m write_relabel_configs: - source_labels: [__name__] regex: sensor_reading diff --git a/asap-quickstart/docker-compose.yml b/asap-quickstart/docker-compose.yml index 299b5e85..3cadae43 100644 --- a/asap-quickstart/docker-compose.yml +++ b/asap-quickstart/docker-compose.yml @@ -132,10 +132,11 @@ services: - "--web.console.templates=/usr/share/prometheus/consoles" - "--web.enable-lifecycle" healthcheck: - test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:9090/-/healthy || exit 1"] + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:9090/-/ready || exit 1"] interval: 10s timeout: 5s - retries: 5 + retries: 10 + start_period: 3m restart: no grafana: @@ -180,6 +181,7 @@ services: - "--streaming_engine=arroyo" - "--range-duration=300" - "--step=10" + - "-v" volumes: - ./config/controller-config.yaml:/config/controller-config.yaml:ro - asap-planner-output:/asap-planner-output From 4648a2057328aa8fa61f0a3be31472b9d3f6a868 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 8 Apr 2026 15:51:28 -0400 Subject: [PATCH 7/9] Changed CI to NOT use auto-discovery of metrics/labels --- asap-planner-rs/src/lib.rs | 18 ++++++++++++- asap-planner-rs/src/main.rs | 27 ++++++++++++------- .../installation/setup_dependencies.sh | 0 asap-quickstart/docker-compose.yml | 7 +++-- 4 files changed, 38 insertions(+), 14 deletions(-) mode change 100644 => 100755 asap-query-engine/installation/setup_dependencies.sh diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index a0608c30..5c513f90 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -363,11 +363,27 @@ impl Controller { /// or when the schema is 
constructed in-process by the caller). pub fn from_file_with_schema( path: &Path, - schema: PromQLSchema, + mut schema: PromQLSchema, opts: RuntimeOptions, ) -> Result { let yaml_str = std::fs::read_to_string(path)?; let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; + // Fill in any metrics missing from the caller-supplied schema using the + // config-file `metrics` hint (if present). + if let Some(metric_hints) = &config.metrics { + for hint in metric_hints { + if !schema.config.contains_key(&hint.metric) { + debug!( + "Schema missing '{}'; falling back to config-file hint with labels {:?}", + hint.metric, hint.labels + ); + schema = schema.add_metric( + hint.metric.clone(), + KeyByLabelNames::new(hint.labels.clone()), + ); + } + } + } Ok(Self { config, schema, diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index fb08239b..c74b03c8 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -21,7 +21,9 @@ struct Args { prometheus_scrape_interval: Option, /// Base URL of the Prometheus instance used to auto-infer metric label sets. - /// Required for PromQL mode. Example: http://localhost:9090 + /// Optional: when provided, the planner queries Prometheus for label discovery. + /// When absent, labels are taken from the `metrics` hint in the config file. 
+ /// Example: http://localhost:9090 #[arg(long = "prometheus-url", required = false)] prometheus_url: Option, @@ -76,9 +78,6 @@ fn main() -> anyhow::Result<()> { let scrape_interval = args.prometheus_scrape_interval.ok_or_else(|| { anyhow::anyhow!("--prometheus_scrape_interval is required for PromQL mode") })?; - let prometheus_url = args - .prometheus_url - .ok_or_else(|| anyhow::anyhow!("--prometheus-url is required for PromQL mode"))?; let opts = RuntimeOptions { prometheus_scrape_interval: scrape_interval, streaming_engine: engine, @@ -86,13 +85,23 @@ fn main() -> anyhow::Result<()> { range_duration: args.range_duration, step: args.step, }; - let controller = match (args.input_config, args.query_log) { - (Some(config_path), None) => { - Controller::from_file(&config_path, opts, &prometheus_url)? + let controller = match (args.input_config, args.query_log, args.prometheus_url) { + (Some(config_path), None, Some(url)) => { + Controller::from_file(&config_path, opts, &url)? } - (None, Some(log_path)) => { - Controller::from_query_log(&log_path, opts, &prometheus_url)? + (Some(config_path), None, None) => Controller::from_file_with_schema( + &config_path, + asap_planner::PromQLSchema::new(), + opts, + )?, + (None, Some(log_path), Some(url)) => { + Controller::from_query_log(&log_path, opts, &url)? 
} + (None, Some(log_path), None) => Controller::from_query_log_with_schema( + &log_path, + asap_planner::PromQLSchema::new(), + opts, + )?, _ => anyhow::bail!( "exactly one of --input_config or --query-log must be provided for PromQL mode" ), diff --git a/asap-query-engine/installation/setup_dependencies.sh b/asap-query-engine/installation/setup_dependencies.sh old mode 100644 new mode 100755 diff --git a/asap-quickstart/docker-compose.yml b/asap-quickstart/docker-compose.yml index 3cadae43..9a00d902 100644 --- a/asap-quickstart/docker-compose.yml +++ b/asap-quickstart/docker-compose.yml @@ -137,6 +137,9 @@ services: timeout: 5s retries: 10 start_period: 3m + depends_on: + asap-summary-ingest: + condition: service_completed_successfully restart: no grafana: @@ -177,7 +180,6 @@ services: - "--input_config=/config/controller-config.yaml" - "--output_dir=/asap-planner-output" - "--prometheus_scrape_interval=1" - - "--prometheus-url=http://prometheus:9090" - "--streaming_engine=arroyo" - "--range-duration=300" - "--step=10" @@ -185,9 +187,6 @@ services: volumes: - ./config/controller-config.yaml:/config/controller-config.yaml:ro - asap-planner-output:/asap-planner-output - depends_on: - prometheus: - condition: service_healthy restart: "no" asap-summary-ingest: From 545b90279a83eeee69f469470cd1a2a23b935c4b Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 8 Apr 2026 16:21:22 -0400 Subject: [PATCH 8/9] refactored changed to from_file_with_schema --- asap-planner-rs/src/lib.rs | 18 +----------------- asap-planner-rs/src/main.rs | 34 ++++++++++++++++++++++++---------- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 5c513f90..a0608c30 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -363,27 +363,11 @@ impl Controller { /// or when the schema is constructed in-process by the caller). 
pub fn from_file_with_schema( path: &Path, - mut schema: PromQLSchema, + schema: PromQLSchema, opts: RuntimeOptions, ) -> Result { let yaml_str = std::fs::read_to_string(path)?; let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?; - // Fill in any metrics missing from the caller-supplied schema using the - // config-file `metrics` hint (if present). - if let Some(metric_hints) = &config.metrics { - for hint in metric_hints { - if !schema.config.contains_key(&hint.metric) { - debug!( - "Schema missing '{}'; falling back to config-file hint with labels {:?}", - hint.metric, hint.labels - ); - schema = schema.add_metric( - hint.metric.clone(), - KeyByLabelNames::new(hint.labels.clone()), - ); - } - } - } Ok(Self { config, schema, diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index c74b03c8..08426f85 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -1,5 +1,6 @@ use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, StreamingEngine}; use clap::Parser; +use promql_utilities::data_model::KeyByLabelNames; use sketch_db_common::enums::QueryLanguage; use std::path::PathBuf; @@ -56,6 +57,19 @@ enum EngineArg { Precompute, } +/// Build a `PromQLSchema` from the `metrics` hints in a controller config file. +fn schema_from_config_file(path: &PathBuf) -> anyhow::Result { + let yaml_str = std::fs::read_to_string(path)?; + let config: asap_planner::ControllerConfig = serde_yaml::from_str(&yaml_str)?; + let mut schema = asap_planner::PromQLSchema::new(); + if let Some(metrics) = &config.metrics { + for m in metrics { + schema = schema.add_metric(m.metric.clone(), KeyByLabelNames::new(m.labels.clone())); + } + } + Ok(schema) +} + fn main() -> anyhow::Result<()> { let args = Args::parse(); @@ -89,19 +103,19 @@ fn main() -> anyhow::Result<()> { (Some(config_path), None, Some(url)) => { Controller::from_file(&config_path, opts, &url)? 
} - (Some(config_path), None, None) => Controller::from_file_with_schema( - &config_path, - asap_planner::PromQLSchema::new(), - opts, - )?, + (Some(config_path), None, None) => { + let schema = schema_from_config_file(&config_path)?; + Controller::from_file_with_schema(&config_path, schema, opts)? + } (None, Some(log_path), Some(url)) => { Controller::from_query_log(&log_path, opts, &url)? } - (None, Some(log_path), None) => Controller::from_query_log_with_schema( - &log_path, - asap_planner::PromQLSchema::new(), - opts, - )?, + (None, Some(_log_path), None) => { + anyhow::bail!( + "--prometheus-url is required when using --query-log \ + (query logs have no metrics hint to fall back on)" + ) + } _ => anyhow::bail!( "exactly one of --input_config or --query-log must be provided for PromQL mode" ), From 5105000da90f51a020e03c3c025387b00c47de42 Mon Sep 17 00:00:00 2001 From: Milind Srivastava Date: Wed, 8 Apr 2026 16:36:47 -0400 Subject: [PATCH 9/9] refactor --- asap-planner-rs/src/config/input.rs | 17 ++++++++++++++++ asap-planner-rs/src/main.rs | 18 +++-------------- asap-query-engine/src/planner_client.rs | 26 ++++--------------------- 3 files changed, 24 insertions(+), 37 deletions(-) diff --git a/asap-planner-rs/src/config/input.rs b/asap-planner-rs/src/config/input.rs index 699fc884..cc35c99b 100644 --- a/asap-planner-rs/src/config/input.rs +++ b/asap-planner-rs/src/config/input.rs @@ -1,4 +1,6 @@ +use promql_utilities::data_model::KeyByLabelNames; use serde::Deserialize; +use sketch_db_common::PromQLSchema; #[derive(Debug, Clone, Deserialize)] #[serde(deny_unknown_fields)] @@ -12,6 +14,21 @@ pub struct ControllerConfig { pub metrics: Option>, } +impl ControllerConfig { + /// Build a `PromQLSchema` from the `metrics` hints in this config. + /// Returns an empty schema if no hints are present. 
+ pub fn schema_from_hints(&self) -> PromQLSchema { + let mut schema = PromQLSchema::new(); + if let Some(metrics) = &self.metrics { + for m in metrics { + schema = + schema.add_metric(m.metric.clone(), KeyByLabelNames::new(m.labels.clone())); + } + } + schema + } +} + #[derive(Debug, Clone, Deserialize)] pub struct QueryGroup { pub id: Option, diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 08426f85..bd7fcba1 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -1,6 +1,5 @@ use asap_planner::{Controller, RuntimeOptions, SQLController, SQLRuntimeOptions, StreamingEngine}; use clap::Parser; -use promql_utilities::data_model::KeyByLabelNames; use sketch_db_common::enums::QueryLanguage; use std::path::PathBuf; @@ -57,19 +56,6 @@ enum EngineArg { Precompute, } -/// Build a `PromQLSchema` from the `metrics` hints in a controller config file. -fn schema_from_config_file(path: &PathBuf) -> anyhow::Result { - let yaml_str = std::fs::read_to_string(path)?; - let config: asap_planner::ControllerConfig = serde_yaml::from_str(&yaml_str)?; - let mut schema = asap_planner::PromQLSchema::new(); - if let Some(metrics) = &config.metrics { - for m in metrics { - schema = schema.add_metric(m.metric.clone(), KeyByLabelNames::new(m.labels.clone())); - } - } - Ok(schema) -} - fn main() -> anyhow::Result<()> { let args = Args::parse(); @@ -104,7 +90,9 @@ fn main() -> anyhow::Result<()> { Controller::from_file(&config_path, opts, &url)? } (Some(config_path), None, None) => { - let schema = schema_from_config_file(&config_path)?; + let yaml_str = std::fs::read_to_string(&config_path)?; + let config: asap_planner::ControllerConfig = serde_yaml::from_str(&yaml_str)?; + let schema = config.schema_from_hints(); Controller::from_file_with_schema(&config_path, schema, opts)? 
} (None, Some(log_path), Some(url)) => { diff --git a/asap-query-engine/src/planner_client.rs b/asap-query-engine/src/planner_client.rs index 2d9c0bcf..48c56b56 100644 --- a/asap-query-engine/src/planner_client.rs +++ b/asap-query-engine/src/planner_client.rs @@ -1,6 +1,5 @@ use anyhow::Result; -use asap_planner::{Controller, ControllerConfig, PlannerOutput, PromQLSchema, RuntimeOptions}; -use promql_utilities::data_model::KeyByLabelNames; +use asap_planner::{Controller, ControllerConfig, PlannerOutput, RuntimeOptions}; use sketch_db_common::enums::QueryLanguage; use sketch_db_common::inference_config::InferenceConfig; use sketch_db_common::streaming_config::StreamingConfig; @@ -37,13 +36,7 @@ impl PlannerClient for LocalPlannerClient { let query_language = self.query_language; let output: PlannerOutput = tokio::task::spawn_blocking(move || { - let mut schema = PromQLSchema::new(); - if let Some(metrics) = &config.metrics { - for m in metrics { - schema = - schema.add_metric(m.metric.clone(), KeyByLabelNames::new(m.labels.clone())); - } - } + let schema = config.schema_from_hints(); let controller = Controller::new(config, schema, opts); controller.generate() }) @@ -91,17 +84,6 @@ mod tests { } } - fn sample_schema(config: &ControllerConfig) -> PromQLSchema { - let mut schema = PromQLSchema::new(); - if let Some(metrics) = &config.metrics { - for m in metrics { - schema = - schema.add_metric(m.metric.clone(), KeyByLabelNames::new(m.labels.clone())); - } - } - schema - } - fn sample_runtime_options() -> RuntimeOptions { RuntimeOptions { prometheus_scrape_interval: 15, @@ -115,7 +97,7 @@ mod tests { #[test] fn test_controller_new_generate() { let config = sample_controller_config(); - let schema = sample_schema(&config); + let schema = config.schema_from_hints(); let opts = sample_runtime_options(); let controller = Controller::new(config, schema, opts); let output = controller.generate().expect("generate should succeed"); @@ -127,7 +109,7 @@ mod tests { #[test] fn 
test_planner_output_struct_accessors() { let config = sample_controller_config(); - let schema = sample_schema(&config); + let schema = config.schema_from_hints(); let opts = sample_runtime_options(); let controller = Controller::new(config, schema, opts); let output = controller.generate().expect("generate should succeed");