Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions asap-planner-rs/docker-compose.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ services:
"--prometheus-url", "{{ prometheus_url }}"{% if punting %},
"--enable-punting"{% endif %}
]
network_mode: "host"
restart: no
23 changes: 1 addition & 22 deletions asap-query-engine/docker-compose.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,5 @@ services:
- "{{ experiment_output_dir }}:/app/outputs"
# Mount controller output directory for configuration files (read-only)
- "{{ controller_remote_output_dir }}:/app/controller_output:ro"
command: [
"--kafka-topic", "{{ kafka_topic }}",
"--kafka-broker", "{{ kafka_host }}:9092",
"--input-format", "{{ input_format }}",
"--config", "/app/controller_output/inference_config.yaml",
"--streaming-config", "/app/controller_output/streaming_config.yaml",
"--prometheus-server", "http://{{ prometheus_host }}:{{ prometheus_port }}",
"--prometheus-scrape-interval", "{{ prometheus_scrape_interval }}",
"--delete-existing-db",
"--log-level", "{{ log_level }}",
"--output-dir", "/app/outputs",
"--streaming-engine", "{{ streaming_engine }}",
"--query-language", "{{ query_language }}",
"--lock-strategy", "{{ lock_strategy }}"{% if compress_json %},
"--decompress-json"{% endif %}{% if profile_query_engine %},
"--do-profiling"{% endif %}{% if forward_unsupported_queries %},
"--forward-unsupported-queries"{% endif %}{% if dump_precomputes %},
"--dump-precomputes"{% endif %}
]
extra_hosts:
- "kafka:{{ kafka_host }}"
- "prometheus:{{ prometheus_host }}"
command: ["--config-file", "/app/outputs/engine_config.yaml"]
restart: no
6 changes: 3 additions & 3 deletions asap-tools/experiments/CONFIG_PARAMETERS_REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ These parameters must be provided for all experiment scripts:

#### `streaming.engine` (string, optional)
- **Description**: Which streaming engine to use
- **Default**: `"flink"`
- **Choices**: `"flink"`, `"arroyo"`
- **Example**: `"arroyo"`
- **Default**: `"precompute"`
- **Choices**: `"flink"`, `"arroyo"`, `"precompute"`
- **Example**: `"precompute"`
- **Usage**: Selects streaming processing framework

#### `streaming.flink_input_format` (string, optional)
Expand Down
2 changes: 1 addition & 1 deletion asap-tools/experiments/HYDRA_CONFIG_USAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ flow.no_teardown: true/false
flow.steady_state_wait: 300 # seconds

# Streaming engine
streaming.engine: flink|arroyo
streaming.engine: flink|arroyo|precompute
streaming.flink_input_format: json|avro-json|avro-binary
streaming.flink_output_format: json|byte
streaming.enable_object_reuse: true/false
Expand Down
11 changes: 4 additions & 7 deletions asap-tools/experiments/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ flow:

# Streaming engine configuration
streaming:
engine: "arroyo" # choices: ["flink", "arroyo"]
engine: "precompute" # choices: ["flink", "arroyo", "precompute"]
parallelism: 1 # Default parallelism for streaming pipelines
flink_input_format: "json" # choices: ["json", "avro-json", "avro-binary"]
flink_output_format: "json" # choices: ["json", "byte"]
Expand All @@ -59,7 +59,7 @@ streaming:
remote_write:
ip: "${remote_write_ip:${cloudlab.node_offset}}"
base_port: 8080
path: "/receive"
path: "/api/v1/write"
enable_optimized_source: true # Use optimized Prometheus remote_write source (10-20x faster)

# Prometheus configuration
Expand All @@ -78,10 +78,7 @@ fake_exporter_language: "rust" # choices: ["python", "rust"]
# Cluster data exporter configuration
cluster_data_directory: "/data/cluster_traces" # Path to directory containing Google/Alibaba cluster trace data

# Query engine language
query_engine_language: "rust" # choices: ["python", "rust"]

# Query language (SQL vs PROMQL) - only used by Rust query engine
# Query language (SQL vs PROMQL)
query_language: "PROMQL" # choices: ["SQL", "PROMQL"]

# Query engine options
Expand Down Expand Up @@ -120,7 +117,7 @@ prometheus_client:

# Container deployment settings
use_container:
query_engine: true # QueryEngineService - containerized query engine
query_engine: true # QueryEngineRustService - containerized query engine
arroyo: true # ArroyoService - containerized Arroyo streaming engine
controller: true # ControllerService - containerized controller
fake_exporter: true # ExporterServiceFactory - containerized fake exporters
Expand Down
2 changes: 0 additions & 2 deletions asap-tools/experiments/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
FLINK_OUTPUT_TOPIC = "flink_output"
KAFKA_BROKER = "localhost:9092"

QUERY_ENGINE_PY_PROCESS_KEYWORD = "main_query_engine.py"
QUERY_ENGINE_RS_PROCESS_KEYWORD = "query_engine_rust"
QUERY_ENGINE_PY_CONTAINER_NAME = "sketchdb-queryengine"
QUERY_ENGINE_RS_CONTAINER_NAME = "sketchdb-queryengine-rust"

ARROYO_IMAGE = "ghcr.io/projectasap/asap-arroyo:v0.1.0"
Expand Down
106 changes: 61 additions & 45 deletions asap-tools/experiments/experiment_only_ingest_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,45 +232,6 @@ def main(cfg: DictConfig):

prometheus_scrape_interval = config.get_prometheus_scrape_interval(cfg.prometheus)

# Start V2-specific infrastructure before Prometheus
if is_v2:
print("Starting V2 infrastructure (Controller, Kafka, Arroyo)...")

# Start controller to generate sketch configs
controller_client_config = os.path.join(
experiment_root_output_dir,
"controller_client_configs",
f"{experiment_mode}.yaml",
)
controller_service.start(
controller_input_file=controller_client_config,
prometheus_scrape_interval=prometheus_scrape_interval,
streaming_engine=args.streaming_engine,
controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
punting=args.controller_punting,
)
sync.rsync_controller_config_remote_to_local(
provider,
CONTROLLER_REMOTE_OUTPUT_DIR,
CONTROLLER_LOCAL_OUTPUT_DIR,
node_offset=args.node_offset,
)

# Start Kafka
kafka_service.start()
kafka_service.wait_until_ready()
kafka_service.delete_topics()
kafka_service.create_topics()

# Start Arroyo
arroyo_service.stop()
time.sleep(10)
arroyo_service.start(
experiment_output_dir=experiment_output_dir,
remote_write_base_port=args.remote_write_base_port,
parallelism=args.parallelism,
)

# Start fake exporter if configured
if config.check_exporter_and_queries_exist("fake_exporter", cfg.experiment_params):
print("Starting fake exporter...")
Expand Down Expand Up @@ -316,8 +277,61 @@ def main(cfg: DictConfig):
), f"Expected PrometheusService but got {type(prometheus_service).__name__}"
prometheus_service.start(experiment_output_dir)

# Start V2-specific: Run ArroyoSketch pipeline
if is_v2:
print("Starting V2 infrastructure (Controller, Kafka, Arroyo)...")

controller_input_config = os.path.join(
experiment_root_output_dir,
"controller_client_configs",
f"{experiment_mode}_controller_input.yaml",
)
prometheus_url = (
f"http://localhost:{prometheus_service.get_query_endpoint_port()}"
)

prometheus_service.wait_until_ready()

label_discovery_wait = prometheus_scrape_interval * 2
print(
f"Waiting {label_discovery_wait}s for Prometheus to scrape initial data "
f"before running controller label inference..."
)
time.sleep(label_discovery_wait)

controller_service.start(
controller_input_file=controller_input_config,
prometheus_scrape_interval=prometheus_scrape_interval,
streaming_engine=args.streaming_engine,
controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
punting=args.controller_punting,
prometheus_url=prometheus_url,
)
sync.rsync_controller_config_remote_to_local(
provider,
CONTROLLER_REMOTE_OUTPUT_DIR,
CONTROLLER_LOCAL_OUTPUT_DIR,
node_offset=args.node_offset,
)

# Start Kafka
if args.streaming_engine != "precompute":
kafka_service.start()
kafka_service.wait_until_ready()
kafka_service.delete_topics()
kafka_service.create_topics()

# Start Arroyo
if args.streaming_engine != "precompute":
arroyo_service.stop()
time.sleep(10)
arroyo_service.start(
experiment_output_dir=experiment_output_dir,
remote_write_base_port=args.remote_write_base_port,
parallelism=args.parallelism,
)

# Start V2-specific: Run ArroyoSketch pipeline
if is_v2 and args.streaming_engine != "precompute":
print("Starting ArroyoSketch pipeline...")
arroyosketch_pipeline_id = arroyo_service.run_arroyosketch(
experiment_name=args.experiment_name,
Expand Down Expand Up @@ -402,11 +416,13 @@ def main(cfg: DictConfig):
# Stop V2-specific services
if is_v2:
print("Stopping V2 services...")
if arroyosketch_pipeline_id:
arroyo_service.stop_arroyosketch(arroyosketch_pipeline_id)
arroyo_service.stop()
kafka_service.delete_topics()
kafka_service.stop()
if args.streaming_engine != "precompute":
if arroyosketch_pipeline_id:
arroyo_service.stop_arroyosketch(arroyosketch_pipeline_id)
arroyo_service.stop()
if args.streaming_engine != "precompute":
kafka_service.delete_topics()
kafka_service.stop()
controller_service.stop()

# Stop core services
Expand Down
40 changes: 27 additions & 13 deletions asap-tools/experiments/experiment_run_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from experiment_utils.services import (
KafkaService,
FlinkService,
QueryEngineServiceFactory,
QueryEngineRustService,
ExporterServiceFactory,
PrometheusKafkaAdapterService,
ArroyoService,
Expand Down Expand Up @@ -135,9 +135,7 @@ def main(cfg: DictConfig):
# Initialize services
kafka_service = KafkaService(provider, args.node_offset, num_tries=KAFKA_NUM_TRIES)
flink_service = FlinkService(provider, args.node_offset)
# Initialize query engine service based on language
query_engine_service = QueryEngineServiceFactory.create_query_engine_service(
args.query_engine_language,
query_engine_service = QueryEngineRustService(
provider,
use_container=args.use_container_query_engine,
node_offset=args.node_offset,
Expand Down Expand Up @@ -242,6 +240,13 @@ def main(cfg: DictConfig):
"controller_client_configs",
f"{experiment_mode}.yaml",
)
# Stripped to the fields ControllerConfig accepts (deny_unknown_fields).
# The full config above is still used by the prometheus_client.
controller_input_config = os.path.join(
experiment_root_output_dir,
"controller_client_configs",
f"{experiment_mode}_controller_input.yaml",
)

if (
experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME
Expand Down Expand Up @@ -334,6 +339,11 @@ def main(cfg: DictConfig):
local_experiment_dir=local_experiment_dir,
experiment_mode=experiment_mode,
)
# Poll until Prometheus is actually accepting connections before sleeping
# for scrape data. Prometheus takes a few seconds to bind the port after
# its process starts, so a fixed sleep alone can race.
prometheus_service.wait_until_ready()

# Wait for two scrape intervals so Prometheus has series to return.
label_discovery_wait = prometheus_scrape_interval * 2
print(
Expand All @@ -346,7 +356,7 @@ def main(cfg: DictConfig):
f"http://localhost:{prometheus_service.get_query_endpoint_port()}"
)
controller_service.start(
controller_input_file=controller_client_config,
controller_input_file=controller_input_config,
prometheus_scrape_interval=prometheus_scrape_interval,
streaming_engine=args.streaming_engine,
controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR,
Expand All @@ -359,10 +369,11 @@ def main(cfg: DictConfig):
CONTROLLER_LOCAL_OUTPUT_DIR,
node_offset=args.node_offset,
)
kafka_service.start()
kafka_service.wait_until_ready()
kafka_service.delete_topics()
kafka_service.create_topics()
if args.streaming_engine != "precompute":
kafka_service.start()
kafka_service.wait_until_ready()
kafka_service.delete_topics()
kafka_service.create_topics()

if (
config.check_exporter_and_queries_exist(
Expand Down Expand Up @@ -463,9 +474,9 @@ def main(cfg: DictConfig):
pipeline_id=arroyosketch_pipeline_id,
experiment_output_dir=experiment_output_dir,
)
else:
elif args.streaming_engine not in ("precompute",):
raise ValueError(
"Invalid streaming engine: {}. Supported engines are 'flink' and 'arroyo'".format(
"Invalid streaming engine: {}. Supported engines are 'flink', 'arroyo', and 'precompute'".format(
args.streaming_engine
)
)
Expand All @@ -479,6 +490,7 @@ def main(cfg: DictConfig):

query_engine_service.start(
experiment_output_dir=experiment_output_dir,
local_experiment_dir=local_experiment_dir,
flink_output_format=args.flink_output_format,
prometheus_scrape_interval=prometheus_scrape_interval,
log_level=args.log_level,
Expand All @@ -493,6 +505,7 @@ def main(cfg: DictConfig):
query_language=args.query_language,
prometheus_port=prometheus_port,
http_port=http_port,
remote_write_port=args.remote_write_base_port,
)

# Start system exporters (node_exporter, blackbox_exporter, cadvisor)
Expand Down Expand Up @@ -660,8 +673,9 @@ def main(cfg: DictConfig):
arroyo_service.stop()
if args.use_kafka_ingest:
prometheus_kafka_adapter_service.stop()
kafka_service.delete_topics()
kafka_service.stop()
if args.streaming_engine != "precompute":
kafka_service.delete_topics()
kafka_service.stop()

system_exporters_service.stop()
prometheus_service.stop()
Expand Down
Loading
Loading