Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
362 changes: 214 additions & 148 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion asap-common/dependencies/rs/asap_types/src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::str::FromStr;
// Re-export AggregationType from promql_utilities (defined there to avoid circular deps).
pub use promql_utilities::query_logics::enums::AggregationType;

#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq)]
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[allow(non_camel_case_types)]
pub enum QueryLanguage {
#[value(alias = "SQL")]
Expand Down
1 change: 1 addition & 0 deletions asap-query-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ lazy_static = "1.4"
zstd = "0.13"
reqwest = { version = "0.11", features = ["json"] }
tracing-appender = "0.2"
figment = { version = "0.10", features = ["yaml"] }
arc-swap = "1"
csv = "1"
elastic_dsl_utilities.workspace = true
Expand Down
116 changes: 116 additions & 0 deletions asap-query-engine/examples/engine_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# ASAPQuery Engine Configuration
#
# Usage:
# query-engine-rust --config-file engine_config.yaml
# query-engine-rust --config-file engine_config.yaml precompute_engine.num_workers=8
# query-engine-rust --config-file engine_config.yaml http_server.port=9000 ingest.topic=new-topic

# ---------------------------------------------------------------------------
# Core
# ---------------------------------------------------------------------------

output_dir: "./output"
log_level: "INFO" # DEBUG | INFO | WARN | ERROR (also respects RUST_LOG)

# Query language used for both ingest and query parsing.
# Values are case-sensitive: use lowercase exactly as shown.
query_language: "promql" # promql | sql | elastic_querydsl | elastic_sql

# Prometheus scrape interval in seconds. Used by the query tracker and planner.
prometheus_scrape_interval: 15

# Processing backend. Must be consistent with the ingest source chosen below:
# arroyo → requires ingest.type=kafka or otlp
# precompute → requires ingest.type=http_remote_write or csv
streaming_engine: "precompute" # arroyo | precompute

do_profiling: false

# ---------------------------------------------------------------------------
# HTTP query server
# ---------------------------------------------------------------------------

http_server:
port: 8088

# Prometheus server used for query forwarding and planner context.
prometheus_server: "http://localhost:9090"

# When true, queries not answerable from sketches are forwarded to prometheus_server.
# The server must be reachable at startup when this is enabled.
forward_unsupported_queries: false

# ---------------------------------------------------------------------------
# Store
# ---------------------------------------------------------------------------

store:
lock_strategy: "per-key" # global | per-key

# ---------------------------------------------------------------------------
# Ingest source — choose exactly one type.
# streaming_engine must match the chosen type (see above).
# ---------------------------------------------------------------------------

# HTTP remote write (Prometheus remote write protocol):
ingest:
type: "http_remote_write"
port: 9090

# Kafka:
# ingest:
# type: "kafka"
# broker: "localhost:9092"
# topic: "my-topic"
# input_format: "json" # json | byte
# decompress_json: false

# CSV file:
# ingest:
# type: "csv"
# path: "./data.csv"
# metric_name: "my_metric"
# value_col: "value"
# label_cols: ["id1", "id2"] # empty list if no label columns
# timestamp_col: null # column name, or null to synthesize timestamps
# start_ts_ms: 0 # synthesized timestamp start (ms); used when timestamp_col is null
# ts_step_ms: 15000 # ms between rows; required when timestamp_col is null
# batch_size: 1000

# OTLP (OpenTelemetry metrics — gRPC + HTTP):
# Requires streaming_engine: "arroyo". No precompute or Kafka path is used for OTLP;
# the receiver runs as a standalone gRPC/HTTP server.
# ingest:
# type: "otlp"
# grpc_port: 4317
# http_port: 4318

# ---------------------------------------------------------------------------
# Precompute engine (active when streaming_engine=precompute)
# ---------------------------------------------------------------------------

precompute_engine:
num_workers: 4
allowed_lateness_ms: 5000 # max out-of-order sample delay before drop
max_buffer_per_series: 10000 # max buffered samples per series before eviction
flush_interval_ms: 1000 # how often the flush timer fires
channel_buffer_size: 10000 # capacity of router-to-worker channels
dump_precomputes: false # dump received precomputes to output_dir for debugging

# ---------------------------------------------------------------------------
# Query tracker / planner (optional)
# ---------------------------------------------------------------------------

query_tracker:
enabled: false

# How long to observe query patterns before triggering planning (seconds).
observation_window_secs: 100

# ---------------------------------------------------------------------------
# External config file paths (optional — engine starts with empty configs if omitted)
# ---------------------------------------------------------------------------

inference_config: null # path to inference_config.yaml
streaming_config: null # path to streaming_config.yaml
promsketch_config: null # path to promsketch_config.yaml
10 changes: 7 additions & 3 deletions asap-query-engine/src/data_model/enums.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#[derive(clap::ValueEnum, Clone, Debug)]
/// Payload format for the Kafka ingest source
/// (selected via `ingest.input_format` in the engine config: "json" | "byte").
///
/// Derives `clap::ValueEnum` for CLI parsing and serde for config-file
/// (de)serialization; `rename_all = "snake_case"` keeps the YAML spelling
/// ("json" / "byte") consistent with the CLI values.
#[derive(clap::ValueEnum, Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum InputFormat {
/// Messages are JSON documents.
Json,
/// Messages are raw bytes (no JSON decoding).
Byte,
}

#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
#[derive(clap::ValueEnum, Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamingEngine {
Arroyo,
Precompute,
Expand All @@ -24,10 +26,12 @@ pub enum QueryProtocol {
// Future: DuckDbHttp, etc.
}

#[derive(clap::ValueEnum, Clone, Debug, Copy, PartialEq)]
/// Lock granularity for the store (configured via `store.lock_strategy`).
///
/// The CLI (`#[value(name = ...)]`) and serde (`#[serde(rename = ...)]`)
/// names are pinned explicitly — rather than using `rename_all` — because
/// "per-key" contains a hyphen, which no standard casing rule produces.
/// This keeps the accepted strings identical between CLI flags and YAML.
#[derive(clap::ValueEnum, Clone, Debug, Copy, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum LockStrategy {
/// "global" — presumably one store-wide lock; NOTE(review): confirm
/// against the store implementation.
#[value(name = "global")]
#[serde(rename = "global")]
Global,
/// "per-key" — presumably one lock per key for finer-grained
/// concurrency; NOTE(review): confirm against the store implementation.
#[value(name = "per-key")]
#[serde(rename = "per-key")]
PerKey,
}
Loading
Loading