Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions asap-dropin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ kill -HUP $(pgrep prometheus)

See the [Prometheus configuration docs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/) for more details on reloading.

> **Note:** Most editors (vim, VS Code, etc.) replace the file on save rather than writing in place, which changes the inode. If Prometheus is running in Docker, its bind-mount tracks the original inode, so `/-/reload` will silently reload the old content. To avoid this: mount the config **directory** instead of the file (e.g. `-v ./prometheus-config:/etc/prometheus`), which lets Docker follow file replacements correctly. Alternatively, restart the Prometheus container after editing.

### Step 4 — Add an ASAPQuery datasource in Grafana

Create a new datasource in Grafana pointing at ASAPQuery, then switch your dashboards to use it.
Expand Down
17 changes: 16 additions & 1 deletion asap-query-engine/src/drivers/query/fallback/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
use serde_json::Value;
use std::collections::HashMap;

use crate::drivers::query::adapters::ParsedQueryRequest;
use crate::drivers::query::adapters::{ParsedQueryRequest, ParsedRangeQueryRequest};

/// Response format from fallback backend
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -69,6 +69,21 @@ pub trait FallbackClient: Send + Sync {
Ok(serde_json::json!({}))
}

async fn execute_range_query(
&self,
_request: &ParsedRangeQueryRequest,
) -> Result<FallbackResponse, StatusCode> {
Err(StatusCode::NOT_IMPLEMENTED)
}

async fn execute_range_query_with_headers(
&self,
request: &ParsedRangeQueryRequest,
_headers: HashMap<String, String>,
) -> Result<FallbackResponse, StatusCode> {
self.execute_range_query(request).await
}

async fn get_runtime_info_with_headers(
&self,
headers: HashMap<String, String>,
Expand Down
66 changes: 65 additions & 1 deletion asap-query-engine/src/drivers/query/fallback/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{FallbackClient, FallbackResponse};
use crate::drivers::query::adapters::ParsedQueryRequest;
use crate::drivers::query::adapters::{ParsedQueryRequest, ParsedRangeQueryRequest};
use async_trait::async_trait;
use axum::http::StatusCode;
use reqwest::Client;
Expand Down Expand Up @@ -95,6 +95,70 @@ impl FallbackClient for PrometheusHttpFallback {
}
}

async fn execute_range_query(
&self,
request: &ParsedRangeQueryRequest,
) -> Result<FallbackResponse, StatusCode> {
debug!("=== FORWARDING RANGE QUERY TO PROMETHEUS ===");
debug!(
"Forwarding range query: '{}', start: {}, end: {}, step: {}",
request.query, request.start, request.end, request.step
);

let full_url = format!("{}/api/v1/query_range", self.base_url.trim_end_matches('/'));

let query_params = vec![
("query", request.query.clone()),
("start", request.start.to_string()),
("end", request.end.to_string()),
("step", request.step.to_string()),
];

match self
.client
.get(&full_url)
.query(&query_params)
.timeout(std::time::Duration::from_secs(30))
.send()
.await
{
Ok(response) => {
let status = response.status();
debug!(
"Received range query response from Prometheus, status: {}",
status
);
match response.json::<Value>().await {
Ok(prometheus_response) => {
debug!("=== PROMETHEUS RANGE FORWARD SUCCESS ===");
Ok(FallbackResponse::Json(prometheus_response))
}
Err(parse_err) => {
error!(
"Failed to parse Prometheus range query response: {}",
parse_err
);
use crate::drivers::query::adapters::PrometheusResponse;
let error = PrometheusResponse::error(
"internal",
"Failed to parse Prometheus range query response",
);
Ok(FallbackResponse::Json(serde_json::to_value(error).unwrap()))
}
}
}
Err(req_err) => {
error!("Failed to forward range query to Prometheus: {}", req_err);
use crate::drivers::query::adapters::PrometheusResponse;
let error = PrometheusResponse::error(
"internal",
&format!("Failed to forward range query to Prometheus: {}", req_err),
);
Ok(FallbackResponse::Json(serde_json::to_value(error).unwrap()))
}
}
}

async fn get_runtime_info(&self) -> Result<Value, StatusCode> {
debug!("Fetching runtime info from Prometheus fallback");

Expand Down
91 changes: 73 additions & 18 deletions asap-query-engine/src/drivers/query/servers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,22 +458,34 @@ async fn process_range_query_request(
state: &AppState,
parsed_request: &ParsedRangeQueryRequest,
start_time: Instant,
headers: HashMap<String, String>,
) -> Response {
// Check if handling is enabled
if !state.config.handle_http_requests {
debug!("HTTP request handling is disabled for range query");
// For now, return error - fallback for range queries can be added later
use crate::drivers::query::adapters::AdapterError;
return match state
.adapter
.format_error_response(&AdapterError::ProtocolError(
"Range query handling is disabled".to_string(),
))
.await
{
Ok(json) => json.into_response(),
Err(status) => status.into_response(),
};
if let Some(fallback) = &state.fallback {
debug!("Forwarding range query to fallback due to disabled handling");
return match fallback
.execute_range_query_with_headers(parsed_request, headers)
.await
{
Ok(response) => response.into_response(),
Err(status) => status.into_response(),
};
} else {
debug!("Returning error - both handling and forwarding disabled");
use crate::drivers::query::adapters::AdapterError;
return match state
.adapter
.format_error_response(&AdapterError::ProtocolError(
"Range query handling is disabled".to_string(),
))
.await
{
Ok(json) => json.into_response(),
Err(status) => status.into_response(),
};
}
}

// Record query for passive auto-discovery (if tracker is enabled)
Expand Down Expand Up @@ -523,10 +535,34 @@ async fn process_range_query_request(
}
}
None => {
let total_duration = start_time.elapsed();
debug!("Range query returned None - query not supported");
match state.adapter.format_unsupported_query_response().await {
Ok(json) => json.into_response(),
Err(status) => status.into_response(),

if let Some(fallback) = &state.fallback {
debug!("Range query not supported locally, forwarding to fallback");
info!(
"query='{}' destination=prometheus total_latency_ms={:.2}",
parsed_request.query,
total_duration.as_secs_f64() * 1000.0
);
match fallback
.execute_range_query_with_headers(parsed_request, headers)
.await
{
Ok(response) => response.into_response(),
Err(status) => status.into_response(),
}
} else {
debug!("Range query not supported and forwarding disabled, returning error");
info!(
"query='{}' destination=none_unsupported total_latency_ms={:.2}",
parsed_request.query,
total_duration.as_secs_f64() * 1000.0
);
match state.adapter.format_unsupported_query_response().await {
Ok(json) => json.into_response(),
Err(status) => status.into_response(),
}
}
}
}
Expand All @@ -535,11 +571,19 @@ async fn process_range_query_request(
async fn handle_range_query(
query_params: Query<HashMap<String, String>>,
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let start_time = Instant::now();
debug!("=== INCOMING RANGE QUERY GET REQUEST ===");
debug!("Raw query params: {:?}", query_params.0);

let mut forwarding_headers = HashMap::new();
if let Some(auth) = headers.get(axum::http::header::AUTHORIZATION) {
if let Ok(auth_str) = auth.to_str() {
forwarding_headers.insert("Authorization".to_string(), auth_str.to_string());
}
}

let parsed_request = match state.adapter.parse_range_get_request(query_params).await {
Ok(req) => {
debug!(
Expand All @@ -557,13 +601,24 @@ async fn handle_range_query(
}
};

process_range_query_request(&state, &parsed_request, start_time).await
process_range_query_request(&state, &parsed_request, start_time, forwarding_headers).await
}

async fn handle_range_query_post(State(state): State<AppState>, body: Bytes) -> Response {
async fn handle_range_query_post(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
body: Bytes,
) -> Response {
let start_time = Instant::now();
debug!("=== INCOMING RANGE QUERY POST REQUEST ===");

let mut forwarding_headers = HashMap::new();
if let Some(auth) = headers.get(axum::http::header::AUTHORIZATION) {
if let Ok(auth_str) = auth.to_str() {
forwarding_headers.insert("Authorization".to_string(), auth_str.to_string());
}
}

// Parse the body as form data
let body_str = match String::from_utf8(body.to_vec()) {
Ok(s) => s,
Expand Down Expand Up @@ -607,7 +662,7 @@ async fn handle_range_query_post(State(state): State<AppState>, body: Bytes) ->
}
};

process_range_query_request(&state, &parsed_request, start_time).await
process_range_query_request(&state, &parsed_request, start_time, forwarding_headers).await
}

#[cfg(test)]
Expand Down
Loading
Loading