diff --git a/asap-query-engine/examples/engine_config.yaml b/asap-query-engine/examples/engine_config.yaml index eee867a..753ccfd 100644 --- a/asap-query-engine/examples/engine_config.yaml +++ b/asap-query-engine/examples/engine_config.yaml @@ -12,10 +12,6 @@ output_dir: "./output" log_level: "INFO" # DEBUG | INFO | WARN | ERROR (also respects RUST_LOG) -# Query language used for both ingest and query parsing. -# Values are case-sensitive: use lowercase exactly as shown. -query_language: "promql" # promql | sql | elastic_querydsl | elastic_sql - # Prometheus scrape interval in seconds. Used by the query tracker and planner. prometheus_scrape_interval: 15 @@ -33,12 +29,37 @@ do_profiling: false http_server: port: 8088 - # Prometheus server used for query forwarding and planner context. - prometheus_server: "http://localhost:9090" +# --------------------------------------------------------------------------- +# DB backend — determines the query protocol and optional fallback forwarding. +# Choose exactly one type. +# --------------------------------------------------------------------------- - # When true, queries not answerable from sketches are forwarded to prometheus_server. - # The server must be reachable at startup when this is enabled. - forward_unsupported_queries: false +# Prometheus (default) — exposes a PromQL-compatible HTTP API. +backend: + type: "prometheus" + server: "http://localhost:9090" # used for forwarding and planner context + forward_unsupported_queries: false # when true, server must be reachable at startup + +# ClickHouse — exposes an SQL-over-HTTP API. +# backend: +# type: "clickhouse" +# url: "http://localhost:8123" +# database: "default" +# forward_unsupported_queries: false + +# Elasticsearch (QueryDSL): +# backend: +# type: "elastic_querydsl" +# url: "http://localhost:9200" +# index: "metrics-*" # required +# forward_unsupported_queries: false + +# Elasticsearch (SQL): +# backend: +# type: "elastic_sql" +# url: "http://localhost:9200" +# index: "metrics-*" # required +# forward_unsupported_queries: false # --------------------------------------------------------------------------- # Store @@ -98,7 +119,7 @@ precompute_engine: dump_precomputes: false # dump received precomputes to output_dir for debugging # --------------------------------------------------------------------------- -# Query tracker / planner (optional) +# Query tracker / planner (optional — Prometheus backend only) # --------------------------------------------------------------------------- query_tracker: diff --git a/asap-query-engine/src/engine_config.rs b/asap-query-engine/src/engine_config.rs index 09b51d5..e39be19 100644 --- a/asap-query-engine/src/engine_config.rs +++ b/asap-query-engine/src/engine_config.rs @@ -33,6 +33,10 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> { return Err("prometheus_scrape_interval must be greater than 0".into()); } + if config.query_tracker.enabled && !matches!(config.backend, BackendConfig::Prometheus { .. }) { + return Err("query_tracker.enabled=true requires backend.type=prometheus".into()); + } + Ok(()) } @@ -41,11 +45,11 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> { pub struct EngineConfig { pub output_dir: String, pub log_level: String, - pub query_language: QueryLanguage, pub prometheus_scrape_interval: u64, pub streaming_engine: StreamingEngine, pub do_profiling: bool, pub http_server: HttpServerSettings, + pub backend: BackendConfig, pub store: StoreSettings, pub ingest: IngestConfig, pub precompute_engine: PrecomputeSettings, @@ -60,11 +64,11 @@ impl Default for EngineConfig { Self { output_dir: "./output".to_string(), log_level: "INFO".to_string(), - query_language: QueryLanguage::promql, prometheus_scrape_interval: 15, streaming_engine: StreamingEngine::Precompute, do_profiling: false, http_server: HttpServerSettings::default(), + backend: BackendConfig::default(), store: StoreSettings::default(), ingest: IngestConfig::default(), precompute_engine: PrecomputeSettings::default(), @@ -80,20 +84,117 @@ impl Default for EngineConfig { #[serde(default)] pub struct HttpServerSettings { pub port: u16, - pub prometheus_server: String, - pub forward_unsupported_queries: bool, } impl Default for HttpServerSettings { fn default() -> Self { - Self { - port: 8088, - prometheus_server: "http://localhost:9090".to_string(), + Self { port: 8088 } + } +} + +/// Which DB backend the query server exposes and optionally forwards to. +#[derive(Debug, serde::Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum BackendConfig { + Prometheus { + /// Prometheus server URL used for query forwarding and planner context. + #[serde(default = "default_prometheus_server")] + server: String, + /// When true, queries not answerable from sketches are forwarded to `server`. + /// The server must be reachable at startup. + #[serde(default)] + forward_unsupported_queries: bool, + }, + Clickhouse { + /// ClickHouse HTTP interface base URL. + #[serde(default = "default_clickhouse_url")] + url: String, + /// ClickHouse database name. + #[serde(default = "default_clickhouse_database")] + database: String, + /// When true, queries not answerable from sketches are forwarded to `url`. + #[serde(default)] + forward_unsupported_queries: bool, + }, + ElasticQuerydsl { + /// Elasticsearch base URL. + #[serde(default = "default_elastic_url")] + url: String, + /// Elasticsearch index pattern to query. + index: String, + /// When true, queries not answerable from sketches are forwarded to `url`. + #[serde(default)] + forward_unsupported_queries: bool, + }, + ElasticSql { + /// Elasticsearch base URL. + #[serde(default = "default_elastic_url")] + url: String, + /// Elasticsearch index pattern to query. + index: String, + /// When true, queries not answerable from sketches are forwarded to `url`. + #[serde(default)] + forward_unsupported_queries: bool, + }, +} + +impl Default for BackendConfig { + fn default() -> Self { + BackendConfig::Prometheus { + server: default_prometheus_server(), forward_unsupported_queries: false, } } } +impl BackendConfig { + pub fn query_language(&self) -> QueryLanguage { + match self { + BackendConfig::Prometheus { .. } => QueryLanguage::promql, + BackendConfig::Clickhouse { .. } => QueryLanguage::sql, + BackendConfig::ElasticQuerydsl { .. } => QueryLanguage::elastic_querydsl, + BackendConfig::ElasticSql { .. } => QueryLanguage::elastic_sql, + } + } + + pub fn forward_unsupported_queries(&self) -> bool { + match self { + BackendConfig::Prometheus { + forward_unsupported_queries, + .. + } + | BackendConfig::Clickhouse { + forward_unsupported_queries, + .. + } + | BackendConfig::ElasticQuerydsl { + forward_unsupported_queries, + .. + } + | BackendConfig::ElasticSql { + forward_unsupported_queries, + .. + } => *forward_unsupported_queries, + } + } +} + +fn default_prometheus_server() -> String { + "http://localhost:9090".to_string() +} + +fn default_clickhouse_url() -> String { + "http://localhost:8123".to_string() +} + +fn default_clickhouse_database() -> String { + "default".to_string() +} + +fn default_elastic_url() -> String { + "http://localhost:9200".to_string() +} + #[derive(Debug, serde::Deserialize)] #[serde(default)] pub struct StoreSettings { @@ -374,4 +475,157 @@ output_dir: "./output" let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); assert!(check_config(&config).is_err()); } + + #[test] + fn backend_defaults_to_prometheus() { + let config: EngineConfig = Figment::new() + .merge(Yaml::string(MINIMAL_YAML)) + .extract() + .unwrap(); + assert!(matches!(config.backend, BackendConfig::Prometheus { .. })); + assert_eq!(config.backend.query_language(), QueryLanguage::promql); + assert!(!config.backend.forward_unsupported_queries()); + } + + #[test] + fn backend_clickhouse_parses() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "http_remote_write" + port: 9090 +output_dir: "./output" +backend: + type: "clickhouse" + url: "http://clickhouse:8123" + database: "metrics" +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + assert!(matches!(config.backend, BackendConfig::Clickhouse { .. })); + assert_eq!(config.backend.query_language(), QueryLanguage::sql); + assert!(!config.backend.forward_unsupported_queries()); + } + + #[test] + fn backend_clickhouse_defaults_url_and_database() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "http_remote_write" + port: 9090 +output_dir: "./output" +backend: + type: "clickhouse" +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + if let BackendConfig::Clickhouse { url, database, .. } = &config.backend { + assert_eq!(url, "http://localhost:8123"); + assert_eq!(database, "default"); + } else { + panic!("expected Clickhouse backend"); + } + } + + #[test] + fn backend_elastic_querydsl_parses() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "http_remote_write" + port: 9090 +output_dir: "./output" +backend: + type: "elastic_querydsl" + url: "http://elastic:9200" + index: "metrics-*" +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + assert!(matches!( + config.backend, + BackendConfig::ElasticQuerydsl { .. } + )); + assert_eq!( + config.backend.query_language(), + QueryLanguage::elastic_querydsl + ); + } + + #[test] + fn backend_elastic_sql_parses() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "http_remote_write" + port: 9090 +output_dir: "./output" +backend: + type: "elastic_sql" + url: "http://elastic:9200" + index: "metrics-*" +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + assert!(matches!(config.backend, BackendConfig::ElasticSql { .. })); + assert_eq!(config.backend.query_language(), QueryLanguage::elastic_sql); + } + + #[test] + fn backend_prometheus_explicit_fields() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "http_remote_write" + port: 9090 +output_dir: "./output" +backend: + type: "prometheus" + server: "http://prom:9090" + forward_unsupported_queries: true +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + if let BackendConfig::Prometheus { + server, + forward_unsupported_queries, + } = &config.backend + { + assert_eq!(server, "http://prom:9090"); + assert!(forward_unsupported_queries); + } else { + panic!("expected Prometheus backend"); + } + assert!(config.backend.forward_unsupported_queries()); + } + + #[test] + fn check_config_rejects_query_tracker_with_non_prometheus_backend() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "http_remote_write" + port: 9090 +output_dir: "./output" +backend: + type: "clickhouse" +query_tracker: + enabled: true +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + assert!(check_config(&config).is_err()); + } + + #[test] + fn check_config_allows_query_tracker_with_prometheus_backend() { + let yaml = r#" +streaming_engine: "precompute" +ingest: + type: "http_remote_write" + port: 9090 +output_dir: "./output" +backend: + type: "prometheus" +query_tracker: + enabled: true +"#; + let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap(); + assert!(check_config(&config).is_ok()); + } } diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index 49b51b4..af0134e 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -1,7 +1,7 @@ mod engine_config; use clap::Parser; -use engine_config::{EngineConfig, IngestConfig}; +use engine_config::{BackendConfig, EngineConfig, IngestConfig}; use figment::{ providers::{Format, Yaml}, Figment, @@ -67,14 +67,16 @@ async fn main() -> Result<()> { info!("Starting Query Engine Rust"); info!("Output directory: {}", config.output_dir); + let query_language = config.backend.query_language(); + let inference_config = match &config.inference_config { Some(path) => { info!("Config file: {}", path); - read_inference_config(path, config.query_language)? + read_inference_config(path, query_language)? } None => { info!("No config file provided; starting with empty inference config"); - InferenceConfig::new(config.query_language, CleanupPolicy::NoCleanup) + InferenceConfig::new(query_language, CleanupPolicy::NoCleanup) } }; info!( @@ -113,7 +115,7 @@ async fn main() -> Result<()> { inference_config, streaming_config.clone(), config.prometheus_scrape_interval, - config.query_language, + query_language, )); // Kafka consumer — only when streaming_engine=arroyo and ingest.type=kafka. @@ -271,10 +273,35 @@ async fn main() -> Result<()> { None }; - let adapter_config = AdapterConfig::prometheus_promql( - config.http_server.prometheus_server.clone(), - config.http_server.forward_unsupported_queries, - ); + let adapter_config = match &config.backend { + BackendConfig::Prometheus { + server, + forward_unsupported_queries, + } => AdapterConfig::prometheus_promql(server.clone(), *forward_unsupported_queries), + BackendConfig::Clickhouse { + url, + database, + forward_unsupported_queries, + } => AdapterConfig::clickhouse_sql( + url.clone(), + database.clone(), + *forward_unsupported_queries, + ), + BackendConfig::ElasticQuerydsl { + url, + index, + forward_unsupported_queries, + } => AdapterConfig::elastic_querydsl( + url.clone(), + index.clone(), + *forward_unsupported_queries, + ), + BackendConfig::ElasticSql { + url, + index, + forward_unsupported_queries, + } => AdapterConfig::elastic_sql(url.clone(), index.clone(), *forward_unsupported_queries), + }; let http_config = HttpServerConfig { port: config.http_server.port, @@ -282,12 +309,21 @@ async fn main() -> Result<()> { adapter_config, }; - if config.http_server.forward_unsupported_queries { + if config.backend.forward_unsupported_queries() { let client = reqwest::Client::new(); - let health_url = format!( - "{}/api/v1/status/runtimeinfo", - config.http_server.prometheus_server.trim_end_matches('/') - ); + let (health_url, backend_label) = match &config.backend { + BackendConfig::Prometheus { server, .. } => ( + format!("{}/api/v1/status/runtimeinfo", server.trim_end_matches('/')), + server.clone(), + ), + BackendConfig::Clickhouse { url, .. } => { + (format!("{}/ping", url.trim_end_matches('/')), url.clone()) + } + BackendConfig::ElasticQuerydsl { url, .. } | BackendConfig::ElasticSql { url, .. } => ( + format!("{}/_cluster/health", url.trim_end_matches('/')), + url.clone(), + ), + }; match client .get(&health_url) .timeout(std::time::Duration::from_secs(5)) @@ -295,24 +331,18 @@ async fn main() -> Result<()> { .await { Ok(resp) if resp.status().is_success() => { - info!( - "Prometheus reachable at {}", - config.http_server.prometheus_server - ); + info!("Backend reachable at {}", backend_label); } Ok(resp) => { error!( - "Prometheus at {} returned HTTP {} — cannot start", - config.http_server.prometheus_server, + "Backend at {} returned HTTP {} — cannot start", + backend_label, resp.status() ); std::process::exit(1); } Err(e) => { - error!( - "Cannot reach Prometheus at {}: {}", - config.http_server.prometheus_server, e - ); + error!("Cannot reach backend at {}: {}", backend_label, e); std::process::exit(1); } } @@ -333,10 +363,14 @@ async fn main() -> Result<()> { range_duration: 300, step: config.prometheus_scrape_interval, }; + let prometheus_url = match &config.backend { + BackendConfig::Prometheus { server, .. } => server.clone(), + _ => unreachable!("check_config rejects non-prometheus backends with query_tracker"), + }; let planner_client = Arc::new(LocalPlannerClient::new( runtime_options, - config.query_language, - config.http_server.prometheus_server.clone(), + query_language, + prometheus_url, )); let (plan_tx, plan_rx) = tokio::sync::watch::channel(None::); diff --git a/benchmarks/config/engine_config.yaml b/benchmarks/config/engine_config.yaml index eb2e5a6..63c6feb 100644 --- a/benchmarks/config/engine_config.yaml +++ b/benchmarks/config/engine_config.yaml @@ -1,13 +1,15 @@ output_dir: "/app/outputs" log_level: "INFO" -query_language: "promql" prometheus_scrape_interval: 1 streaming_engine: "precompute" do_profiling: false http_server: port: 8088 - prometheus_server: "http://prometheus:9090" + +backend: + type: "prometheus" + server: "http://prometheus:9090" forward_unsupported_queries: true store: