Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions asap-query-engine/examples/engine_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@
output_dir: "./output"
log_level: "INFO" # DEBUG | INFO | WARN | ERROR (also respects RUST_LOG)

# Query language used for both ingest and query parsing.
# Values are case-sensitive: use lowercase exactly as shown.
query_language: "promql" # promql | sql | elastic_querydsl | elastic_sql

# Prometheus scrape interval in seconds. Used by the query tracker and planner.
prometheus_scrape_interval: 15

Expand All @@ -33,12 +29,37 @@ do_profiling: false
http_server:
port: 8088

# Prometheus server used for query forwarding and planner context.
prometheus_server: "http://localhost:9090"
# ---------------------------------------------------------------------------
# DB backend — determines the query protocol and optional fallback forwarding.
# Choose exactly one type.
# ---------------------------------------------------------------------------

# When true, queries not answerable from sketches are forwarded to prometheus_server.
# The server must be reachable at startup when this is enabled.
forward_unsupported_queries: false
# Prometheus (default) — exposes a PromQL-compatible HTTP API.
backend:
type: "prometheus"
server: "http://localhost:9090" # used for forwarding and planner context
forward_unsupported_queries: false # when true, server must be reachable at startup

# ClickHouse — exposes an SQL-over-HTTP API.
# backend:
# type: "clickhouse"
# url: "http://localhost:8123"
# database: "default"
# forward_unsupported_queries: false

# Elasticsearch (QueryDSL):
# backend:
# type: "elastic_querydsl"
# url: "http://localhost:9200"
# index: "metrics-*" # required
# forward_unsupported_queries: false

# Elasticsearch (SQL):
# backend:
# type: "elastic_sql"
# url: "http://localhost:9200"
# index: "metrics-*" # required
# forward_unsupported_queries: false

# ---------------------------------------------------------------------------
# Store
Expand Down Expand Up @@ -98,7 +119,7 @@ precompute_engine:
dump_precomputes: false # dump received precomputes to output_dir for debugging

# ---------------------------------------------------------------------------
# Query tracker / planner (optional)
# Query tracker / planner (optional — Prometheus backend only)
# ---------------------------------------------------------------------------

query_tracker:
Expand Down
268 changes: 261 additions & 7 deletions asap-query-engine/src/engine_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> {
return Err("prometheus_scrape_interval must be greater than 0".into());
}

if config.query_tracker.enabled && !matches!(config.backend, BackendConfig::Prometheus { .. }) {
return Err("query_tracker.enabled=true requires backend.type=prometheus".into());
}

Ok(())
}

Expand All @@ -41,11 +45,11 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> {
pub struct EngineConfig {
pub output_dir: String,
pub log_level: String,
pub query_language: QueryLanguage,
pub prometheus_scrape_interval: u64,
pub streaming_engine: StreamingEngine,
pub do_profiling: bool,
pub http_server: HttpServerSettings,
pub backend: BackendConfig,
pub store: StoreSettings,
pub ingest: IngestConfig,
pub precompute_engine: PrecomputeSettings,
Expand All @@ -60,11 +64,11 @@ impl Default for EngineConfig {
Self {
output_dir: "./output".to_string(),
log_level: "INFO".to_string(),
query_language: QueryLanguage::promql,
prometheus_scrape_interval: 15,
streaming_engine: StreamingEngine::Precompute,
do_profiling: false,
http_server: HttpServerSettings::default(),
backend: BackendConfig::default(),
store: StoreSettings::default(),
ingest: IngestConfig::default(),
precompute_engine: PrecomputeSettings::default(),
Expand All @@ -80,20 +84,117 @@ impl Default for EngineConfig {
#[serde(default)]
pub struct HttpServerSettings {
pub port: u16,
pub prometheus_server: String,
pub forward_unsupported_queries: bool,
}

impl Default for HttpServerSettings {
fn default() -> Self {
Self {
port: 8088,
prometheus_server: "http://localhost:9090".to_string(),
Self { port: 8088 }
}
}

/// Which DB backend the query server exposes and optionally forwards to.
///
/// Deserialized from the `backend:` section of the YAML config. The `type`
/// field selects the variant (internally tagged via `#[serde(tag = "type")]`,
/// snake_case): "prometheus" | "clickhouse" | "elastic_querydsl" | "elastic_sql".
#[derive(Debug, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BackendConfig {
Prometheus {
/// Prometheus server URL used for query forwarding and planner context.
#[serde(default = "default_prometheus_server")]
server: String,
/// When true, queries not answerable from sketches are forwarded to `server`.
/// The server must be reachable at startup.
#[serde(default)]
forward_unsupported_queries: bool,
},
Clickhouse {
/// ClickHouse HTTP interface base URL.
#[serde(default = "default_clickhouse_url")]
url: String,
/// ClickHouse database name.
#[serde(default = "default_clickhouse_database")]
database: String,
/// When true, queries not answerable from sketches are forwarded to `url`.
#[serde(default)]
forward_unsupported_queries: bool,
},
ElasticQuerydsl {
/// Elasticsearch base URL.
#[serde(default = "default_elastic_url")]
url: String,
/// Elasticsearch index pattern to query.
// No `#[serde(default)]` here: `index` is required, so deserialization
// fails if it is omitted from the config.
index: String,
/// When true, queries not answerable from sketches are forwarded to `url`.
#[serde(default)]
forward_unsupported_queries: bool,
},
ElasticSql {
/// Elasticsearch base URL.
#[serde(default = "default_elastic_url")]
url: String,
/// Elasticsearch index pattern to query.
// Required field — no serde default (mirrors ElasticQuerydsl).
index: String,
/// When true, queries not answerable from sketches are forwarded to `url`.
#[serde(default)]
forward_unsupported_queries: bool,
},
}

impl Default for BackendConfig {
fn default() -> Self {
BackendConfig::Prometheus {
server: default_prometheus_server(),
forward_unsupported_queries: false,
}
}
}

impl BackendConfig {
    /// Query language implied by the configured backend type.
    pub fn query_language(&self) -> QueryLanguage {
        match self {
            Self::Prometheus { .. } => QueryLanguage::promql,
            Self::Clickhouse { .. } => QueryLanguage::sql,
            Self::ElasticQuerydsl { .. } => QueryLanguage::elastic_querydsl,
            Self::ElasticSql { .. } => QueryLanguage::elastic_sql,
        }
    }

    /// Whether queries that cannot be answered from sketches should be
    /// forwarded to the backing server.
    pub fn forward_unsupported_queries(&self) -> bool {
        // Every variant carries this flag; bind it by value (bool is Copy),
        // ignoring the remaining per-variant fields.
        match *self {
            Self::Prometheus { forward_unsupported_queries, .. }
            | Self::Clickhouse { forward_unsupported_queries, .. }
            | Self::ElasticQuerydsl { forward_unsupported_queries, .. }
            | Self::ElasticSql { forward_unsupported_queries, .. } => {
                forward_unsupported_queries
            }
        }
    }
}

/// Serde default for the Prometheus backend's `server` URL.
fn default_prometheus_server() -> String {
    String::from("http://localhost:9090")
}

/// Serde default for the ClickHouse backend's HTTP `url`.
fn default_clickhouse_url() -> String {
    String::from("http://localhost:8123")
}

/// Serde default for the ClickHouse backend's `database` name.
fn default_clickhouse_database() -> String {
    String::from("default")
}

/// Serde default for both Elasticsearch backends' base `url`.
fn default_elastic_url() -> String {
    String::from("http://localhost:9200")
}

#[derive(Debug, serde::Deserialize)]
#[serde(default)]
pub struct StoreSettings {
Expand Down Expand Up @@ -374,4 +475,157 @@ output_dir: "./output"
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
assert!(check_config(&config).is_err());
}

// Omitting `backend:` entirely must fall back to `BackendConfig::default()`:
// the Prometheus variant, promql query language, forwarding disabled.
#[test]
fn backend_defaults_to_prometheus() {
let config: EngineConfig = Figment::new()
.merge(Yaml::string(MINIMAL_YAML))
.extract()
.unwrap();
assert!(matches!(config.backend, BackendConfig::Prometheus { .. }));
assert_eq!(config.backend.query_language(), QueryLanguage::promql);
assert!(!config.backend.forward_unsupported_queries());
}

// An explicit `type: "clickhouse"` backend with url/database set must parse
// into the Clickhouse variant; the language becomes sql and forwarding stays
// off (the `#[serde(default)]` on forward_unsupported_queries).
#[test]
fn backend_clickhouse_parses() {
let yaml = r#"
streaming_engine: "precompute"
ingest:
type: "http_remote_write"
port: 9090
output_dir: "./output"
backend:
type: "clickhouse"
url: "http://clickhouse:8123"
database: "metrics"
"#;
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
assert!(matches!(config.backend, BackendConfig::Clickhouse { .. }));
assert_eq!(config.backend.query_language(), QueryLanguage::sql);
assert!(!config.backend.forward_unsupported_queries());
}

// `type: "clickhouse"` with url/database omitted must pick up the serde
// defaults (default_clickhouse_url / default_clickhouse_database).
#[test]
fn backend_clickhouse_defaults_url_and_database() {
let yaml = r#"
streaming_engine: "precompute"
ingest:
type: "http_remote_write"
port: 9090
output_dir: "./output"
backend:
type: "clickhouse"
"#;
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
if let BackendConfig::Clickhouse { url, database, .. } = &config.backend {
assert_eq!(url, "http://localhost:8123");
assert_eq!(database, "default");
} else {
panic!("expected Clickhouse backend");
}
}

// `type: "elastic_querydsl"` must parse into the ElasticQuerydsl variant and
// report the elastic_querydsl query language. `index` is supplied because it
// has no serde default (required field).
#[test]
fn backend_elastic_querydsl_parses() {
let yaml = r#"
streaming_engine: "precompute"
ingest:
type: "http_remote_write"
port: 9090
output_dir: "./output"
backend:
type: "elastic_querydsl"
url: "http://elastic:9200"
index: "metrics-*"
"#;
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
assert!(matches!(
config.backend,
BackendConfig::ElasticQuerydsl { .. }
));
assert_eq!(
config.backend.query_language(),
QueryLanguage::elastic_querydsl
);
}

// `type: "elastic_sql"` must parse into the ElasticSql variant and report the
// elastic_sql query language (same required `index` field as elastic_querydsl).
#[test]
fn backend_elastic_sql_parses() {
let yaml = r#"
streaming_engine: "precompute"
ingest:
type: "http_remote_write"
port: 9090
output_dir: "./output"
backend:
type: "elastic_sql"
url: "http://elastic:9200"
index: "metrics-*"
"#;
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
assert!(matches!(config.backend, BackendConfig::ElasticSql { .. }));
assert_eq!(config.backend.query_language(), QueryLanguage::elastic_sql);
}

// Explicitly set Prometheus fields must round-trip: both the destructured
// `forward_unsupported_queries` field and the accessor must report `true`.
#[test]
fn backend_prometheus_explicit_fields() {
let yaml = r#"
streaming_engine: "precompute"
ingest:
type: "http_remote_write"
port: 9090
output_dir: "./output"
backend:
type: "prometheus"
server: "http://prom:9090"
forward_unsupported_queries: true
"#;
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
if let BackendConfig::Prometheus {
server,
forward_unsupported_queries,
} = &config.backend
{
assert_eq!(server, "http://prom:9090");
assert!(forward_unsupported_queries);
} else {
panic!("expected Prometheus backend");
}
assert!(config.backend.forward_unsupported_queries());
}

// check_config must reject `query_tracker.enabled: true` when the backend is
// anything other than Prometheus (the tracker depends on the Prometheus API).
#[test]
fn check_config_rejects_query_tracker_with_non_prometheus_backend() {
let yaml = r#"
streaming_engine: "precompute"
ingest:
type: "http_remote_write"
port: 9090
output_dir: "./output"
backend:
type: "clickhouse"
query_tracker:
enabled: true
"#;
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
assert!(check_config(&config).is_err());
}

// Positive counterpart to the rejection test above: with a Prometheus backend,
// enabling the query tracker must pass validation.
#[test]
fn check_config_allows_query_tracker_with_prometheus_backend() {
let yaml = r#"
streaming_engine: "precompute"
ingest:
type: "http_remote_write"
port: 9090
output_dir: "./output"
backend:
type: "prometheus"
query_tracker:
enabled: true
"#;
let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
assert!(check_config(&config).is_ok());
}
}
Loading
Loading