diff --git a/asap-planner-rs/docker-compose.yml.j2 b/asap-planner-rs/docker-compose.yml.j2 index 06d42d2d..d34973cc 100644 --- a/asap-planner-rs/docker-compose.yml.j2 +++ b/asap-planner-rs/docker-compose.yml.j2 @@ -13,4 +13,5 @@ services: "--prometheus-url", "{{ prometheus_url }}"{% if punting %}, "--enable-punting"{% endif %} ] + network_mode: "host" restart: no diff --git a/asap-query-engine/docker-compose.yml.j2 b/asap-query-engine/docker-compose.yml.j2 index 43fd7e1a..ea9879fc 100644 --- a/asap-query-engine/docker-compose.yml.j2 +++ b/asap-query-engine/docker-compose.yml.j2 @@ -18,26 +18,5 @@ services: - "{{ experiment_output_dir }}:/app/outputs" # Mount controller output directory for configuration files (read-only) - "{{ controller_remote_output_dir }}:/app/controller_output:ro" - command: [ - "--kafka-topic", "{{ kafka_topic }}", - "--kafka-broker", "{{ kafka_host }}:9092", - "--input-format", "{{ input_format }}", - "--config", "/app/controller_output/inference_config.yaml", - "--streaming-config", "/app/controller_output/streaming_config.yaml", - "--prometheus-server", "http://{{ prometheus_host }}:{{ prometheus_port }}", - "--prometheus-scrape-interval", "{{ prometheus_scrape_interval }}", - "--delete-existing-db", - "--log-level", "{{ log_level }}", - "--output-dir", "/app/outputs", - "--streaming-engine", "{{ streaming_engine }}", - "--query-language", "{{ query_language }}", - "--lock-strategy", "{{ lock_strategy }}"{% if compress_json %}, - "--decompress-json"{% endif %}{% if profile_query_engine %}, - "--do-profiling"{% endif %}{% if forward_unsupported_queries %}, - "--forward-unsupported-queries"{% endif %}{% if dump_precomputes %}, - "--dump-precomputes"{% endif %} - ] - extra_hosts: - - "kafka:{{ kafka_host }}" - - "prometheus:{{ prometheus_host }}" + command: ["--config-file", "/app/outputs/engine_config.yaml"] restart: no diff --git a/asap-tools/experiments/CONFIG_PARAMETERS_REFERENCE.md b/asap-tools/experiments/CONFIG_PARAMETERS_REFERENCE.md index 6bbd3a0a..6f177586 100644 --- a/asap-tools/experiments/CONFIG_PARAMETERS_REFERENCE.md +++ b/asap-tools/experiments/CONFIG_PARAMETERS_REFERENCE.md @@ -136,9 +136,9 @@ These parameters must be provided for all experiment scripts: #### `streaming.engine` (string, optional) - **Description**: Which streaming engine to use -- **Default**: `"flink"` -- **Choices**: `"flink"`, `"arroyo"` -- **Example**: `"arroyo"` +- **Default**: `"precompute"` +- **Choices**: `"flink"`, `"arroyo"`, `"precompute"` +- **Example**: `"precompute"` - **Usage**: Selects streaming processing framework #### `streaming.flink_input_format` (string, optional) diff --git a/asap-tools/experiments/HYDRA_CONFIG_USAGE.md b/asap-tools/experiments/HYDRA_CONFIG_USAGE.md index b3517705..813fb47d 100644 --- a/asap-tools/experiments/HYDRA_CONFIG_USAGE.md +++ b/asap-tools/experiments/HYDRA_CONFIG_USAGE.md @@ -52,7 +52,7 @@ flow.no_teardown: true/false flow.steady_state_wait: 300 # seconds # Streaming engine -streaming.engine: flink|arroyo +streaming.engine: flink|arroyo|precompute streaming.flink_input_format: json|avro-json|avro-binary streaming.flink_output_format: json|byte streaming.enable_object_reuse: true/false diff --git a/asap-tools/experiments/config/config.yaml b/asap-tools/experiments/config/config.yaml index 9724b9fc..f25f2833 100644 --- a/asap-tools/experiments/config/config.yaml +++ b/asap-tools/experiments/config/config.yaml @@ -47,7 +47,7 @@ flow: # Streaming engine configuration streaming: - engine: "arroyo" # choices: ["flink", "arroyo"] + engine: 
"precompute" # choices: ["flink", "arroyo", "precompute"] parallelism: 1 # Default parallelism for streaming pipelines flink_input_format: "json" # choices: ["json", "avro-json", "avro-binary"] flink_output_format: "json" # choices: ["json", "byte"] @@ -59,7 +59,7 @@ streaming: remote_write: ip: "${remote_write_ip:${cloudlab.node_offset}}" base_port: 8080 - path: "/receive" + path: "/api/v1/write" enable_optimized_source: true # Use optimized Prometheus remote_write source (10-20x faster) # Prometheus configuration @@ -78,10 +78,7 @@ fake_exporter_language: "rust" # choices: ["python", "rust"] # Cluster data exporter configuration cluster_data_directory: "/data/cluster_traces" # Path to directory containing Google/Alibaba cluster trace data -# Query engine language -query_engine_language: "rust" # choices: ["python", "rust"] - -# Query language (SQL vs PROMQL) - only used by Rust query engine +# Query language (SQL vs PROMQL) query_language: "PROMQL" # choices: ["SQL", "PROMQL"] # Query engine options @@ -120,7 +117,7 @@ prometheus_client: # Container deployment settings use_container: - query_engine: true # QueryEngineService - containerized query engine + query_engine: true # QueryEngineRustService - containerized query engine arroyo: true # ArroyoService - containerized Arroyo streaming engine controller: true # ControllerService - containerized controller fake_exporter: true # ExporterServiceFactory - containerized fake exporters diff --git a/asap-tools/experiments/constants.py b/asap-tools/experiments/constants.py index a0bda070..2358a3ae 100644 --- a/asap-tools/experiments/constants.py +++ b/asap-tools/experiments/constants.py @@ -14,9 +14,7 @@ FLINK_OUTPUT_TOPIC = "flink_output" KAFKA_BROKER = "localhost:9092" -QUERY_ENGINE_PY_PROCESS_KEYWORD = "main_query_engine.py" QUERY_ENGINE_RS_PROCESS_KEYWORD = "query_engine_rust" -QUERY_ENGINE_PY_CONTAINER_NAME = "sketchdb-queryengine" QUERY_ENGINE_RS_CONTAINER_NAME = "sketchdb-queryengine-rust" ARROYO_IMAGE = "ghcr.io/projectasap/asap-arroyo:v0.1.0" diff --git a/asap-tools/experiments/experiment_only_ingest_path.py b/asap-tools/experiments/experiment_only_ingest_path.py index 05e6a4e8..de424ff2 100644 --- a/asap-tools/experiments/experiment_only_ingest_path.py +++ b/asap-tools/experiments/experiment_only_ingest_path.py @@ -232,45 +232,6 @@ def main(cfg: DictConfig): prometheus_scrape_interval = config.get_prometheus_scrape_interval(cfg.prometheus) - # Start V2-specific infrastructure before Prometheus - if is_v2: - print("Starting V2 infrastructure (Controller, Kafka, Arroyo)...") - - # Start controller to generate sketch configs - controller_client_config = os.path.join( - experiment_root_output_dir, - "controller_client_configs", - f"{experiment_mode}.yaml", - ) - controller_service.start( - controller_input_file=controller_client_config, - prometheus_scrape_interval=prometheus_scrape_interval, - streaming_engine=args.streaming_engine, - controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, - punting=args.controller_punting, - ) - sync.rsync_controller_config_remote_to_local( - provider, - CONTROLLER_REMOTE_OUTPUT_DIR, - CONTROLLER_LOCAL_OUTPUT_DIR, - node_offset=args.node_offset, - ) - - # Start Kafka - kafka_service.start() - kafka_service.wait_until_ready() - kafka_service.delete_topics() - kafka_service.create_topics() - - # Start Arroyo - arroyo_service.stop() - time.sleep(10) - arroyo_service.start( - experiment_output_dir=experiment_output_dir, - remote_write_base_port=args.remote_write_base_port, - 
parallelism=args.parallelism, - ) - # Start fake exporter if configured if config.check_exporter_and_queries_exist("fake_exporter", cfg.experiment_params): print("Starting fake exporter...") @@ -316,8 +277,61 @@ def main(cfg: DictConfig): ), f"Expected PrometheusService but got {type(prometheus_service).__name__}" prometheus_service.start(experiment_output_dir) - # Start V2-specific: Run ArroyoSketch pipeline if is_v2: + print("Starting V2 infrastructure (Controller, Kafka, Arroyo)...") + + controller_input_config = os.path.join( + experiment_root_output_dir, + "controller_client_configs", + f"{experiment_mode}_controller_input.yaml", + ) + prometheus_url = ( + f"http://localhost:{prometheus_service.get_query_endpoint_port()}" + ) + + prometheus_service.wait_until_ready() + + label_discovery_wait = prometheus_scrape_interval * 2 + print( + f"Waiting {label_discovery_wait}s for Prometheus to scrape initial data " + f"before running controller label inference..." + ) + time.sleep(label_discovery_wait) + + controller_service.start( + controller_input_file=controller_input_config, + prometheus_scrape_interval=prometheus_scrape_interval, + streaming_engine=args.streaming_engine, + controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, + punting=args.controller_punting, + prometheus_url=prometheus_url, + ) + sync.rsync_controller_config_remote_to_local( + provider, + CONTROLLER_REMOTE_OUTPUT_DIR, + CONTROLLER_LOCAL_OUTPUT_DIR, + node_offset=args.node_offset, + ) + + # Start Kafka + if args.streaming_engine != "precompute": + kafka_service.start() + kafka_service.wait_until_ready() + kafka_service.delete_topics() + kafka_service.create_topics() + + # Start Arroyo + if args.streaming_engine != "precompute": + arroyo_service.stop() + time.sleep(10) + arroyo_service.start( + experiment_output_dir=experiment_output_dir, + remote_write_base_port=args.remote_write_base_port, + parallelism=args.parallelism, + ) + + # Start V2-specific: Run ArroyoSketch pipeline + if is_v2 and args.streaming_engine != "precompute": print("Starting ArroyoSketch pipeline...") arroyosketch_pipeline_id = arroyo_service.run_arroyosketch( experiment_name=args.experiment_name, @@ -402,11 +416,13 @@ def main(cfg: DictConfig): # Stop V2-specific services if is_v2: print("Stopping V2 services...") - if arroyosketch_pipeline_id: - arroyo_service.stop_arroyosketch(arroyosketch_pipeline_id) - arroyo_service.stop() - kafka_service.delete_topics() - kafka_service.stop() + if args.streaming_engine != "precompute": + if arroyosketch_pipeline_id: + arroyo_service.stop_arroyosketch(arroyosketch_pipeline_id) + arroyo_service.stop() + if args.streaming_engine != "precompute": + kafka_service.delete_topics() + kafka_service.stop() controller_service.stop() # Stop core services diff --git a/asap-tools/experiments/experiment_run_e2e.py b/asap-tools/experiments/experiment_run_e2e.py index 738fbbdf..132c9894 100644 --- a/asap-tools/experiments/experiment_run_e2e.py +++ b/asap-tools/experiments/experiment_run_e2e.py @@ -12,7 +12,7 @@ from experiment_utils.services import ( KafkaService, FlinkService, - QueryEngineServiceFactory, + QueryEngineRustService, ExporterServiceFactory, PrometheusKafkaAdapterService, ArroyoService, @@ -135,9 +135,7 @@ def main(cfg: DictConfig): # Initialize services kafka_service = KafkaService(provider, args.node_offset, num_tries=KAFKA_NUM_TRIES) flink_service = FlinkService(provider, args.node_offset) - # Initialize query engine service based on language - query_engine_service = 
QueryEngineServiceFactory.create_query_engine_service( - args.query_engine_language, + query_engine_service = QueryEngineRustService( provider, use_container=args.use_container_query_engine, node_offset=args.node_offset, @@ -242,6 +240,13 @@ def main(cfg: DictConfig): "controller_client_configs", f"{experiment_mode}.yaml", ) + # Stripped to the fields ControllerConfig accepts (deny_unknown_fields). + # The full config above is still used by the prometheus_client. + controller_input_config = os.path.join( + experiment_root_output_dir, + "controller_client_configs", + f"{experiment_mode}_controller_input.yaml", + ) if ( experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME @@ -334,6 +339,11 @@ def main(cfg: DictConfig): local_experiment_dir=local_experiment_dir, experiment_mode=experiment_mode, ) + # Poll until Prometheus is actually accepting connections before sleeping + # for scrape data. Prometheus takes a few seconds to bind the port after + # its process starts, so a fixed sleep alone can race. + prometheus_service.wait_until_ready() + # Wait for two scrape intervals so Prometheus has series to return. label_discovery_wait = prometheus_scrape_interval * 2 print( @@ -346,7 +356,7 @@ def main(cfg: DictConfig): f"http://localhost:{prometheus_service.get_query_endpoint_port()}" ) controller_service.start( - controller_input_file=controller_client_config, + controller_input_file=controller_input_config, prometheus_scrape_interval=prometheus_scrape_interval, streaming_engine=args.streaming_engine, controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, @@ -359,10 +369,11 @@ def main(cfg: DictConfig): CONTROLLER_LOCAL_OUTPUT_DIR, node_offset=args.node_offset, ) - kafka_service.start() - kafka_service.wait_until_ready() - kafka_service.delete_topics() - kafka_service.create_topics() + if args.streaming_engine != "precompute": + kafka_service.start() + kafka_service.wait_until_ready() + kafka_service.delete_topics() + kafka_service.create_topics() if ( config.check_exporter_and_queries_exist( @@ -463,9 +474,9 @@ def main(cfg: DictConfig): pipeline_id=arroyosketch_pipeline_id, experiment_output_dir=experiment_output_dir, ) - else: + elif args.streaming_engine not in ("precompute",): raise ValueError( - "Invalid streaming engine: {}. Supported engines are 'flink' and 'arroyo'".format( + "Invalid streaming engine: {}. 
Supported engines are 'flink', 'arroyo', and 'precompute'".format( args.streaming_engine ) ) @@ -479,6 +490,7 @@ def main(cfg: DictConfig): query_engine_service.start( experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, flink_output_format=args.flink_output_format, prometheus_scrape_interval=prometheus_scrape_interval, log_level=args.log_level, @@ -493,6 +505,7 @@ def main(cfg: DictConfig): query_language=args.query_language, prometheus_port=prometheus_port, http_port=http_port, + remote_write_port=args.remote_write_base_port, ) # Start system exporters (node_exporter, blackbox_exporter, cadvisor) @@ -660,8 +673,9 @@ def main(cfg: DictConfig): arroyo_service.stop() if args.use_kafka_ingest: prometheus_kafka_adapter_service.stop() - kafka_service.delete_topics() - kafka_service.stop() + if args.streaming_engine != "precompute": + kafka_service.delete_topics() + kafka_service.stop() system_exporters_service.stop() prometheus_service.stop() diff --git a/asap-tools/experiments/experiment_run_grafana_demo.py b/asap-tools/experiments/experiment_run_grafana_demo.py index 741f6322..58ae42e3 100644 --- a/asap-tools/experiments/experiment_run_grafana_demo.py +++ b/asap-tools/experiments/experiment_run_grafana_demo.py @@ -11,7 +11,7 @@ from experiment_utils.providers.factory import create_provider from experiment_utils.services import ( KafkaService, - QueryEngineServiceFactory, + QueryEngineRustService, ExporterServiceFactory, ArroyoService, ArroyoThroughputMonitor, @@ -120,9 +120,7 @@ def main(cfg: DictConfig): # Initialize services kafka_service = KafkaService(provider, args.node_offset, num_tries=KAFKA_NUM_TRIES) - # Initialize query engine service based on language - query_engine_service = QueryEngineServiceFactory.create_query_engine_service( - args.query_engine_language, + query_engine_service = QueryEngineRustService( provider, use_container=args.use_container_query_engine, node_offset=args.node_offset, @@ -213,10 +211,10 @@ def main(cfg: DictConfig): wait=True, ) - controller_client_config = os.path.join( + controller_input_config = os.path.join( experiment_root_output_dir, "controller_client_configs", - f"{experiment_mode}.yaml", + f"{experiment_mode}_controller_input.yaml", ) if args.streaming_engine == "flink": @@ -287,26 +285,6 @@ def main(cfg: DictConfig): ) prometheus_scrape_interval = config.get_prometheus_scrape_interval(cfg.prometheus) - # copy_controller_client_config(args.controller_client_config, local_experiment_dir) - if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: - controller_service.start( - controller_input_file=controller_client_config, - prometheus_scrape_interval=prometheus_scrape_interval, - streaming_engine=args.streaming_engine, - controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, - punting=args.controller_punting, - ) - sync.rsync_controller_config_remote_to_local( - provider, - CONTROLLER_REMOTE_OUTPUT_DIR, - CONTROLLER_LOCAL_OUTPUT_DIR, - node_offset=args.node_offset, - ) - kafka_service.start() - kafka_service.wait_until_ready() - kafka_service.delete_topics() - kafka_service.create_topics() - if config.check_exporter_and_queries_exist("fake_exporter", cfg.experiment_params): # this DOES NOT block exporter_service.start( @@ -331,6 +309,75 @@ def main(cfg: DictConfig): ): deathstar_service.start() + # Start system exporters (node_exporter, blackbox_exporter, cadvisor) + system_exporters_service.start(cfg.experiment_params) + + # Start Prometheus service based on deployment mode + monitoring = 
cfg.experiment_params.monitoring + + if monitoring.deployment_mode == "containerized": + # Containerized deployment (DockerPrometheusService or DockerVictoriaMetricsService) + assert isinstance( + prometheus_service, (DockerPrometheusService, DockerVictoriaMetricsService) + ), f"Expected Docker-based service but got {type(prometheus_service).__name__}" + + # Check if resource limits are specified + if hasattr(monitoring, "resource_limits"): + prometheus_service.start( + experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, + experiment_mode=experiment_mode, + cpu_limit=monitoring.resource_limits.cpu_limit, + memory_limit=monitoring.resource_limits.memory_limit, + ) + else: + # Containerized without resource limits + prometheus_service.start( + experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, + experiment_mode=experiment_mode, + ) + else: # bare_metal + # Bare-metal deployment (PrometheusService) + assert isinstance( + prometheus_service, PrometheusService + ), f"Expected PrometheusService but got {type(prometheus_service).__name__}" + prometheus_service.start(experiment_output_dir) + + if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: + prometheus_url = ( + f"http://localhost:{prometheus_service.get_query_endpoint_port()}" + ) + + prometheus_service.wait_until_ready() + + label_discovery_wait = prometheus_scrape_interval * 2 + print( + f"Waiting {label_discovery_wait}s for Prometheus to scrape initial data " + f"before running controller label inference..." + ) + time.sleep(label_discovery_wait) + + controller_service.start( + controller_input_file=controller_input_config, + prometheus_scrape_interval=prometheus_scrape_interval, + streaming_engine=args.streaming_engine, + controller_remote_output_dir=CONTROLLER_REMOTE_OUTPUT_DIR, + punting=args.controller_punting, + prometheus_url=prometheus_url, + ) + sync.rsync_controller_config_remote_to_local( + provider, + CONTROLLER_REMOTE_OUTPUT_DIR, + CONTROLLER_LOCAL_OUTPUT_DIR, + node_offset=args.node_offset, + ) + if args.streaming_engine != "precompute": + kafka_service.start() + kafka_service.wait_until_ready() + kafka_service.delete_topics() + kafka_service.create_topics() + if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: if args.use_kafka_ingest: # prometheus_kafka_adapter_service.start( @@ -391,9 +438,9 @@ def main(cfg: DictConfig): pipeline_id=arroyosketch_pipeline_id, experiment_output_dir=experiment_output_dir, ) - else: + elif args.streaming_engine not in ("precompute",): raise ValueError( - "Invalid streaming engine: {}. Supported engines are 'flink' and 'arroyo'".format( + "Invalid streaming engine: {}. 
Supported engines are 'flink', 'arroyo', and 'precompute'".format( args.streaming_engine ) ) @@ -411,9 +458,12 @@ def main(cfg: DictConfig): if not cfg.flow.replace_query_engine_with_dumb_consumer: # Get prometheus port from prometheus service prometheus_port = prometheus_service.get_query_endpoint_port() + # Get http port from query engine service + http_port = query_engine_service.get_http_port() query_engine_service.start( experiment_output_dir=experiment_output_dir, + local_experiment_dir=local_experiment_dir, flink_output_format=args.flink_output_format, prometheus_scrape_interval=prometheus_scrape_interval, log_level=args.log_level, @@ -427,42 +477,10 @@ def main(cfg: DictConfig): lock_strategy=args.lock_strategy, query_language=args.query_language, prometheus_port=prometheus_port, + http_port=http_port, + remote_write_port=args.remote_write_base_port, ) - # Start system exporters (node_exporter, blackbox_exporter, cadvisor) - system_exporters_service.start(cfg.experiment_params) - - # Start Prometheus service based on deployment mode - monitoring = cfg.experiment_params.monitoring - - if monitoring.deployment_mode == "containerized": - # Containerized deployment (DockerPrometheusService or DockerVictoriaMetricsService) - assert isinstance( - prometheus_service, (DockerPrometheusService, DockerVictoriaMetricsService) - ), f"Expected Docker-based service but got {type(prometheus_service).__name__}" - - # Check if resource limits are specified - if hasattr(monitoring, "resource_limits"): - prometheus_service.start( - experiment_output_dir=experiment_output_dir, - local_experiment_dir=local_experiment_dir, - experiment_mode=experiment_mode, - cpu_limit=monitoring.resource_limits.cpu_limit, - memory_limit=monitoring.resource_limits.memory_limit, - ) - else: - # Containerized without resource limits - prometheus_service.start( - experiment_output_dir=experiment_output_dir, - local_experiment_dir=local_experiment_dir, - experiment_mode=experiment_mode, - ) - else: # bare_metal - # Bare-metal deployment (PrometheusService) - assert isinstance( - prometheus_service, PrometheusService - ), f"Expected PrometheusService but got {type(prometheus_service).__name__}" - prometheus_service.start(experiment_output_dir) # this DOES NOT block if ( workloads_config is not None diff --git a/asap-tools/experiments/experiment_teardown_everything.py b/asap-tools/experiments/experiment_teardown_everything.py index f9a73abe..fd280abe 100644 --- a/asap-tools/experiments/experiment_teardown_everything.py +++ b/asap-tools/experiments/experiment_teardown_everything.py @@ -18,7 +18,7 @@ from experiment_utils.services import ( KafkaService, FlinkService, - QueryEngineServiceFactory, + QueryEngineRustService, ExporterServiceFactory, PrometheusKafkaAdapterService, ArroyoService, @@ -72,22 +72,11 @@ def main(cfg: DictConfig): kafka_service = KafkaService(provider, args.node_offset, num_tries=KAFKA_NUM_TRIES) flink_service = FlinkService(provider, args.node_offset) - # Initialize both query engine languages - query_engine_service_rust = QueryEngineServiceFactory.create_query_engine_service( - "rust", provider, use_container=True, node_offset=args.node_offset - ) - query_engine_service_python = QueryEngineServiceFactory.create_query_engine_service( - "python", provider, use_container=True, node_offset=args.node_offset - ) - query_engine_service_rust_native = ( - QueryEngineServiceFactory.create_query_engine_service( - "rust", provider, use_container=False, node_offset=args.node_offset - ) + 
query_engine_service_container = QueryEngineRustService( + provider, use_container=True, node_offset=args.node_offset ) - query_engine_service_python_native = ( - QueryEngineServiceFactory.create_query_engine_service( - "python", provider, use_container=False, node_offset=args.node_offset - ) + query_engine_service_native = QueryEngineRustService( + provider, use_container=False, node_offset=args.node_offset ) system_exporters_service = SystemExportersService( @@ -176,10 +165,8 @@ def main(cfg: DictConfig): ("Prometheus Client (container)", prometheus_client_service_container), ("Prometheus Client (native)", prometheus_client_service_native), ("Remote Monitor", remote_monitor_service), - ("Query Engine Rust (container)", query_engine_service_rust), - ("Query Engine Python (container)", query_engine_service_python), - ("Query Engine Rust (native)", query_engine_service_rust_native), - ("Query Engine Python (native)", query_engine_service_python_native), + ("Query Engine (container)", query_engine_service_container), + ("Query Engine (native)", query_engine_service_native), ("Kafka", kafka_service), ("Prometheus-Kafka Adapter", prometheus_kafka_adapter_service), ("System Exporters", system_exporters_service), diff --git a/asap-tools/experiments/experiment_utils/config.py b/asap-tools/experiments/experiment_utils/config.py index 02d22f46..eae7593c 100644 --- a/asap-tools/experiments/experiment_utils/config.py +++ b/asap-tools/experiments/experiment_utils/config.py @@ -253,12 +253,23 @@ def generate_controller_client_configs( server_name = server_config["name"] experiment_to_server_config_map[server_name] = server_config + # Fields accepted by ControllerConfig (deny_unknown_fields in asap-planner-rs). + # Everything else (exporters, monitoring, servers, …) is experiment-only and + # must not appear in the file passed to the controller binary. + CONTROLLER_ALLOWED_KEYS = { + "query_groups", + "sketch_parameters", + "aggregate_cleanup", + "metrics", + "existing_streaming_config", + } + for experiment_mode in experiment_modes: - controller_client_config = copy.deepcopy(experiment_config) - del controller_client_config["experiment"] - if "workloads" in controller_client_config: - del controller_client_config["workloads"] - controller_client_config["servers"] = [ + full_config = copy.deepcopy(experiment_config) + del full_config["experiment"] + if "workloads" in full_config: + del full_config["workloads"] + full_config["servers"] = [ experiment_to_server_config_map[experiment_mode["server"]] ] @@ -267,12 +278,25 @@ def generate_controller_client_configs( and "query_prometheus_too" in experiment_mode and experiment_mode["query_prometheus_too"] ): - controller_client_config["servers"] = servers_config + full_config["servers"] = servers_config + # Full config — used by prometheus_client (needs "servers", etc.) with open( os.path.join(output_dir, "{}.yaml".format(experiment_mode["mode"])), "w" ) as f: - yaml.dump(controller_client_config, f) + yaml.dump(full_config, f) + + # Controller-only config — stripped to the fields ControllerConfig accepts. 
+ controller_only_config = { + k: v for k, v in full_config.items() if k in CONTROLLER_ALLOWED_KEYS + } + with open( + os.path.join( + output_dir, "{}_controller_input.yaml".format(experiment_mode["mode"]) + ), + "w", + ) as f: + yaml.dump(controller_only_config, f) metrics_to_remote_write = [ metric_config["metric"] for metric_config in experiment_config["metrics"] @@ -413,9 +437,6 @@ def __init__(self, cfg: DictConfig): # Fake exporter language self.fake_exporter_language = cfg.fake_exporter_language - # Query engine language - self.query_engine_language = cfg.query_engine_language - # Query language (SQL vs PROMQL) - only used by Rust query engine self.query_language = cfg.query_language @@ -511,17 +532,6 @@ def validate_config(cfg: DictConfig, script_name: str = "experiment_run_e2e"): f"Valid options: {valid_policies}" ) - # Validate Python query engine only supports no_cleanup - if ( - hasattr(cfg, "query_engine_language") - and cfg.query_engine_language == "python" - and policy != "no_cleanup" - ): - raise ValueError( - f"aggregate_cleanup.policy='{policy}' is not supported by the Python query engine. " - "Either use query_engine_language='rust' or set aggregate_cleanup.policy='no_cleanup'" - ) - def generate_and_copy_prometheus_config( num_nodes_in_experiment, diff --git a/asap-tools/experiments/experiment_utils/services/__init__.py b/asap-tools/experiments/experiment_utils/services/__init__.py index 1cc05ba2..a7f365e6 100644 --- a/asap-tools/experiments/experiment_utils/services/__init__.py +++ b/asap-tools/experiments/experiment_utils/services/__init__.py @@ -9,9 +9,7 @@ from .kafka import KafkaService from .flink import FlinkService from .query_engine import ( - QueryEngineService, QueryEngineRustService, - QueryEngineServiceFactory, ) from .monitoring import MonitoringService from .fake_exporters import ( @@ -131,9 +129,7 @@ def create_prometheus_service(cfg, provider, num_nodes: int, node_offset: int): "DockerServiceBase", "KafkaService", "FlinkService", - "QueryEngineService", "QueryEngineRustService", - "QueryEngineServiceFactory", "MonitoringService", "ExporterServiceFactory", "PythonExporterService", diff --git a/asap-tools/experiments/experiment_utils/services/base.py b/asap-tools/experiments/experiment_utils/services/base.py index ca594bc8..82c9d25e 100644 --- a/asap-tools/experiments/experiment_utils/services/base.py +++ b/asap-tools/experiments/experiment_utils/services/base.py @@ -57,6 +57,18 @@ def is_healthy(self) -> bool: """ return True + def wait_until_ready(self, timeout: int = 60) -> None: + """Block until is_healthy() returns True, or raise RuntimeError on timeout.""" + print(f"Waiting for {self.__class__.__name__} to become ready...") + start = time.time() + while not self.is_healthy(): + if time.time() - start > timeout: + raise RuntimeError( + f"{self.__class__.__name__} did not become ready within {timeout}s" + ) + time.sleep(2) + print(f"{self.__class__.__name__} is ready.") + def restart(self, **kwargs) -> None: """ Restart the service. Default implementation stops then starts. 
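
The `wait_until_ready()` helper above is what makes the reordered startup sequences in the experiment scripts safe: any service whose `is_healthy()` does a real probe (e.g. Prometheus answering `/-/ready`) becomes a blocking readiness gate instead of a fixed sleep. A minimal, self-contained sketch of that contract follows; `FakeService` and `healthy_after` are illustrative stand-ins, not part of the repo, and note that the real `base.py` needs `time` imported at module scope (the hunk above does not add the import).

```python
import time


class FakeService:
    """Stand-in for a BaseService subclass whose probe succeeds after a delay."""

    def __init__(self, healthy_after: int):
        self._probes_left = healthy_after

    def is_healthy(self) -> bool:
        # Becomes True only after `healthy_after` probes, mimicking a service
        # that takes a few seconds to bind its port after the process starts.
        self._probes_left -= 1
        return self._probes_left <= 0

    # Same logic as the BaseService.wait_until_ready() added in the diff above.
    def wait_until_ready(self, timeout: int = 60) -> None:
        start = time.time()
        while not self.is_healthy():
            if time.time() - start > timeout:
                raise RuntimeError(
                    f"{type(self).__name__} did not become ready within {timeout}s"
                )
            time.sleep(2)


svc = FakeService(healthy_after=3)
svc.wait_until_ready(timeout=10)  # returns after two 2s sleeps; raises on timeout
```
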
diff --git a/asap-tools/experiments/experiment_utils/services/misc.py b/asap-tools/experiments/experiment_utils/services/misc.py index 5afaf678..3c37da99 100644 --- a/asap-tools/experiments/experiment_utils/services/misc.py +++ b/asap-tools/experiments/experiment_utils/services/misc.py @@ -241,7 +241,12 @@ def _start_bare_metal( punting: bool, prometheus_url: str, ) -> None: - cmd = "./target/release/asap-planner --input_config {} --prometheus_scrape_interval {} --output_dir {} --streaming_engine {} --prometheus-url {}".format( + controller_log = os.path.join(controller_remote_output_dir, "controller.log") + cmd = ( + "./target/release/asap-planner" + " --input_config {} --prometheus_scrape_interval {} --output_dir {}" + " --streaming_engine {} --prometheus-url {}" + ).format( controller_input_file, prometheus_scrape_interval, controller_remote_output_dir, @@ -250,6 +255,7 @@ def _start_bare_metal( ) if punting: cmd += " --enable-punting" + cmd += f" > {controller_log} 2>&1" cmd_dir = os.path.join(self.provider.get_home_dir(), "code", "asap-planner-rs") self.provider.execute_command( node_idx=self.node_offset, @@ -299,7 +305,11 @@ def _start_containerized( if punting: generate_cmd += " --punting" - cmd = f"mkdir -p {controller_remote_output_dir}; {generate_cmd}; docker compose -f {remote_compose_file} up --no-build -d" + controller_log = os.path.join(controller_remote_output_dir, "controller.log") + cmd = ( + f"mkdir -p {controller_remote_output_dir}; {generate_cmd}; " + f"docker compose -f {remote_compose_file} up --no-build > {controller_log} 2>&1" + ) try: self.provider.execute_command( node_idx=self.node_offset, @@ -311,6 +321,7 @@ def _start_containerized( ) except Exception as e: print(f"Failed to start Controller container: {e}") + print(f"Check controller logs at: {controller_log}") raise return None diff --git a/asap-tools/experiments/experiment_utils/services/prometheus.py b/asap-tools/experiments/experiment_utils/services/prometheus.py index 35b6d29c..1a5d0b73 100644 --- a/asap-tools/experiments/experiment_utils/services/prometheus.py +++ b/asap-tools/experiments/experiment_utils/services/prometheus.py @@ -140,9 +140,23 @@ def reset(self) -> None: def is_healthy(self) -> bool: """ - Check if Prometheus service is healthy. + Check if Prometheus is ready to serve queries. 
Returns: - True if service is running + True if Prometheus /-/ready returns HTTP 200 """ - return True + try: + port = self.get_query_endpoint_port() + cmd = f"curl -sf http://localhost:{port}/-/ready" + result = self.provider.execute_command( + node_idx=self.node_offset, + cmd=cmd, + cmd_dir=None, + nohup=False, + popen=False, + ignore_errors=True, + ) + assert isinstance(result, subprocess.CompletedProcess) + return result.returncode == 0 + except Exception: + return False diff --git a/asap-tools/experiments/experiment_utils/services/query_engine.py b/asap-tools/experiments/experiment_utils/services/query_engine.py index a88e34a8..ca121efb 100644 --- a/asap-tools/experiments/experiment_utils/services/query_engine.py +++ b/asap-tools/experiments/experiment_utils/services/query_engine.py @@ -5,7 +5,10 @@ import os import subprocess +import yaml + import constants +import utils from .base import BaseService from experiment_utils.providers.base import InfrastructureProvider @@ -41,8 +44,8 @@ def get_http_port(self) -> int: return 8088 -class QueryEngineService(BaseQueryEngineService): - """Service for managing the Python query engine process.""" +class QueryEngineRustService(BaseQueryEngineService): + """Service for managing the Rust query engine process.""" def __init__( self, @@ -51,7 +54,7 @@ def __init__( node_offset: int, ): """ - Initialize Python Query Engine service. + Initialize Rust Query Engine service. Args: provider: Infrastructure provider for node communication and management @@ -59,355 +62,155 @@ def __init__( node_offset: Starting node index offset """ super().__init__(provider, use_container, node_offset) - self.container_name = constants.QUERY_ENGINE_PY_CONTAINER_NAME + self.container_name = constants.QUERY_ENGINE_RS_CONTAINER_NAME - def start( + # ------------------------------------------------------------------ + # Config-file helpers (used by both bare-metal and containerized paths) + # ------------------------------------------------------------------ + + def _build_engine_config( self, - experiment_output_dir: str, + output_dir: str, flink_output_format: str, prometheus_scrape_interval: int, log_level: str, - profile_query_engine: bool, - manual: bool, streaming_engine: str, forward_unsupported_queries: bool, - controller_remote_output_dir: str, + controller_config_dir: str, compress_json: bool, + prometheus_server: str, + http_port: int, + remote_write_port: int, dump_precomputes: bool, - **kwargs, - ) -> None: + query_language: str, + lock_strategy: str, + profile_query_engine: bool, + kafka_broker: str, + ) -> dict: """ - Start the query engine. + Build an EngineConfig dict matching asap-query-engine's engine_config.rs schema. 
Args: - experiment_output_dir: Directory for experiment output - flink_output_format: Format of data from Flink - prometheus_scrape_interval: Prometheus scraping interval + output_dir: Output directory path (remote host path or container-internal path) + flink_output_format: Kafka input_format when streaming_engine=arroyo + prometheus_scrape_interval: Prometheus scraping interval in seconds log_level: Logging level - profile_query_engine: Whether to enable profiling - manual: Whether to run in manual mode - streaming_engine: Type of streaming engine (flink/arroyo) - forward_unsupported_queries: Whether to forward unsupported queries - controller_remote_output_dir: Controller output directory - compress_json: Whether JSON is compressed - dump_precomputes: Whether to dump precomputed values - **kwargs: Additional configuration + streaming_engine: 'arroyo' (Kafka ingest) or 'precompute' (HTTP remote write) + forward_unsupported_queries: Whether to forward unsupported queries to backend + controller_config_dir: Directory containing inference_config.yaml and streaming_config.yaml + compress_json: Whether incoming JSON is gzip-compressed (arroyo/Kafka only) + prometheus_server: Full Prometheus URL, e.g. http://host:9090 + http_port: Port for the query engine's HTTP API server + remote_write_port: Port to listen on for Prometheus remote write (precompute only); + should match streaming.remote_write.base_port in the Hydra config + dump_precomputes: Whether to dump received precomputes to output_dir for debugging + query_language: 'PROMQL' → prometheus backend, 'SQL' → clickhouse backend + lock_strategy: Lock strategy for SimpleMapStore ('global' or 'per-key') + profile_query_engine: Whether to enable do_profiling in the engine + kafka_broker: Kafka broker address, e.g. '10.10.1.1:9092' (arroyo only) + + Returns: + Dict matching the EngineConfig YAML schema """ - if dump_precomputes: - raise ValueError( - "dump_precomputes is not supported by the Python query engine. Use the Rust query engine instead." - ) - if self.use_container: - prometheus_host = kwargs.get( - "prometheus_host", self.provider.get_node_ip(self.node_offset) - ) - self._start_containerized( - experiment_output_dir, - flink_output_format, - prometheus_scrape_interval, - log_level, - profile_query_engine, - manual, - streaming_engine, - forward_unsupported_queries, - controller_remote_output_dir, - compress_json, - prometheus_host, - dump_precomputes, - ) + # Map query_language to the backend type (determines PromQL vs SQL API) + if query_language.upper() == "PROMQL": + backend: dict = { + "type": "prometheus", + "server": prometheus_server, + "forward_unsupported_queries": forward_unsupported_queries, + } else: - self._start_bare_metal( - experiment_output_dir, - flink_output_format, - prometheus_scrape_interval, - log_level, - profile_query_engine, - manual, - streaming_engine, - forward_unsupported_queries, - controller_remote_output_dir, - compress_json, - dump_precomputes, + # SQL mode: use clickhouse backend with the same host as prometheus_server + backend = { + "type": "clickhouse", + "url": prometheus_server, + "forward_unsupported_queries": forward_unsupported_queries, + } + + # Ingest config depends on the streaming engine. + # Both flink and arroyo produce to the same Kafka topic. 
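+        # ('precompute' takes neither Kafka path: the engine instead ingests
+        #  samples pushed to it via Prometheus remote_write on its own HTTP
+        #  port, matching streaming.remote_write in the Hydra config.)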
+ if streaming_engine in ("arroyo", "flink"): + ingest: dict = { + "type": "kafka", + "broker": kafka_broker, + "topic": constants.FLINK_OUTPUT_TOPIC, + "input_format": flink_output_format, + "decompress_json": compress_json, + } + elif streaming_engine == "precompute": + ingest = { + "type": "http_remote_write", + "port": remote_write_port, + } + else: + raise ValueError( + f"streaming_engine='{streaming_engine}' is not supported by the Rust query engine. " + "Use 'flink', 'arroyo', or 'precompute'." ) - def _start_bare_metal( - self, - experiment_output_dir: str, - flink_output_format: str, - prometheus_scrape_interval: int, - log_level: str, - profile_query_engine: bool, - manual: bool, - streaming_engine: str, - forward_unsupported_queries: bool, - controller_remote_output_dir: str, - compress_json: bool, - dump_precomputes: bool, - ) -> None: - """Start QueryEngine using bare metal deployment (original implementation).""" - output_dir = os.path.join(experiment_output_dir, "query_engine_output") - - cmd = ( - "mkdir -p {}; python3 -u main_query_engine.py " - "--kafka_topic {} " - "--input_format {} " - "--config {}/inference_config.yaml " - "--streaming_config {}/streaming_config.yaml " - "--prometheus_scrape_interval {} " - "--delete_existing_db " - "--log_level {} " - "--output_dir {} " - "{} " - "--streaming_engine {} " - ).format( - output_dir, - constants.FLINK_OUTPUT_TOPIC, - flink_output_format, - controller_remote_output_dir, - controller_remote_output_dir, - prometheus_scrape_interval, - log_level, - output_dir, - "--decompress_json" if compress_json else "", - streaming_engine, - ) - - if profile_query_engine: - cmd += "--do_profiling " - if forward_unsupported_queries: - cmd += "--forward_unsupported_queries " - cmd += "> {}/main_query_engine.out 2>&1 &".format(output_dir) - - cmd_dir = os.path.join(self.provider.get_home_dir(), "code", "QueryEngine") - self.provider.execute_command( - node_idx=self.node_offset, - cmd=cmd, - cmd_dir=cmd_dir, - nohup=True, - popen=False, - ignore_errors=False, - manual=manual, - ) - - def _start_containerized( - self, - experiment_output_dir: str, - flink_output_format: str, - prometheus_scrape_interval: int, - log_level: str, - profile_query_engine: bool, - manual: bool, - streaming_engine: str, - forward_unsupported_queries: bool, - controller_remote_output_dir: str, - compress_json: bool, - prometheus_host: str, - dump_precomputes: bool, + return { + "output_dir": output_dir, + "log_level": log_level, + "prometheus_scrape_interval": prometheus_scrape_interval, + "streaming_engine": streaming_engine, + "do_profiling": profile_query_engine, + "http_server": {"port": http_port}, + "backend": backend, + "store": {"lock_strategy": lock_strategy}, + "ingest": ingest, + "precompute_engine": {"dump_precomputes": dump_precomputes}, + "inference_config": os.path.join( + controller_config_dir, "inference_config.yaml" + ), + "streaming_config": os.path.join( + controller_config_dir, "streaming_config.yaml" + ), + } + + def _write_engine_config_to_remote( + self, config_dict: dict, local_path: str, remote_path: str ) -> None: - """Start QueryEngine using containerized deployment with Jinja template.""" - output_dir = os.path.join(experiment_output_dir, "query_engine_output") - - # Paths on remote CloudLab node - queryengine_dir = os.path.join( - constants.CLOUDLAB_HOME_DIR, "code", "QueryEngine" - ) - template_path = os.path.join(queryengine_dir, "docker-compose.yml.j2") - remote_compose_file = os.path.join(output_dir, "docker-compose.yml") - 
helper_script = os.path.join( - constants.CLOUDLAB_HOME_DIR, - "code", - "asap-tools", - "experiments", - "generate_queryengine_compose.py", - ) - self.compose_file = remote_compose_file - - # Build command to generate docker-compose file using helper script - generate_cmd = f"python3 {helper_script}" - generate_cmd += f" --template-path '{template_path}'" - generate_cmd += f" --output-path '{remote_compose_file}'" - generate_cmd += f" --queryengine-dir '{queryengine_dir}'" - generate_cmd += f" --container-name '{self.container_name}'" - generate_cmd += f" --experiment-output-dir '{output_dir}'" - generate_cmd += ( - f" --controller-remote-output-dir '{controller_remote_output_dir}'" - ) - generate_cmd += f" --kafka-topic '{constants.FLINK_OUTPUT_TOPIC}'" - generate_cmd += f" --input-format '{flink_output_format}'" - generate_cmd += f" --prometheus-scrape-interval '{prometheus_scrape_interval}'" - generate_cmd += f" --log-level '{log_level}'" - generate_cmd += f" --streaming-engine '{streaming_engine}'" - generate_cmd += f" --kafka-host '{self.provider.get_node_ip(self.node_offset)}'" - generate_cmd += f" --prometheus-host '{prometheus_host}'" - - # Add optional flags - if compress_json: - generate_cmd += " --compress-json" - if profile_query_engine: - generate_cmd += " --profile-query-engine" - if forward_unsupported_queries: - generate_cmd += " --forward-unsupported-queries" - if dump_precomputes: - generate_cmd += " --dump-precomputes" - if manual: - generate_cmd += " --manual" - - cmd = f"mkdir -p {output_dir}; {generate_cmd}; docker compose -f {remote_compose_file} up --no-build -d" - - if manual: - print(f"Directory to run command: {queryengine_dir}") - print(f"Manual mode: Run command: {cmd}") - input("Press Enter to continue...") - else: - try: - self.provider.execute_command( - node_idx=self.node_offset, - cmd=cmd, - cmd_dir=queryengine_dir, - nohup=False, - popen=False, - ignore_errors=False, - ) - except Exception as e: - print(f"Failed to start QueryEngine container: {e}") - raise - - def stop(self, **kwargs) -> None: """ - Stop the query engine process. + Write the engine config YAML locally then rsync it to the remote node. + + Follows the same local-write + rsync pattern used for controller configs + (see sync.rsync_controller_client_configs). 
Args: - **kwargs: Additional configuration (currently unused) + config_dict: Engine config dict as returned by _build_engine_config + local_path: Local path to write the YAML file to + remote_path: Absolute path on the remote node where the file should land """ - if self.use_container: - self._stop_containerized() - else: - self._stop_bare_metal() + os.makedirs(os.path.dirname(local_path), exist_ok=True) + config_yaml = yaml.dump( + config_dict, default_flow_style=False, allow_unicode=True + ) + with open(local_path, "w") as f: + f.write(config_yaml) - def _stop_bare_metal(self) -> None: - """Stop QueryEngine using bare metal deployment (original implementation).""" - cmd = "pkill -f main_query_engine.py" + hostname = f"node{self.node_offset}.{self.provider.hostname_suffix}" + # Ensure the remote directory exists before rsyncing self.provider.execute_command( node_idx=self.node_offset, - cmd=cmd, + cmd=f"mkdir -p {os.path.dirname(remote_path)}", cmd_dir=None, nohup=False, popen=False, - ignore_errors=True, + ignore_errors=False, ) - - def _stop_containerized(self) -> None: - """Stop QueryEngine using containerized deployment.""" - try: - if self.compose_file: - # Stop using docker compose command on remote node - cmd = f"docker compose -f {self.compose_file} down" - self.provider.execute_command( - node_idx=self.node_offset, - cmd=cmd, - cmd_dir=None, - nohup=False, - popen=False, - ignore_errors=True, - ) - self.compose_file = None - else: - # Fallback: stop by container name on remote node - cmd = f"docker stop {self.container_name}; docker rm {self.container_name}" - self.provider.execute_command( - node_idx=self.node_offset, - cmd=cmd, - cmd_dir=None, - nohup=False, - popen=False, - ignore_errors=True, - ) - except Exception as e: - print(f"Error stopping QueryEngine container: {e}") - - def is_healthy(self) -> bool: - """ - Check if query engine is healthy by checking if process is running. - - Returns: - True if query engine process is running - """ - if self.use_container: - return self._is_healthy_containerized() - else: - return self._is_healthy_bare_metal() - - def _is_healthy_bare_metal(self) -> bool: - """Check if QueryEngine is healthy using bare metal deployment.""" - try: - cmd = "pgrep -f main_query_engine.py" - result = self.provider.execute_command( - node_idx=self.node_offset, - cmd=cmd, - cmd_dir=None, - nohup=False, - popen=False, - ignore_errors=True, - ) - import subprocess - - assert isinstance(result, subprocess.CompletedProcess) - return result.stdout.strip() != "" - except Exception: - return False - - def _is_healthy_containerized(self) -> bool: - """Check if QueryEngine is healthy using containerized deployment.""" - try: - # Check if container is running - result = subprocess.run( - ["docker", "inspect", "-f", "{{.State.Running}}", self.container_name], - capture_output=True, - text=True, - check=True, - ) - return result.stdout.strip() == "true" - except subprocess.CalledProcessError: - return False - except Exception: - return False - - def get_monitoring_keyword(self) -> str: - """ - Get the keyword to use for process monitoring. 
- - Returns: - Container name if using containers, otherwise process name - """ - if self.use_container: - return self.container_name - else: - return constants.QUERY_ENGINE_PY_PROCESS_KEYWORD - - -class QueryEngineRustService(BaseQueryEngineService): - """Service for managing the Rust query engine process.""" - - def __init__( - self, - provider: InfrastructureProvider, - use_container: bool, - node_offset: int, - ): - """ - Initialize Rust Query Engine service. - - Args: - provider: Infrastructure provider for node communication and management - use_container: Whether to use containerized deployment - node_offset: Starting node index offset - """ - super().__init__(provider, use_container, node_offset) - self.container_name = constants.QUERY_ENGINE_RS_CONTAINER_NAME + cmd = 'rsync -azh -e "ssh {}" {} {}@{}:{}'.format( + constants.SSH_OPTIONS, + local_path, + self.provider.username, + hostname, + remote_path, + ) + utils.run_cmd_with_retry(cmd, popen=False, ignore_errors=False) def start( self, experiment_output_dir: str, + local_experiment_dir: str, flink_output_format: str, prometheus_scrape_interval: int, log_level: str, @@ -426,20 +229,28 @@ def start( Start the Rust query engine. Args: - experiment_output_dir: Directory for experiment output - flink_output_format: Format of data from Flink + experiment_output_dir: Remote directory for experiment output + local_experiment_dir: Local experiment directory (used to write the engine + config YAML locally before rsyncing to remote) + flink_output_format: Format of data from Flink (used as Kafka input_format + when streaming_engine=arroyo) prometheus_scrape_interval: Prometheus scraping interval log_level: Logging level profile_query_engine: Whether to enable profiling manual: Whether to run in manual mode - streaming_engine: Type of streaming engine (flink/arroyo) + streaming_engine: Type of streaming engine ('arroyo' or 'precompute') forward_unsupported_queries: Whether to forward unsupported queries controller_remote_output_dir: Controller output directory - compress_json: Whether JSON is compressed + compress_json: Whether JSON is compressed (arroyo/Kafka only) dump_precomputes: Whether to dump precomputed values lock_strategy: Lock strategy for SimpleMapStore (global or per-key) query_language: Query language (SQL or PROMQL), defaults to PROMQL - **kwargs: Additional configuration (requires prometheus_port, http_port) + **kwargs: Additional configuration. + Required: prometheus_port, http_port. + Optional: prometheus_host (defaults to coordinator node IP), + remote_write_port (port the precompute engine listens on + for Prometheus remote write; should match + streaming.remote_write.base_port, defaults to 8080). """ # Extract prometheus configuration prometheus_host = kwargs.get( @@ -447,10 +258,14 @@ def start( ) prometheus_port = kwargs["prometheus_port"] # Required, no default http_port = kwargs["http_port"] # Required, no default + # Port the precompute engine listens on for Prometheus remote write. + # Should match streaming.remote_write.base_port in the Hydra config. 
+ remote_write_port = kwargs.get("remote_write_port", 8080) if self.use_container: self._start_containerized( experiment_output_dir, + local_experiment_dir, flink_output_format, prometheus_scrape_interval, log_level, @@ -463,6 +278,7 @@ def start( prometheus_host, prometheus_port, http_port, + remote_write_port, dump_precomputes, query_language, lock_strategy, @@ -470,6 +286,7 @@ def start( else: self._start_bare_metal( experiment_output_dir, + local_experiment_dir, flink_output_format, prometheus_scrape_interval, log_level, @@ -482,6 +299,7 @@ def start( prometheus_host, prometheus_port, http_port, + remote_write_port, dump_precomputes, query_language, lock_strategy, @@ -490,6 +308,7 @@ def start( def _start_bare_metal( self, experiment_output_dir: str, + local_experiment_dir: str, flink_output_format: str, prometheus_scrape_interval: int, log_level: str, @@ -502,58 +321,47 @@ def _start_bare_metal( prometheus_host: str, prometheus_port: int, http_port: int, + remote_write_port: int, dump_precomputes: bool, query_language: str, lock_strategy: str, ) -> None: """Start Rust QueryEngine using bare metal deployment.""" output_dir = os.path.join(experiment_output_dir, "query_engine_output") - prometheus_server = f"http://{prometheus_host}:{prometheus_port}" - - cmd = ( - "mkdir -p {}; ../target/release/query_engine_rust " - "--kafka-topic {} " - "--input-format {} " - "--config {}/inference_config.yaml " - "--streaming-config {}/streaming_config.yaml " - "--prometheus-scrape-interval {} " - "--prometheus-server {} " - "--http-port {} " - "--delete-existing-db " - "--log-level {} " - "--output-dir {} " - "{} " - "--streaming-engine {} " - "--query-language {} " - "--lock-strategy {} " - ).format( - output_dir, - constants.FLINK_OUTPUT_TOPIC, - flink_output_format, - controller_remote_output_dir, - controller_remote_output_dir, - prometheus_scrape_interval, - prometheus_server, - http_port, - log_level, - output_dir, - "--decompress-json" if compress_json else "", - streaming_engine, - query_language, - lock_strategy, + local_output_dir = os.path.join(local_experiment_dir, "query_engine_output") + + config = self._build_engine_config( + output_dir=output_dir, + flink_output_format=flink_output_format, + prometheus_scrape_interval=prometheus_scrape_interval, + log_level=log_level, + streaming_engine=streaming_engine, + forward_unsupported_queries=forward_unsupported_queries, + controller_config_dir=controller_remote_output_dir, + compress_json=compress_json, + prometheus_server=f"http://{prometheus_host}:{prometheus_port}", + http_port=http_port, + remote_write_port=remote_write_port, + dump_precomputes=dump_precomputes, + query_language=query_language, + lock_strategy=lock_strategy, + profile_query_engine=profile_query_engine, + kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092", + ) + self._write_engine_config_to_remote( + config_dict=config, + local_path=os.path.join(local_output_dir, "engine_config.yaml"), + remote_path=os.path.join(output_dir, "engine_config.yaml"), ) - - if profile_query_engine: - cmd += "--do-profiling " - if forward_unsupported_queries: - cmd += "--forward-unsupported-queries " - if dump_precomputes: - cmd += "--dump-precomputes " - cmd += "> {}/query_engine_rust.out 2>&1 &".format(output_dir) cmd_dir = os.path.join( self.provider.get_home_dir(), "code", "asap-query-engine" ) + cmd = ( + f"../target/release/query_engine_rust" + f" --config-file {output_dir}/engine_config.yaml" + f" > {output_dir}/query_engine_rust.out 2>&1 &" + ) 
self.provider.execute_command( node_idx=self.node_offset, cmd=cmd, @@ -567,6 +375,7 @@ def _start_bare_metal( def _start_containerized( self, experiment_output_dir: str, + local_experiment_dir: str, flink_output_format: str, prometheus_scrape_interval: int, log_level: str, @@ -579,12 +388,46 @@ def _start_containerized( prometheus_host: str, prometheus_port: int, http_port: int, + remote_write_port: int, dump_precomputes: bool, query_language: str, lock_strategy: str, ) -> None: """Start Rust QueryEngine using containerized deployment with Jinja template.""" output_dir = os.path.join(experiment_output_dir, "query_engine_output") + local_output_dir = os.path.join(local_experiment_dir, "query_engine_output") + + # Inside the container, outputs are mounted at /app/outputs and controller + # configs are mounted at /app/controller_output (read-only). All paths + # written into the config dict must be container-internal paths. + container_output_dir = "/app/outputs" + container_controller_dir = "/app/controller_output" + + config = self._build_engine_config( + output_dir=container_output_dir, + flink_output_format=flink_output_format, + prometheus_scrape_interval=prometheus_scrape_interval, + log_level=log_level, + streaming_engine=streaming_engine, + forward_unsupported_queries=forward_unsupported_queries, + controller_config_dir=container_controller_dir, + compress_json=compress_json, + prometheus_server=f"http://{prometheus_host}:{prometheus_port}", + http_port=http_port, + remote_write_port=remote_write_port, + dump_precomputes=dump_precomputes, + query_language=query_language, + lock_strategy=lock_strategy, + profile_query_engine=profile_query_engine, + kafka_broker=f"{self.provider.get_node_ip(self.node_offset)}:9092", + ) + # Write the config to the host path that is volume-mounted as /app/outputs, + # so the container finds it at /app/outputs/engine_config.yaml. 
+ self._write_engine_config_to_remote( + config_dict=config, + local_path=os.path.join(local_output_dir, "engine_config.yaml"), + remote_path=os.path.join(output_dir, "engine_config.yaml"), + ) # Paths on remote CloudLab node queryengine_dir = os.path.join( @@ -611,27 +454,8 @@ def _start_containerized( generate_cmd += ( f" --controller-remote-output-dir '{controller_remote_output_dir}'" ) - generate_cmd += f" --kafka-topic '{constants.FLINK_OUTPUT_TOPIC}'" - generate_cmd += f" --input-format '{flink_output_format}'" - generate_cmd += f" --prometheus-scrape-interval '{prometheus_scrape_interval}'" generate_cmd += f" --log-level '{log_level}'" - generate_cmd += f" --streaming-engine '{streaming_engine}'" - generate_cmd += f" --query-language '{query_language}'" - generate_cmd += f" --lock-strategy '{lock_strategy}'" - generate_cmd += f" --kafka-host '{self.provider.get_node_ip(self.node_offset)}'" - generate_cmd += f" --prometheus-host '{prometheus_host}'" - generate_cmd += f" --prometheus-port '{prometheus_port}'" generate_cmd += f" --http-port '{http_port}'" - - # Add optional flags - if compress_json: - generate_cmd += " --compress-json" - if profile_query_engine: - generate_cmd += " --profile-query-engine" - if forward_unsupported_queries: - generate_cmd += " --forward-unsupported-queries" - if dump_precomputes: - generate_cmd += " --dump-precomputes" if manual: generate_cmd += " --manual" @@ -766,38 +590,3 @@ def get_monitoring_keyword(self) -> str: return self.container_name else: return constants.QUERY_ENGINE_RS_PROCESS_KEYWORD - - -class QueryEngineServiceFactory: - """Factory for creating appropriate query engine services.""" - - @staticmethod - def create_query_engine_service( - language: str, - provider: InfrastructureProvider, - use_container: bool, - node_offset: int, - ) -> BaseQueryEngineService: - """ - Create a query engine service based on language. - - Args: - language: Programming language ("python" or "rust") - provider: Infrastructure provider for node communication and management - use_container: Whether to use containerized deployment - node_offset: Starting node index offset - - Returns: - Appropriate query engine service instance - - Raises: - ValueError: If language is not supported - """ - if language == "python": - return QueryEngineService(provider, use_container, node_offset) - elif language == "rust": - return QueryEngineRustService(provider, use_container, node_offset) - else: - raise ValueError( - f"Invalid query engine language: {language}. 
diff --git a/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py b/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py
index 527c83d5..bb07d44f 100644
--- a/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py
+++ b/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py
@@ -81,7 +81,7 @@ def start(
         if query_engine_service is not None:
             keywords.append(query_engine_service.get_monitoring_keyword())
         else:
-            keywords.append(constants.QUERY_ENGINE_PROCESS_KEYWORD)
+            keywords.append(constants.QUERY_ENGINE_RS_PROCESS_KEYWORD)
 
         if streaming_engine == "flink":
             keywords.append("sketch-0.1.jar")
@@ -152,7 +152,7 @@ def start(
         if query_engine_service is not None:
             keywords.append(query_engine_service.get_monitoring_keyword())
         else:
-            keywords.append(constants.QUERY_ENGINE_PROCESS_KEYWORD)
+            keywords.append(constants.QUERY_ENGINE_RS_PROCESS_KEYWORD)
 
         if streaming_engine == "flink":
             keywords.append("sketch-0.1.jar")  # flinksketch jar
diff --git a/asap-tools/experiments/generate_controller_compose.py b/asap-tools/experiments/generate_controller_compose.py
index 63ed78e1..67721b65 100644
--- a/asap-tools/experiments/generate_controller_compose.py
+++ b/asap-tools/experiments/generate_controller_compose.py
@@ -111,7 +111,7 @@ def main():
     parser.add_argument(
         "--streaming-engine",
         required=True,
-        choices=["flink", "arroyo"],
+        choices=["flink", "arroyo", "precompute"],
         help="Streaming engine",
     )
     parser.add_argument(
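
The widened choices list is the whole change in generate_controller_compose.py: "precompute" now passes argparse validation instead of aborting compose generation. A self-contained repro of the new contract (argparse only, no project imports):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--streaming-engine",
        required=True,
        choices=["flink", "arroyo", "precompute"],
        help="Streaming engine",
    )

    # Accepted after this patch; before it, argparse exited with
    # "invalid choice: 'precompute'".
    args = parser.parse_args(["--streaming-engine", "precompute"])
    assert args.streaming_engine == "precompute"
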
parser.add_argument("--kafka-topic", required=True, help="Kafka topic name") - parser.add_argument( - "--input-format", required=True, choices=["json", "byte"], help="Input format" - ) - parser.add_argument( - "--prometheus-scrape-interval", required=True, help="Prometheus scrape interval" - ) parser.add_argument("--log-level", required=True, help="Log level") - parser.add_argument( - "--streaming-engine", - required=True, - choices=["flink", "arroyo"], - help="Streaming engine", - ) - parser.add_argument( - "--query-language", - required=True, - choices=["SQL", "PROMQL"], - help="Query language (SQL or PROMQL)", - ) - parser.add_argument( - "--lock-strategy", - required=True, - choices=["global", "per-key"], - help="Lock strategy for SimpleMapStore", - ) - - # Optional arguments - parser.add_argument( - "--compress-json", action="store_true", help="Enable JSON compression" - ) - parser.add_argument( - "--profile-query-engine", action="store_true", help="Enable profiling" - ) - parser.add_argument( - "--forward-unsupported-queries", - action="store_true", - help="Forward unsupported queries", - ) - parser.add_argument("--manual", action="store_true", help="Manual mode") - parser.add_argument("--kafka-host", required=True, help="Kafka host IP") - parser.add_argument("--prometheus-host", required=True, help="Prometheus host IP") - parser.add_argument( - "--prometheus-port", - type=int, - required=True, - help="Prometheus server port (9090 for Prometheus, 8428 for VictoriaMetrics)", - ) - parser.add_argument( - "--kafka-proxy-container-name", - default="sketchdb-kafka-proxy", - help="Kafka proxy container name", - ) parser.add_argument("--http-port", required=True, help="HTTP port") - parser.add_argument( - "--dump-precomputes", action="store_true", help="Dump precomputes" - ) + parser.add_argument("--manual", action="store_true", help="Manual mode") args = parser.parse_args() @@ -189,23 +106,9 @@ def main(): container_name=args.container_name, experiment_output_dir=args.experiment_output_dir, controller_remote_output_dir=args.controller_remote_output_dir, - kafka_topic=args.kafka_topic, - input_format=args.input_format, - prometheus_scrape_interval=args.prometheus_scrape_interval, log_level=args.log_level, - streaming_engine=args.streaming_engine, - query_language=args.query_language, - lock_strategy=args.lock_strategy, http_port=args.http_port, - compress_json=args.compress_json, - profile_query_engine=args.profile_query_engine, - forward_unsupported_queries=args.forward_unsupported_queries, manual=args.manual, - kafka_host=args.kafka_host, - prometheus_host=args.prometheus_host, - prometheus_port=args.prometheus_port, - kafka_proxy_container_name=args.kafka_proxy_container_name, - dump_precomputes=args.dump_precomputes, ) diff --git a/asap-tools/experiments/remote_monitor.py b/asap-tools/experiments/remote_monitor.py index cc113efe..34a5c2c4 100644 --- a/asap-tools/experiments/remote_monitor.py +++ b/asap-tools/experiments/remote_monitor.py @@ -236,14 +236,6 @@ def main(args): profile_query_engine_pid = None if args.profile_query_engine: if ( - constants.QUERY_ENGINE_PY_PROCESS_KEYWORD in args.keywords - or constants.QUERY_ENGINE_PY_CONTAINER_NAME in args.keywords - ): - query_engine_pids = get_pids(constants.QUERY_ENGINE_PY_PROCESS_KEYWORD) - profile_query_engine_pid = query_engine_pids[ - 0 - ] # Take first PID for profiling - elif ( constants.QUERY_ENGINE_RS_PROCESS_KEYWORD in args.keywords or constants.QUERY_ENGINE_RS_CONTAINER_NAME in args.keywords ):