Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn map_statistic_to_precompute_operator(
Ok((AggregationType::MultipleIncrease, "".to_string()))
}
Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())),
_ => Err(format!("Statistic {statistic:?} not supported")),
Statistic::Cardinality => Ok((AggregationType::SetAggregator, "".to_string())),
}
}

Expand Down Expand Up @@ -81,6 +81,9 @@ pub fn does_precompute_operator_support_subpopulations(

// CountMinSketchWithHeap is only supported for Topk — does not support subpopulations
AggregationType::CountMinSketchWithHeap if matches!(statistic, Statistic::Topk) => false,
AggregationType::SetAggregator => false,
AggregationType::DeltaSetAggregator => false,
AggregationType::HLL => false,

// Default: not supported
_ => panic!("Unexpected precompute operator: {}", precompute_operator),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,51 @@ mod tests {
assert_eq!(result.error, expected_error);
}

/// Multi-column SELECT: GROUP BY keys repeated as bare identifiers + one aggregate (ClickHouse style).
#[test]
fn test_multi_column_select_quantile_matches_single_aggregate_semantics() {
    // Keys L1..L4 appear both as bare projection identifiers and in GROUP BY;
    // the single aggregate is a parametric ClickHouse-style quantile.
    let sql = "\
        SELECT L1, L2, L3, L4, quantile(0.99)(value) AS p99 \
        FROM cpu_usage \
        WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:00') AND '2025-10-01 00:00:00' \
        GROUP BY L1, L2, L3, L4";

    let query = parse_sql_query(sql).expect("multi-column quantile should parse");

    // Metric and aggregation are derived from the lone aggregate expression.
    assert_eq!(query.metric, "cpu_usage");
    assert_eq!(query.aggregation_info.get_name(), "QUANTILE");

    // Every bare projection identifier must be captured as a grouping label.
    let expected_labels: HashSet<String> = ["L1", "L2", "L3", "L4"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    assert_eq!(query.labels, expected_labels);
}

/// ORDER BY over an aggregate alias must not cause rejection of an otherwise
/// valid grouped COUNT query.
#[test]
fn test_order_by_with_grouped_count_is_accepted() {
    let sql = "SELECT L1, COUNT(value) AS c FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1 ORDER BY c DESC";

    // Expected classification: a single spatio-temporal query, no error.
    check_query(sql, vec![QueryType::SpatioTemporal], None);
}

/// COUNT(DISTINCT col) must surface the DISTINCT modifier through the parsed
/// `AggregationInfo` args so downstream planning can distinguish it from COUNT.
#[test]
fn test_count_distinct_marks_distinct_arg() {
    let sql = "SELECT L1, COUNT(DISTINCT value) AS distinct_values FROM cpu_usage WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() GROUP BY L1";
    let query = parse_sql_query(sql).expect("count distinct should parse");

    assert_eq!(query.aggregation_info.get_name(), "COUNT");

    // The parser lowers DISTINCT into a case-insensitive "distinct" marker arg.
    let has_distinct_marker = query
        .aggregation_info
        .get_args()
        .iter()
        .any(|arg| arg.eq_ignore_ascii_case("distinct"));
    assert!(
        has_distinct_marker,
        "COUNT(DISTINCT ...) should carry a distinct marker in AggregationInfo args",
    );
}

// ── Basic smoke tests ────────────────────────────────────────────────────

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,18 @@ impl SQLPatternMatcher {
));
}

// Wall-clock span in units of prometheus scrape interval (ratio, not seconds).
let scrape_duration = time_info.get_duration();
scraped_intervals = scrape_duration / self.scrape_interval;

if scraped_intervals < self.scrape_interval {
if scraped_intervals < 1.0 {
println!("Returned QueryError::SpatialDurationSmall");

return Err((
QueryError::SpatialDurationSmall,
format!(
"scrape duration {} less than one interval {}",
scraped_intervals, self.scrape_interval
"query time span {:.4} is shorter than one scrape interval ({:.4} s)",
scrape_duration, self.scrape_interval
),
));
}
Expand Down Expand Up @@ -216,7 +217,8 @@ impl SQLPatternMatcher {

let mut sql_query = SQLQuery::new(Vec::new(), None, None);

for (i, (metric, aggregation_info, scrape_duration, labels, time_info)) in
// Third tuple field is `wall_seconds / scrape_interval` (how many scrape steps).
for (i, (metric, aggregation_info, scrape_steps, labels, time_info)) in
query_data.iter().enumerate()
{
if i < query_data.len() - 1 {
Expand All @@ -233,15 +235,15 @@ impl SQLPatternMatcher {
// Last query
// let time_info = TimeInfo::new("time".to_string(), *start, *scrape_duration);

if (scrape_duration - self.scrape_interval).abs() < f64::EPSILON {
if (scrape_steps - 1.0).abs() < f64::EPSILON {
sql_query.add_subquery(
QueryType::Spatial,
aggregation_info.clone(),
metric.clone(),
labels.clone(),
time_info.clone(),
);
} else if *scrape_duration > self.scrape_interval {
} else if *scrape_steps > 1.0 {
// Check if labels match all metadata columns
let has_all_labels = self
.schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ impl SQLPatternParser {
fn parse_select(&self, select: &Select) -> Option<SQLQueryData> {
let (metric, has_subquery) = self.get_metric(select)?;

let aggregation = self.get_aggregation(select)?;

let group_bys = self.get_groupbys(select)?;
let aggregation = self.get_aggregation(select, &group_bys)?;

if !has_subquery {
let time_info = self.get_time_info(select, &metric)?;
Expand All @@ -98,7 +97,6 @@ impl SQLPatternParser {
|| select.prewhere.is_some()
|| !select.cluster_by.is_empty()
|| !select.distribute_by.is_empty()
|| !select.sort_by.is_empty()
|| select.having.is_some()
|| !select.named_window.is_empty()
|| select.window_before_qualify
Expand All @@ -119,8 +117,9 @@ impl SQLPatternParser {
let subquery = match &select.from[0].relation {
TableFactor::Derived { subquery, .. } => match subquery.body.as_ref() {
SetExpr::Select(inner_select) => {
let inner_aggregation = self.get_aggregation(inner_select)?;
let inner_group_bys = self.get_groupbys(inner_select)?;
let inner_aggregation =
self.get_aggregation(inner_select, &inner_group_bys)?;
let time_info = self.get_time_info(inner_select, &metric)?;

Some(Box::new(SQLQueryData {
Expand Down Expand Up @@ -205,89 +204,141 @@ impl SQLPatternParser {
}
}

fn get_aggregation(&self, select: &Select) -> Option<AggregationInfo> {
if select.projection.len() != 1 {
return None;
/// Parses `func(...)` into `AggregationInfo` (SUM, COUNT, QUANTILE, etc.).
fn try_parse_aggregation_function(&self, func: &Function) -> Option<AggregationInfo> {
let name = func.name.to_string().to_uppercase();

let mut args = self.get_quantile_args(func);
let is_distinct = matches!(
&func.args,
FunctionArguments::List(func_args)
if matches!(
func_args.duplicate_treatment,
Some(DuplicateTreatment::Distinct)
)
);
if is_distinct && name == "COUNT" {
args.push("distinct".to_string());
}

match &select.projection[0] {
SelectItem::UnnamedExpr(Expr::Function(func))
| SelectItem::ExprWithAlias {
expr: Expr::Function(func),
..
} => {
let name = func.name.to_string().to_uppercase();

let args = self.get_quantile_args(func);

// Get the column being aggregated
let col = match &func.args {
FunctionArguments::None => return None,
FunctionArguments::Subquery(_) => return None,
FunctionArguments::List(func_args) => {
if name == "QUANTILE" {
if let FunctionArguments::List(params) = &func.parameters {
if !params.args.is_empty() {
// ClickHouse parametric syntax: quantile(0.95)(column)
// Column is the sole argument in func.args.
match func_args.args.first() {
Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Identifier(ident),
))) => ident.value.clone(),
_ => return None,
}
} else {
return None;
}
} else {
// ASAP syntax: QUANTILE(0.95, value) - column is second argument
if func_args.args.len() < 2 {
return None;
}
match &func_args.args[1] {
FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Identifier(ident),
)) => ident.value.clone(),
_ => return None,
}
}
} else if name == "PERCENTILE" {
// PERCENTILE(value, 95) - column is first argument
if func_args.args.is_empty() {
return None;
}
match &func_args.args[0] {
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(
ident,
// Get the column being aggregated
let col = match &func.args {
FunctionArguments::None => return None,
FunctionArguments::Subquery(_) => return None,
FunctionArguments::List(func_args) => {
if name == "QUANTILE" {
if let FunctionArguments::List(params) = &func.parameters {
if !params.args.is_empty() {
// ClickHouse parametric syntax: quantile(0.95)(column)
match func_args.args.first() {
Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(
Expr::Identifier(ident),
))) => ident.value.clone(),
_ => return None,
}
} else {
// For other aggregations - column is first argument
if func_args.args.is_empty() {
return None;
}
match &func_args.args[0] {
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(
ident,
))) => ident.value.clone(),
_ => return None,
}
return None;
}
} else {
// ASAP syntax: QUANTILE(0.95, value) - column is second argument
if func_args.args.len() < 2 {
return None;
}
match &func_args.args[1] {
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(
ident,
))) => ident.value.clone(),
_ => return None,
}
}
};

// Always store PERCENTILE as QUANTILE internally
let normalized_name = if name == "PERCENTILE" {
"QUANTILE".to_string()
} else if name == "PERCENTILE" {
if func_args.args.is_empty() {
return None;
}
match &func_args.args[0] {
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(ident))) => {
ident.value.clone()
}
_ => return None,
}
} else {
name
};
if func_args.args.is_empty() {
return None;
}
match &func_args.args[0] {
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Identifier(ident))) => {
ident.value.clone()
}
_ => return None,
}
}
}
};

let normalized_name = if name == "PERCENTILE" {
"QUANTILE".to_string()
} else {
name
};

Some(AggregationInfo::new(normalized_name, col, args))
}

/// Single aggregate in `SELECT`, or grouping columns (identifiers) plus exactly one aggregate.
/// In the multi-column form, bare identifiers must match `GROUP BY` exactly as a set.
fn get_aggregation(
&self,
select: &Select,
group_bys: &HashSet<String>,
) -> Option<AggregationInfo> {
if select.projection.is_empty() {
return None;
}

if select.projection.len() == 1 {
return match &select.projection[0] {
SelectItem::UnnamedExpr(Expr::Function(func))
| SelectItem::ExprWithAlias {
expr: Expr::Function(func),
..
} => self.try_parse_aggregation_function(func),
_ => None,
};
}

let mut agg: Option<AggregationInfo> = None;
let mut key_cols: HashSet<String> = HashSet::new();

Some(AggregationInfo::new(normalized_name, col, args))
for item in &select.projection {
match item {
SelectItem::UnnamedExpr(Expr::Function(func))
| SelectItem::ExprWithAlias {
expr: Expr::Function(func),
..
} => {
let parsed = self.try_parse_aggregation_function(func)?;
if agg.replace(parsed).is_some() {
return None;
}
}
SelectItem::UnnamedExpr(Expr::Identifier(ident)) => {
key_cols.insert(ident.value.clone());
}
SelectItem::ExprWithAlias {
expr: Expr::Identifier(ident),
..
} => {
key_cols.insert(ident.value.clone());
}
_ => return None,
}
_ => None,
}

let agg = agg?;
if &key_cols != group_bys {
return None;
}
Some(agg)
}

fn get_metric(&self, select: &Select) -> Option<(String, bool)> {
Expand Down
Loading