diff --git a/Cargo.lock b/Cargo.lock index 78b6cfd2..3bd79354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1679,6 +1679,18 @@ name = "elastic_dsl_utilities" version = "0.3.0" dependencies = [ "chrono", + "elasticsearch-dsl-ast", + "serde", + "serde_json", +] + +[[package]] +name = "elasticsearch-dsl-ast" +version = "0.1.0" +source = "git+https://github.com/ProjectASAP/elasticsearch-dsl-ast#f1e1e3565be1c04d64ba0234def1d3b1042838e9" +dependencies = [ + "chrono", + "num-traits", "serde", "serde_json", ] diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml index 3726cabb..db353488 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/Cargo.toml @@ -7,3 +7,4 @@ version.workspace = true serde.workspace = true serde_json.workspace = true chrono.workspace = true +elasticsearch-dsl-ast = { version = "0.1.0", git = "https://github.com/ProjectASAP/elasticsearch-dsl-ast" } diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs new file mode 100644 index 00000000..299af3b1 --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/extract_info.rs @@ -0,0 +1,360 @@ +use crate::ast_parsing::query_info::{ + AggregationType, ElasticDSLQueryInfo, FieldName, GroupBySpec, Predicate, TermValue, +}; +use crate::helpers::strip_keyword_suffix; +use elasticsearch_dsl_ast::{self as dsl}; +use serde_json; + +pub fn extract_query_info(query: &str) -> Option<ElasticDSLQueryInfo> { + // Main entry point for extracting relevant information from the parsed query pattern. + let search_request = serde_json::from_str(query).ok()?; + walk_ast_and_extract_info(&search_request) +} + +pub fn parse_query_to_ast(query: &str) -> Option<dsl::Search> { + serde_json::from_str(query).ok()
+} + +pub fn walk_ast_and_extract_info(ast: &dsl::Search) -> Option<ElasticDSLQueryInfo> { + // Traverses the AST and extracts relevant information for answering sketchable aggregations within ASAPQuery. + // This would involve traversing the AST nodes and applying logic to determine query patterns, labels, statistics, etc. + let query = ast.query.clone(); + let predicates = match query { + Some(dsl::Query::Bool(bool_query)) => { + // Extract information from the bool query + walk_bool_query_and_extract_info(&bool_query) + } + Some(other) => { + // Predicates may just be specified directly without enclosing bool context. + if let Some(predicate) = extract_predicates_from_query(&other) { + vec![predicate] + } else { + Vec::new() + } + } + None => Vec::new(), // Return an empty vector of predicates if no query is specified + }; + let (target_field, aggregation_type, group_by_spec) = + walk_aggregations_and_extract_info(&ast.aggs)?; + Some(ElasticDSLQueryInfo::new( + target_field, + predicates, + group_by_spec, + aggregation_type, + )) +} + +fn walk_bool_query_and_extract_info(bool_query: &dsl::BoolQuery) -> Vec<Predicate> { + // Placeholder for walking the filter context of the AST and extracting relevant information + // This would involve traversing the filter nodes and applying logic to determine label filters, time ranges, etc. + let dsl::QueryCollection(filters) = bool_query.filter.clone(); + let mut predicates = Vec::new(); + for query in filters { + if let Some(predicate) = extract_predicates_from_query(&query) { + predicates.push(predicate); + } + } + predicates +} + +fn extract_predicates_from_query(query: &dsl::Query) -> Option<Predicate> { + // Extract predicate information from a given query node, if it matches supported patterns (term or range queries).
+ match query { + dsl::Query::Term(term_query) => { + // Extract information from the term query + let field = strip_keyword_suffix(&term_query.field).to_owned(); + let Some(value) = term_query.value.clone() else { + return None; // Skip if term query does not have a value + }; + let Some(term_value) = map_term_to_json_value(&value) else { + return None; // Skip if term query value cannot be mapped to a JSON value + }; + // Process the term query information as needed + Some(Predicate::Term { + field, + value: term_value, + }) + } + + dsl::Query::Range(range_query) => { + // Extract information from the range query + let field = strip_keyword_suffix(&range_query.field).to_owned(); + let gte = range_query.gte.clone(); + let lte = range_query.lte.clone(); + // Process the range query information as needed + let gte_value = gte.as_ref().and_then(map_term_to_json_value); + let lte_value = lte.as_ref().and_then(map_term_to_json_value); + Some(Predicate::Range { + field, + gte: gte_value, + lte: lte_value, + }) + } + _ => { + // Handle other query types + None // Skip unsupported query types + } + } +} + +fn walk_aggregations_and_extract_info( + aggregations: &dsl::Aggregations, +) -> Option<(FieldName, AggregationType, Option<GroupBySpec>)> { + // Traverse the aggregations in the AST and extracting relevant information. Extract the first valid aggregation type found, along with any associated group by specifications. + for agg in aggregations.values() { + match agg { + dsl::Aggregation::MultiTerms(terms_agg) => { + // Extract information from the terms aggregation + let field_names: Vec<FieldName> = terms_agg + .multi_terms + .terms + .iter() + .filter_map(|multi_term| multi_term.field.clone()) + .collect(); + let field_names: Vec<FieldName> = field_names + .iter() + .map(|s| strip_keyword_suffix(s).to_owned()) + .collect(); + if field_names.is_empty() { + return None; // Return None if no valid field names are found in the multi-terms aggregation.
+ } + let group_by_spec = Some(GroupBySpec::Fields(field_names)); + let (target_field, aggregation_type) = + find_aggregation_info(&terms_agg.aggs.clone())?; + return Some((target_field, aggregation_type, group_by_spec)); + } + dsl::Aggregation::Terms(terms_agg) => { + // Extract information from the terms aggregation + if let Some(field) = terms_agg.terms.field.clone() { + let field = strip_keyword_suffix(&field).to_owned(); + // Process the terms aggregation information as needed + let group_by_spec = Some(GroupBySpec::Fields(vec![field])); + let (target_field, aggregation_type) = + find_aggregation_info(&terms_agg.aggs.clone())?; + return Some((target_field, aggregation_type, group_by_spec)); + } + } + other => { + // Handle other aggregation types + let (target_field, aggregation_type) = extract_aggregation_info(other)?; + return Some((target_field, aggregation_type, None)); + } + } + } + None // Return None if no relevant aggregation information is found +} + +fn find_aggregation_info(aggregations: &dsl::Aggregations) -> Option<(FieldName, AggregationType)> { + // Placeholder for extracting specific information from an aggregation node + if let Some((_, agg)) = aggregations.iter().next() { + let (field, aggregation_type) = extract_aggregation_info(agg)?; + return Some((field, aggregation_type)); + } + None // Return None if no relevant aggregation information is found +} + +fn extract_aggregation_info(agg: &dsl::Aggregation) -> Option<(FieldName, AggregationType)> { + // Extracts the specific aggregation type and target field from the given aggregation node, if it matches supported types (avg, sum, min, max, percentiles). 
+ match agg { + dsl::Aggregation::Avg(avg_agg) => { + let field = strip_keyword_suffix(&avg_agg.avg.field).to_owned(); + let aggregation_type = AggregationType::Avg; + Some((field, aggregation_type)) + } + dsl::Aggregation::Sum(sum_agg) => { + let field = strip_keyword_suffix(&sum_agg.sum.field.clone()?).to_owned(); + let aggregation_type = AggregationType::Sum; + Some((field, aggregation_type)) + } + dsl::Aggregation::Min(min_agg) => { + let field = strip_keyword_suffix(&min_agg.min.field.clone()?).to_owned(); + let aggregation_type = AggregationType::Min; + Some((field, aggregation_type)) + } + dsl::Aggregation::Max(max_agg) => { + let field = strip_keyword_suffix(&max_agg.max.field.clone()?).to_owned(); + let aggregation_type = AggregationType::Max; + Some((field, aggregation_type)) + } + dsl::Aggregation::Percentiles(percentiles_agg) => { + let field = percentiles_agg.percentiles.field.clone(); + let percents = percentiles_agg + .percentiles + .percents + .clone() + .unwrap_or_default(); + let aggregation_type = AggregationType::Percentiles(percents); + Some((field, aggregation_type)) + } + _ => None, // Return None for unsupported aggregation types + } +} + +fn map_term_to_json_value(term: &dsl::Term) -> Option<TermValue> { + // Placeholder for extracting field and value from a term query + match term { + dsl::Term::String(value) => { + let value_str = value.to_string(); // Convert the term value to a string representation + Some(TermValue::String(value_str)) + } + dsl::Term::Float32(value) => Some(TermValue::Float(*value as f64)), + dsl::Term::Float64(value) => Some(TermValue::Float(*value)), + dsl::Term::PositiveNumber(value) => Some(TermValue::UnsignedInt(*value)), + dsl::Term::NegativeNumber(value) => Some(TermValue::Int(*value)), + dsl::Term::Boolean(value) => Some(TermValue::Boolean(*value)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_query_to_ast_parses_valid_search_request() { + let query = r#" + { + "query": { + "bool": { + 
"filter": [ + { "term": { "service.keyword": { "value": "frontend" } } } + ] + } + }, + "aggs": { + "avg_latency": { "avg": { "field": "latency_ms" } } + } + } + "#; + + let ast = parse_query_to_ast(query); + assert!(ast.is_some()); + } + + #[test] + fn parse_query_to_ast_returns_none_for_invalid_json() { + let query = r#"{ "query": { "bool": { "filter": [ } }"#; + assert!(parse_query_to_ast(query).is_none()); + } + + #[test] + fn walk_bool_query_and_extract_info_extracts_term_and_range_predicates() { + let bool_query = dsl::Query::bool() + .filter(dsl::Query::term("service.keyword", "frontend")) + .filter(dsl::Query::term("is_canary", true)) + .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")); + + let predicates = walk_bool_query_and_extract_info(&bool_query); + assert_eq!(predicates.len(), 3); + assert_eq!( + predicates[0], + Predicate::Term { + field: "service".to_string(), + value: TermValue::String("frontend".to_string()), + } + ); + assert_eq!( + predicates[1], + Predicate::Term { + field: "is_canary".to_string(), + value: TermValue::Boolean(true), + } + ); + assert_eq!( + predicates[2], + Predicate::Range { + field: "@timestamp".to_string(), + gte: Some(TermValue::String("now-30s".to_string())), + lte: Some(TermValue::String("now".to_string())), + } + ); + } + + #[test] + fn walk_aggregations_and_extract_info_extracts_terms_group_by_and_metric() { + let query = r#" + { + "aggs": { + "by_service": { + "terms": { "field": "service.keyword" }, + "aggs": { + "avg_latency": { "avg": { "field": "latency_ms" } } + } + } + } + } + "#; + let ast = parse_query_to_ast(query).expect("query should parse"); + + let (target_field, agg_type, group_by) = + walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); + assert_eq!(target_field, "latency_ms"); + assert_eq!(agg_type, AggregationType::Avg); + assert_eq!( + group_by, + Some(GroupBySpec::Fields(vec!["service".to_string()])) + ); + } + + #[test] + fn 
walk_aggregations_and_extract_info_extracts_multi_terms_and_percentiles() { + let query = r#" + { + "aggs": { + "by_labels": { + "multi_terms": { + "terms": [ + { "field": "service.keyword" }, + { "field": "env.keyword" } + ] + }, + "aggs": { + "latency_percentiles": { + "percentiles": { + "field": "latency_ms", + "percents": [50.0, 95.0] + } + } + } + } + } + } + "#; + let ast = parse_query_to_ast(query).expect("query should parse"); + + let (target_field, agg_type, group_by) = + walk_aggregations_and_extract_info(&ast.aggs).expect("aggregation info should parse"); + assert_eq!(target_field, "latency_ms"); + assert_eq!(agg_type, AggregationType::Percentiles(vec![50.0, 95.0])); + assert_eq!( + group_by, + Some(GroupBySpec::Fields(vec![ + "service".to_string(), + "env".to_string() + ])) + ); + } + + #[test] + fn walk_ast_and_extract_info_builds_elastic_dsl_query() { + let ast = dsl::Search::new() + .query( + dsl::Query::bool() + .filter(dsl::Query::term("service.keyword", "frontend")) + .filter(dsl::Query::range("@timestamp").gte("now-30s").lte("now")), + ) + .aggregate( + "by_service", + dsl::Aggregation::terms("service.keyword") + .aggregate("max_latency", dsl::Aggregation::max("latency_ms")), + ); + let info = walk_ast_and_extract_info(&ast).expect("info should parse"); + + assert_eq!(info.target_field, "latency_ms"); + assert_eq!(info.aggregation, AggregationType::Max); + assert_eq!(info.predicates.len(), 2); + assert_eq!( + info.group_by_buckets, + Some(GroupBySpec::Fields(vec!["service".to_string()])) + ); + } +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs new file mode 100644 index 00000000..a50b887d --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/mod.rs @@ -0,0 +1,5 @@ +pub mod extract_info; +pub mod query_info; + +pub use extract_info::*; +pub use query_info::*; diff --git 
a/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs new file mode 100644 index 00000000..ce82d705 --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/ast_parsing/query_info.rs @@ -0,0 +1,73 @@ +use serde::{Deserialize, Serialize}; + +pub type FieldName = String; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ElasticDSLQueryInfo { + // A distilled representation of an ElasticSearch DSL query, capturing the essential logic and structure. + pub target_field: FieldName, // List of metrics being queried + pub predicates: Vec<Predicate>, // Predicates applied to the query (e.g. filters in bool.filter) + pub group_by_buckets: Option<GroupBySpec>, // Grouping specification if the query includes a group by clause + pub aggregation: AggregationType, // The statistic being computed (e.g. avg, sum, percentiles) +} + +impl ElasticDSLQueryInfo { + // Additional methods for processing or analyzing the query can be added here + + pub fn new( + target_field: FieldName, + predicates: Vec<Predicate>, + group_by_buckets: Option<GroupBySpec>, + aggregation: AggregationType, + ) -> Self { + Self { + target_field, + predicates, + group_by_buckets, + aggregation, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AggregationType { + Avg, + Sum, + Min, + Max, + Percentiles(Vec<f64>), // List of percentiles being computed +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Predicate { + Term { + field: FieldName, + value: TermValue, + }, + Range { + field: FieldName, + gte: Option<TermValue>, + lte: Option<TermValue>, + }, + // Other predicate types can be added here (e.g. exists, wildcard, etc.)
+} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum TermValue { + String(String), + Float(f64), + Int(i64), + UnsignedInt(u64), + Boolean(bool), + // Other term value types can be added here (e.g. boolean, date, etc.) +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum GroupBySpec { + Fields(Vec<FieldName>), + Filters(Vec<Predicate>), // Grouping by filters (e.g. group by whether a field matches a certain value) +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs new file mode 100644 index 00000000..64f2cf87 --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs @@ -0,0 +1,205 @@ +use crate::ast_parsing::query_info::{FieldName, Predicate, TermValue}; +use serde::{Deserialize, Serialize}; + +/// Time range bounds resolved into epoch milliseconds. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ResolvedTimeRange { + pub field: FieldName, + pub gte_ms: Option<i64>, + pub lte_ms: Option<i64>, +} + +/// An optional time range applied to a timestamp field. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TimeRange { + pub field: FieldName, + pub gte: Option<String>, + pub lte: Option<String>, +} + +impl TimeRange { + /// Parse a date-math expression into epoch milliseconds using the provided + /// `now_ms` as reference for `now`-relative expressions. + /// + /// Supported forms: + /// - `now` + /// - `now-30s`, `now+5m`, `now-1h`, `now-2d`, `now-1w`, `now-500ms` + /// - RFC3339 timestamps (e.g. 
`2026-03-22T12:34:56Z`) + /// - Plain integer timestamps (returned as-is) + pub fn parse_date_math(expr: &str, now_ms: i64) -> Option<i64> { + if expr == "now" { + return Some(now_ms); + } + + if let Some(delta) = Self::parse_now_delta_ms(expr) { + return now_ms.checked_add(delta); + } + + if let Ok(v) = expr.parse::<i64>() { + return Some(v); + } + + chrono::DateTime::parse_from_rfc3339(expr) + .ok() + .map(|dt| dt.timestamp_millis()) + } + + /// Resolve `gte`/`lte` date-math strings into numeric epoch-millisecond + /// values relative to `now_ms`. + pub fn resolve_epoch_millis(&self, now_ms: i64) -> Option<ResolvedTimeRange> { + let gte_ms = match &self.gte { + Some(v) => Some(Self::parse_date_math(v, now_ms)?), + None => None, + }; + let lte_ms = match &self.lte { + Some(v) => Some(Self::parse_date_math(v, now_ms)?), + None => None, + }; + + Some(ResolvedTimeRange { + field: self.field.clone(), + gte_ms, + lte_ms, + }) + } + + fn parse_now_delta_ms(expr: &str) -> Option<i64> { + let rest = expr.strip_prefix("now")?; + if rest.is_empty() { + return Some(0); + } + + let sign_char = rest.chars().next()?; + let sign = match sign_char { + '+' => 1_i64, + '-' => -1_i64, + _ => return None, + }; + + let offset = &rest[1..]; + if offset.is_empty() { + return None; + } + + let digit_count = offset.chars().take_while(|c| c.is_ascii_digit()).count(); + if digit_count == 0 || digit_count == offset.len() { + return None; + } + + let qty = offset[..digit_count].parse::<i64>().ok()?; + let unit = &offset[digit_count..]; + let unit_ms = match unit { + "ms" => 1_i64, + "s" => 1_000_i64, + "m" => 60_000_i64, + "h" => 3_600_000_i64, + "d" => 86_400_000_i64, + "w" => 604_800_000_i64, + _ => return None, + }; + + qty.checked_mul(unit_ms)?.checked_mul(sign) + } +} + +pub fn range_query_to_time_range(predicate: &Predicate, now_ms: i64) -> Option<ResolvedTimeRange> { + match predicate { + Predicate::Range { field, gte, lte } => { + let tr = TimeRange { + field: field.clone(), + gte: gte.as_ref().and_then(|v| match v { + TermValue::String(s) 
=> Some(s.clone()), + _ => None, + }), + lte: lte.as_ref().and_then(|v| match v { + TermValue::String(s) => Some(s.clone()), + _ => None, + }), + }; + let resolved = tr.resolve_epoch_millis(now_ms)?; + Some(resolved) + } + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_date_math_supports_now_relative_and_numeric() { + let now_ms = 1_700_000_000_000_i64; + assert_eq!(TimeRange::parse_date_math("now", now_ms), Some(now_ms)); + assert_eq!( + TimeRange::parse_date_math("now-30s", now_ms), + Some(now_ms - 30_000) + ); + assert_eq!( + TimeRange::parse_date_math("now+5m", now_ms), + Some(now_ms + 300_000) + ); + assert_eq!( + TimeRange::parse_date_math("1700000000123", now_ms), + Some(1_700_000_000_123) + ); + } + + #[test] + fn parse_date_math_supports_rfc3339() { + let now_ms = 0; + let value = TimeRange::parse_date_math("2026-03-22T12:34:56Z", now_ms) + .expect("RFC3339 timestamp should parse"); + assert_eq!(value, 1_774_182_896_000); + } + + #[test] + fn parse_date_math_rejects_invalid_expressions() { + let now_ms = 1_700_000_000_000_i64; + assert_eq!(TimeRange::parse_date_math("now+", now_ms), None); + assert_eq!(TimeRange::parse_date_math("now-10", now_ms), None); + assert_eq!(TimeRange::parse_date_math("yesterday", now_ms), None); + } + + #[test] + fn resolve_epoch_millis_resolves_both_bounds() { + let range = TimeRange { + field: "@timestamp".to_string(), + gte: Some("now-1m".to_string()), + lte: Some("now".to_string()), + }; + let now_ms = 2_000_000_i64; + + let resolved = range + .resolve_epoch_millis(now_ms) + .expect("range should resolve"); + assert_eq!(resolved.field, "@timestamp"); + assert_eq!(resolved.gte_ms, Some(1_940_000)); + assert_eq!(resolved.lte_ms, Some(2_000_000)); + } + + #[test] + fn range_query_to_time_range_converts_range_predicate() { + let predicate = Predicate::Range { + field: "@timestamp".to_string(), + gte: Some(TermValue::String("now-30s".to_string())), + lte: 
Some(TermValue::String("now".to_string())), + }; + let now_ms = 1_000_000_i64; + + let resolved = + range_query_to_time_range(&predicate, now_ms).expect("range predicate should convert"); + assert_eq!(resolved.field, "@timestamp"); + assert_eq!(resolved.gte_ms, Some(970_000)); + assert_eq!(resolved.lte_ms, Some(1_000_000)); + } + + #[test] + fn range_query_to_time_range_returns_none_for_non_range_predicates() { + let predicate = Predicate::Term { + field: "service".to_string(), + value: TermValue::String("frontend".to_string()), + }; + assert!(range_query_to_time_range(&predicate, 100).is_none()); + } +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs new file mode 100644 index 00000000..c2cf7a2c --- /dev/null +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/helpers.rs @@ -0,0 +1,15 @@ +/// Strip the `.keyword` suffix from a field name, if present. +pub fn strip_keyword_suffix(field: &str) -> &str { + field.strip_suffix(".keyword").unwrap_or(field) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn strip_keyword_suffix_removes_only_trailing_keyword() { + assert_eq!(strip_keyword_suffix("service.keyword"), "service"); + assert_eq!(strip_keyword_suffix("env"), "env"); + } +} diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs index c03929fb..4a86ba8b 100644 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs +++ b/asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs @@ -1,7 +1,7 @@ -pub mod parsing; -pub mod pattern; -pub mod types; +pub mod ast_parsing; +pub mod datemath; +pub mod helpers; -pub use parsing::*; -pub use pattern::{classify, parse_and_classify}; -pub use types::*; +pub use ast_parsing::*; +pub use datemath::*; +pub use helpers::*; diff --git a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs 
b/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs deleted file mode 100644 index 3963c1f7..00000000 --- a/asap-common/dependencies/rs/elastic_dsl_utilities/src/parsing.rs +++ /dev/null @@ -1,439 +0,0 @@ -use serde_json::Value; - -use crate::types::{GroupBySpec, LabelFilter, MetricAggType, MetricAggregation, TimeRange}; - -// --------------------------------------------------------------------------- -// Metric aggregation helpers -// --------------------------------------------------------------------------- - -/// Try to extract a list of metric aggregations from the top-level `"aggs"` -/// object of a query. Returns `None` if *any* aggregation entry is not one of -/// the recognised metric types (avg / min / max / sum / percentiles). -pub fn extract_metric_aggs(aggs: &Value) -> Option<Vec<MetricAggregation>> { - let obj = aggs.as_object()?; - if obj.is_empty() { - return None; - } - - let mut result = Vec::with_capacity(obj.len()); - for (result_name, agg_body) in obj { - // Each aggregation body is an object that should contain exactly one - // recognised metric aggregation key. - let body_obj = agg_body.as_object()?; - let mut found = None; - for (key, inner) in body_obj { - if let Some(agg_type) = MetricAggType::from_json_str(key) { - let field = inner.get("field")?.as_str()?.to_owned(); - let kwargs_map = inner - .as_object()?
- .iter() - .filter(|(k, _)| *k != "field") - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - let kwargs = serde_json::Value::Object(kwargs_map); - found = Some(MetricAggregation { - result_name: result_name.clone(), - agg_type, - field, - params: if kwargs.as_object().is_some_and(|o| o.is_empty()) { - None - } else { - Some(kwargs) - }, - }); - break; - } - } - result.push(found?); - } - Some(result) -} - -// --------------------------------------------------------------------------- -// Time range helpers -// --------------------------------------------------------------------------- - -/// Try to extract a `TimeRange` from a bare `{"range": {"": {...}}}` -/// query value. Accepts either string or numeric values for gte/lte. -pub fn extract_time_range(query: &Value) -> Option<TimeRange> { - let range_obj = query.get("range")?.as_object()?; - // There should be exactly one field entry in the range object. - if range_obj.len() != 1 { - return None; - } - let (field, bounds) = range_obj.iter().next()?; - let gte = bounds.get("gte").and_then(value_to_string); - let lte = bounds.get("lte").and_then(value_to_string); - Some(TimeRange { - field: field.clone(), - gte, - lte, - }) -} - -fn value_to_string(v: &Value) -> Option<String> { - match v { - Value::String(s) => Some(s.clone()), - Value::Number(n) => Some(n.to_string()), - _ => None, - } -} - -// --------------------------------------------------------------------------- -// Term / label-filter helpers -// --------------------------------------------------------------------------- - -/// Strip the `.keyword` suffix from a field name, if present. -fn strip_keyword_suffix(field: &str) -> &str { - field.strip_suffix(".keyword").unwrap_or(field) -} - -/// Try to extract a `LabelFilter` from a single `"term"` query object.
-/// -/// Handles both the opensearch-dsl long form: -/// ```json -/// { "term": { "field": { "value": "val" } } } -/// ``` -/// and the ES shorthand: -/// ```json -/// { "term": { "field": "val" } } -/// ``` -pub fn extract_label_filter_from_term(term_query: &Value) -> Option<LabelFilter> { - let term_obj = term_query.get("term")?.as_object()?; - if term_obj.len() != 1 { - return None; - } - let (raw_field, field_value) = term_obj.iter().next()?; - let field = strip_keyword_suffix(raw_field).to_owned(); - let value = if let Some(s) = field_value.as_str() { - // Shorthand: "field": "value" - s.to_owned() - } else if let Some(inner) = field_value.as_object() { - // Long form: "field": { "value": "..." } - inner.get("value")?.as_str()?.to_owned() - } else { - return None; - }; - Some(LabelFilter { field, value }) -} - -// --------------------------------------------------------------------------- -// Bool filter helpers -// --------------------------------------------------------------------------- - -/// Try to extract a list of label filters (and optionally a time range) from a -/// `{"bool": {"filter": [...]}}` query structure. -/// -/// The `filter` array must contain at least a term query, and may also contain -/// a range query. Additional (unrecognised) entries in the array cause this -/// function to return `None`. -pub fn extract_label_filters(query: &Value) -> Option<(Vec<LabelFilter>, Option<TimeRange>)> { - let filter_clauses = query.get("bool")?.get("filter")?; - - // The filter value may be an array (multiple clauses) or a single object.
- let clauses: Vec<&Value> = if let Some(arr) = filter_clauses.as_array() { - arr.iter().collect() - } else if filter_clauses.is_object() { - vec![filter_clauses] - } else { - return None; - }; - - let mut label_filters: Vec<LabelFilter> = Vec::new(); - let mut time_range: Option<TimeRange> = None; - - for clause in clauses { - if clause.get("term").is_some() { - label_filters.push(extract_label_filter_from_term(clause)?); - } else if clause.get("range").is_some() { - if time_range.is_some() { - return None; - } - time_range = Some(extract_time_range(clause)?); - } else { - // Unknown clause type in the filter. - return None; - } - } - - Some((label_filters, time_range)) -} - -// --------------------------------------------------------------------------- -// Query predicate helpers -// --------------------------------------------------------------------------- - -/// Extract optional predicates from top-level query: -/// - `{"range": ...}` -> `(label_filters=[], time_range=Some(...))` -/// - `{"bool": {"filter": ...}}` -> label filters + optional time range -/// - `None`/`null` query is represented by caller as `(vec![], None)`. -pub fn extract_predicates_from_query( - query: &Value, -) -> Option<(Vec<LabelFilter>, Option<TimeRange>)> { - if query.is_null() { - return Some((Vec::new(), None)); - } - - if let Some(time_range) = extract_time_range(query) { - return Some((Vec::new(), Some(time_range))); - } - - if query.get("bool").is_some() { - return extract_label_filters(query); - } - - None -} - -// --------------------------------------------------------------------------- -// Group-by helpers -// --------------------------------------------------------------------------- - -/// Try to extract a grouped aggregation from top-level `"aggs"` object. -/// -/// Expected shape: -/// ```json -/// { -/// "aggs": { -/// "": { -/// "terms": { "field": "