Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions asap-planner-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ clap.workspace = true
indexmap.workspace = true
chrono.workspace = true
promql-parser = "0.5.0"
reqwest = { version = "0.11", features = ["blocking", "json"] }

[dev-dependencies]
tempfile = "3.20"
Expand Down
3 changes: 2 additions & 1 deletion asap-planner-rs/docker-compose.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ services:
"--input_config", "/app/input/config.yaml",
"--output_dir", "/app/output",
"--prometheus_scrape_interval", "{{ prometheus_scrape_interval }}",
"--streaming_engine", "{{ streaming_engine }}"{% if punting %},
"--streaming_engine", "{{ streaming_engine }}",
"--prometheus-url", "{{ prometheus_url }}"{% if punting %},
"--enable-punting"{% endif %}
]
restart: no
23 changes: 22 additions & 1 deletion asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
use promql_utilities::data_model::KeyByLabelNames;
use serde::Deserialize;
use sketch_db_common::PromQLSchema;

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ControllerConfig {
pub query_groups: Vec<QueryGroup>,
pub metrics: Vec<MetricDefinition>,
pub sketch_parameters: Option<SketchParameterOverrides>,
pub aggregate_cleanup: Option<AggregateCleanupConfig>,
/// Optional hint: per-metric label sets used as a fallback when Prometheus
/// returns no series for a metric. Prometheus-inferred labels take priority.
#[serde(default)]
pub metrics: Option<Vec<MetricDefinition>>,
}

impl ControllerConfig {
    /// Assemble a `PromQLSchema` out of the optional `metrics` hints.
    ///
    /// Every hint contributes one `add_metric` call, keyed by the hint's
    /// metric name and carrying its label names. When `metrics` is `None`
    /// (or an empty list) the freshly created schema is returned untouched.
    pub fn schema_from_hints(&self) -> PromQLSchema {
        // `Option::iter` yields zero or one `&Vec`, so flattening walks the
        // hints when they exist and is a no-op otherwise.
        self.metrics
            .iter()
            .flatten()
            .fold(PromQLSchema::new(), |acc, hint| {
                acc.add_metric(
                    hint.metric.clone(),
                    KeyByLabelNames::new(hint.labels.clone()),
                )
            })
    }
}

#[derive(Debug, Clone, Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions asap-planner-rs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ pub enum ControllerError {
SqlParse(String),
#[error("Unknown table: {0}")]
UnknownTable(String),
#[error("Prometheus client error: {0}")]
PrometheusClient(String),
}
120 changes: 106 additions & 14 deletions asap-planner-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ pub mod config;
pub mod error;
pub mod output;
pub mod planner;
pub mod prometheus_client;
pub mod query_log;

use promql_utilities::data_model::KeyByLabelNames;
use serde_yaml::Value as YamlValue;
use sketch_db_common::enums::QueryLanguage;
use sketch_db_common::inference_config::InferenceConfig;
use sketch_db_common::streaming_config::StreamingConfig;
use std::path::Path;
use tracing::debug;

pub use config::input::ControllerConfig;
pub use config::input::SQLControllerConfig;
pub use error::ControllerError;
pub use output::generator::{GeneratorOutput, PuntedQuery};
pub use output::sql_generator::SQLRuntimeOptions;
pub use prometheus_client::build_schema_from_prometheus;
pub use sketch_db_common::PromQLSchema;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingEngine {
Expand All @@ -34,6 +39,7 @@ pub struct RuntimeOptions {

pub struct Controller {
config: ControllerConfig,
schema: PromQLSchema,
options: RuntimeOptions,
}

Expand Down Expand Up @@ -302,49 +308,135 @@ impl SQLController {
}

impl Controller {
pub fn new(config: ControllerConfig, options: RuntimeOptions) -> Self {
Self { config, options }
pub fn new(config: ControllerConfig, schema: PromQLSchema, options: RuntimeOptions) -> Self {
Self {
config,
schema,
options,
}
}

pub fn from_file(path: &Path, opts: RuntimeOptions) -> Result<Self, ControllerError> {
/// Build a `Controller` from a config file, fetching metric labels from Prometheus.
///
/// `prometheus_url` is queried via `GET /api/v1/series?match[]=<metric>` for each metric
/// name found in the config's PromQL queries.
pub fn from_file(
path: &Path,
opts: RuntimeOptions,
prometheus_url: &str,
) -> Result<Self, ControllerError> {
let yaml_str = std::fs::read_to_string(path)?;
Self::from_yaml(&yaml_str, opts)
let config: ControllerConfig = serde_yaml::from_str(&yaml_str)?;
let all_queries: Vec<String> = config
.query_groups
.iter()
.flat_map(|qg| qg.queries.clone())
.collect();
let mut schema =
prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?;
// For any metric that Prometheus had no series for, fall back to the
// `metrics` hint in the config file (if present).
if let Some(metric_hints) = &config.metrics {
for hint in metric_hints {
if !schema.config.contains_key(&hint.metric) {
debug!(
"Prometheus had no series for '{}'; falling back to config-file hint with labels {:?}",
hint.metric, hint.labels
);
schema = schema.add_metric(
hint.metric.clone(),
KeyByLabelNames::new(hint.labels.clone()),
);
}
}
}
Ok(Self {
config,
schema,
options: opts,
})
}

/// Load a `ControllerConfig` from the YAML file at `path` and pair it with a
/// `PromQLSchema` handed in by the caller.
///
/// No Prometheus lookup happens here: `schema` is stored exactly as given,
/// which suits tests and callers that already built the schema in-process.
///
/// # Errors
///
/// Propagates the error from reading `path` or from deserializing its
/// contents as a `ControllerConfig`.
pub fn from_file_with_schema(
    path: &Path,
    schema: PromQLSchema,
    opts: RuntimeOptions,
) -> Result<Self, ControllerError> {
    let raw_yaml = std::fs::read_to_string(path)?;
    let config = serde_yaml::from_str::<ControllerConfig>(&raw_yaml)?;
    let controller = Self {
        config,
        schema,
        options: opts,
    };
    Ok(controller)
}

pub fn from_yaml(yaml: &str, opts: RuntimeOptions) -> Result<Self, ControllerError> {
/// Build a `Controller` from a YAML string with a caller-supplied `PromQLSchema`.
pub fn from_yaml_with_schema(
yaml: &str,
schema: PromQLSchema,
opts: RuntimeOptions,
) -> Result<Self, ControllerError> {
let config: ControllerConfig = serde_yaml::from_str(yaml)?;
Ok(Self {
config,
schema,
options: opts,
})
}

/// Build a `Controller` from a Prometheus query log file and a metrics config YAML.
/// Build a `Controller` from a Prometheus query log file, fetching metric labels from
/// Prometheus.
///
/// - `log_path`: newline-delimited JSON query log (Prometheus `--query.log-file` output)
/// - `metrics_path`: YAML file with a `metrics:` section listing metric names and labels
/// - `prometheus_url`: base URL queried for label discovery
pub fn from_query_log(
log_path: &Path,
metrics_path: &Path,
opts: RuntimeOptions,
prometheus_url: &str,
) -> Result<Self, ControllerError> {
let entries = query_log::parse_log_file(log_path)?;
let (instants, ranges) =
query_log::infer_queries(&entries, opts.prometheus_scrape_interval);
let config = query_log::to_controller_config(instants, ranges);
let all_queries: Vec<String> = config
.query_groups
.iter()
.flat_map(|qg| qg.queries.clone())
.collect();
let schema = prometheus_client::build_schema_from_prometheus(prometheus_url, &all_queries)?;
Ok(Self {
config,
schema,
options: opts,
})
}

let metrics_yaml = std::fs::read_to_string(metrics_path)?;
let metrics_config: query_log::MetricsConfig = serde_yaml::from_str(&metrics_yaml)?;

let config = query_log::to_controller_config(instants, ranges, metrics_config.metrics);

/// Derive a `Controller` from the query log at `log_path`, using a
/// `PromQLSchema` supplied by the caller instead of querying Prometheus
/// (useful in tests, for example).
///
/// The log is parsed, instant and range queries are inferred from its
/// entries using `opts.prometheus_scrape_interval`, and the result is turned
/// into the controller's config.
///
/// # Errors
///
/// Propagates the error from `query_log::parse_log_file`.
pub fn from_query_log_with_schema(
    log_path: &Path,
    schema: PromQLSchema,
    opts: RuntimeOptions,
) -> Result<Self, ControllerError> {
    let log_entries = query_log::parse_log_file(log_path)?;
    let (instant_queries, range_queries) =
        query_log::infer_queries(&log_entries, opts.prometheus_scrape_interval);
    Ok(Self {
        config: query_log::to_controller_config(instant_queries, range_queries),
        schema,
        options: opts,
    })
}

pub fn generate(&self) -> Result<PlannerOutput, ControllerError> {
let output = output::generator::generate_plan(&self.config, &self.options)?;
let output = output::generator::generate_plan(&self.config, &self.schema, &self.options)?;
Ok(PlannerOutput {
punted_queries: output.punted_queries,
streaming_yaml: output.streaming_yaml,
Expand Down
36 changes: 25 additions & 11 deletions asap-planner-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ struct Args {
#[arg(long = "query-log", conflicts_with = "input_config")]
query_log: Option<PathBuf>,

/// Path to a metrics config YAML (required when using --query-log).
#[arg(long = "metrics-config", requires = "query_log")]
metrics_config: Option<PathBuf>,

#[arg(long = "output_dir")]
output_dir: PathBuf,

#[arg(long = "prometheus_scrape_interval", required = false)]
prometheus_scrape_interval: Option<u64>,

/// Base URL of the Prometheus instance used to auto-infer metric label sets.
/// Optional: when provided, the planner queries Prometheus for label discovery.
/// When absent, labels are taken from the `metrics` hint in the config file.
/// Example: http://localhost:9090
#[arg(long = "prometheus-url", required = false)]
prometheus_url: Option<String>,

#[arg(long = "streaming_engine", value_enum)]
streaming_engine: EngineArg,

Expand Down Expand Up @@ -82,13 +85,24 @@ fn main() -> anyhow::Result<()> {
range_duration: args.range_duration,
step: args.step,
};
let controller = match (args.input_config, args.query_log) {
(Some(config_path), None) => Controller::from_file(&config_path, opts)?,
(None, Some(log_path)) => {
let metrics_path = args
.metrics_config
.expect("--metrics-config is required when using --query-log");
Controller::from_query_log(&log_path, &metrics_path, opts)?
let controller = match (args.input_config, args.query_log, args.prometheus_url) {
(Some(config_path), None, Some(url)) => {
Controller::from_file(&config_path, opts, &url)?
}
(Some(config_path), None, None) => {
let yaml_str = std::fs::read_to_string(&config_path)?;
let config: asap_planner::ControllerConfig = serde_yaml::from_str(&yaml_str)?;
let schema = config.schema_from_hints();
Controller::from_file_with_schema(&config_path, schema, opts)?
}
(None, Some(log_path), Some(url)) => {
Controller::from_query_log(&log_path, opts, &url)?
}
(None, Some(_log_path), None) => {
anyhow::bail!(
"--prometheus-url is required when using --query-log \
(query logs have no metrics hint to fall back on)"
)
}
_ => anyhow::bail!(
"exactly one of --input_config or --query-log must be provided for PromQL mode"
Expand Down
9 changes: 3 additions & 6 deletions asap-planner-rs/src/output/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashMap;

use promql_utilities::data_model::KeyByLabelNames;
use sketch_db_common::enums::CleanupPolicy;
use sketch_db_common::PromQLSchema;

use crate::config::input::ControllerConfig;
use crate::error::ControllerError;
Expand All @@ -17,14 +18,10 @@ type LeafEntries = Vec<(String, Vec<(String, Option<u64>)>)>;
/// Run the full planning pipeline and produce YAML outputs
pub fn generate_plan(
controller_config: &ControllerConfig,
schema: &PromQLSchema,
opts: &RuntimeOptions,
) -> Result<GeneratorOutput, ControllerError> {
// Build metric schema
let mut metric_schema = sketch_db_common::PromQLSchema::new();
for md in &controller_config.metrics {
metric_schema =
metric_schema.add_metric(md.metric.clone(), KeyByLabelNames::new(md.labels.clone()));
}
let metric_schema = schema.clone();

// Determine cleanup policy
let cleanup_policy_str = controller_config
Expand Down
Loading
Loading