diff --git a/README.md b/README.md index e922856..6b19575 100644 --- a/README.md +++ b/README.md @@ -135,14 +135,15 @@ cmake --build --preset macos-release 📖 **For complete build instructions, troubleshooting, and platform-specific notes, see [README_BUILD.md](README_BUILD.md)** ### Building with Docker -The Dockerfile COPYs folders/files required to build the CPP SDK into the image. +The Docker setup is split into a reusable base image and an SDK image layered on top of it. **NOTE:** this has only been tested on Linux ```bash -docker build -t livekit-cpp-sdk . -f docker/Dockerfile +docker build -t livekit-cpp-sdk-base . -f docker/Dockerfile.base +docker build --build-arg BASE_IMAGE=livekit-cpp-sdk-base -t livekit-cpp-sdk . -f docker/Dockerfile.sdk docker run -it --network host livekit-cpp-sdk:latest bash ``` -__NOTE:__ if you are building your own Dockerfile, you will likely need to set the same `ENV` variables as in `docker/Dockerfile`, but to the relevant directories: +__NOTE:__ if you are building your own Dockerfile, you will likely need to set the same `ENV` variables as in `docker/Dockerfile.base`, but to the relevant directories: ```bash export CC=$HOME/gcc-14/bin/gcc export CXX=$HOME/gcc-14/bin/g++ diff --git a/benchmarks/data_track_throughput/.gitignore b/benchmarks/data_track_throughput/.gitignore new file mode 100644 index 0000000..56d866d --- /dev/null +++ b/benchmarks/data_track_throughput/.gitignore @@ -0,0 +1,3 @@ +*env*/ +*/__pycache__/ +*throughput_results/* diff --git a/benchmarks/data_track_throughput/CMakeLists.txt b/benchmarks/data_track_throughput/CMakeLists.txt new file mode 100644 index 0000000..533f6d8 --- /dev/null +++ b/benchmarks/data_track_throughput/CMakeLists.txt @@ -0,0 +1,95 @@ +# Copyright 2026 LiveKit, Inc. +# +# Standalone CMake build for the data-track throughput experiment. +# All paths are relative to CMAKE_CURRENT_SOURCE_DIR so this directory +# can be moved or renamed freely. 
+ +cmake_minimum_required(VERSION 3.20) +project(DataTrackThroughput LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +# ---- Dependencies -------------------------------------------------------- + +find_package(LiveKit CONFIG REQUIRED) + +find_package(nlohmann_json 3.11 QUIET) +if(NOT nlohmann_json_FOUND) + include(FetchContent) + FetchContent_Declare( + nlohmann_json + GIT_REPOSITORY https://github.com/nlohmann/json.git + GIT_TAG v3.11.3 + GIT_SHALLOW TRUE + ) + FetchContent_MakeAvailable(nlohmann_json) +endif() + +# ---- Targets ------------------------------------------------------------- + +set(_targets DataTrackThroughputProducer DataTrackThroughputConsumer) + +add_executable(DataTrackThroughputProducer producer.cpp) +add_executable(DataTrackThroughputConsumer consumer.cpp) + +foreach(_target ${_targets}) + target_include_directories(${_target} PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}") + target_link_libraries(${_target} PRIVATE LiveKit::livekit nlohmann_json::nlohmann_json) +endforeach() + +# ---- RPATH --------------------------------------------------------------- + +if(UNIX) + if(APPLE) + set_target_properties(${_targets} PROPERTIES + BUILD_RPATH "@loader_path" + INSTALL_RPATH "@loader_path" + ) + else() + set_target_properties(${_targets} PROPERTIES + BUILD_RPATH "$ORIGIN" + INSTALL_RPATH "$ORIGIN" + BUILD_RPATH_USE_ORIGIN TRUE + ) + endif() +endif() + +# ---- Copy SDK shared libraries next to executables ----------------------- + +get_target_property(_lk_location LiveKit::livekit LOCATION) +if(_lk_location) + get_filename_component(_lk_lib_dir "${_lk_location}" DIRECTORY) +else() + get_target_property(_lk_location LiveKit::livekit IMPORTED_LOCATION) + if(NOT _lk_location) + get_target_property(_lk_location LiveKit::livekit IMPORTED_LOCATION_RELEASE) + endif() + if(NOT _lk_location) + get_target_property(_lk_location LiveKit::livekit IMPORTED_LOCATION_DEBUG) + endif() + if(_lk_location) + get_filename_component(_lk_lib_dir 
"${_lk_location}" DIRECTORY) + endif() +endif() + +if(_lk_lib_dir) + if(WIN32) + file(GLOB _sdk_shared_libs "${_lk_lib_dir}/../bin/*.dll" "${_lk_lib_dir}/*.dll") + elseif(APPLE) + file(GLOB _sdk_shared_libs "${_lk_lib_dir}/*.dylib") + else() + file(GLOB _sdk_shared_libs "${_lk_lib_dir}/*.so" "${_lk_lib_dir}/*.so.*") + endif() + + foreach(_target ${_targets}) + foreach(_lib ${_sdk_shared_libs}) + get_filename_component(_lib_name "${_lib}" NAME) + add_custom_command(TARGET ${_target} POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "${_lib}" "$/${_lib_name}" + COMMENT "Copying ${_lib_name} next to ${_target}" + ) + endforeach() + endforeach() +endif() diff --git a/benchmarks/data_track_throughput/README.md b/benchmarks/data_track_throughput/README.md new file mode 100644 index 0000000..02af1d4 --- /dev/null +++ b/benchmarks/data_track_throughput/README.md @@ -0,0 +1,273 @@ +# Data Track Throughput Experiment + +Coordinated producer and consumer for benchmarking `LocalDataTrack` / +`RemoteDataTrack` throughput across a sweep of payload sizes and publish rates. + +## What It Does + +- `producer.cpp` + - Publishes a data track named `data-track-throughput` + - Runs a default sweep of payload sizes and publish rates (see + **Test Bounds** below) + - Calls the consumer over RPC before and after each scenario + +- `consumer.cpp` + - Registers a room data-frame callback for the producer's data track + - Receives every frame and records arrival timestamps + - Logs validation warnings (size mismatches, header mismatches, etc.) to stderr + - Tracks duplicates and missing messages + - Appends raw data to scenario-level and per-message CSV files + +## Design Principles + +- **Raw data only in CSV.** The consumer writes only directly measured values + (counts, byte totals, microsecond timestamps). All derived metrics (throughput, + latency percentiles, delivery ratio, etc.) are computed at analysis time by + `scripts/plot_throughput.py`. 
+- **Fixed packet size per scenario.** Each scenario uses a single + `packet_size_bytes`. This ensures every message in a run is the same size, + making aggregate measurements unambiguous. +- **Minimal measurement overhead.** The hot `onDataFrame` callback captures the + arrival timestamp first, then appends to an in-memory vector under a brief + mutex. File I/O happens only at finalization after all data is collected. + +## Test Bounds + +All bounds are defined in `common.h`. A scenario is any combination of +(payload size, publish rate) that passes all three constraints below. + +### Hard Limits + +| Parameter | Min | Max | +|-----------|-----|-----| +| Packet size | 1 KiB | 256 MiB | +| Publish rate | 1 Hz | 50k Hz | + +### Data-Rate Budget + +Every scenario must satisfy: + +``` +packet_size_bytes * desired_rate_hz <= 10 Gbps (1.25 GB/s) +``` + +This naturally allows small messages at very high rates and large messages at +low rates while preventing any single scenario from attempting an unreasonable +throughput that would destabilize the connection. + +### Default Sweep Grid + +The default sweep iterates over 13 payload sizes and 13 publish rates, skipping +any combination that exceeds the data-rate budget: + +**Payload sizes:** 1 KiB, 4 KiB, 16 KiB, 64 KiB, 128 KiB, 256 KiB, 512 KiB, +1 MiB, 2 MiB, 4 MiB, 16 MiB, 64 MiB, 256 MiB + +**Publish rates:** 1, 5, 10, 25, 50, 100, 200, 500, 1k, 5k, 10k, 20k, 50k Hz + +The budget clips larger payloads to lower rates. For example: + +| Payload | Max rate allowed | +|---------|-----------------| +| 1 KiB | 50k Hz (all rates) | +| 16 KiB | 50k Hz (all rates) | +| 64 KiB | 10k Hz | +| 256 KiB | 1k Hz | +| 1 MiB | 1k Hz | +| 4 MiB | 200 Hz | +| 64 MiB | 10 Hz | +| 256 MiB | 1 Hz | + +Single-scenario mode (`--rate-hz`, `--packet-size`, `--num-msgs`) bypasses the +default grid and only enforces the hard limits and data-rate budget, allowing +any valid combination to be tested explicitly. + +## CSV Output + +The consumer writes raw measurement data only. All derived metrics are computed +at analysis time by `scripts/plot_throughput.py`. + +### `throughput_summary.csv` + +One row per scenario. Contains only raw counts, byte totals, and microsecond +timestamps: + +| Column | Description | +|--------|-------------| +| `run_id` | Unique scenario identifier | +| `scenario_name` | Human-readable scenario label | +| `desired_rate_hz` | Requested publish rate | +| `packet_size_bytes` | Fixed packet size for this scenario | +| `messages_requested` | Number of messages the producer was told to send | +| `messages_attempted` | Number of messages the producer tried to send | +| `messages_enqueued` | Number of messages successfully enqueued | +| `messages_enqueue_failed` | Number of enqueue failures | +| `messages_received` | Unique messages received by consumer | +| `messages_missed` | `messages_requested - messages_received` | +| `duplicate_messages` | Number of duplicate frames received | +| `attempted_bytes` | Total bytes the producer attempted to send | +| `enqueued_bytes` | Total bytes successfully enqueued | +| `received_bytes` | Total bytes received by consumer | +| `first_send_time_us` | Timestamp of first send (microseconds since epoch) | +| `last_send_time_us` | Timestamp of last send | +| `first_arrival_time_us` | Timestamp of first arrival at consumer | +| `last_arrival_time_us` | Timestamp of last arrival at consumer | + +### `throughput_messages.csv` + +One row per received frame. 
Raw observation data only: + +| Column | Description | +|--------|-------------| +| `run_id` | Scenario identifier | +| `sequence` | Message sequence number | +| `payload_bytes` | Actual payload size received | +| `send_time_us` | Producer send timestamp (microseconds since epoch) | +| `arrival_time_us` | Consumer arrival timestamp (microseconds since epoch) | +| `is_duplicate` | 1 if this sequence was already seen, 0 otherwise | + +## Prerequisites + +- CMake 3.20+ +- C++17 compiler +- The LiveKit C++ SDK, built and installed (see below) + +## Building + +All commands below assume you are in **this directory** +(`data_track_throughput/`). + +### 1. Build and install the SDK + +From the SDK repository root: + +```bash +./build.sh # builds the SDK (debug by default) +cmake --install build-debug --prefix local-install +``` + +### 2. Configure this experiment + +```bash +cmake -S . -B build \ + -DCMAKE_PREFIX_PATH="$(cd ../../local-install && pwd)" +``` + +> Adjust the `CMAKE_PREFIX_PATH` to wherever the SDK was installed. The path +> above assumes this directory lives two levels below the repository root; it +> works regardless of the parent directory's name. + +### 3. Build + +```bash +cmake --build build +``` + +The executables and required shared libraries are placed in `build/`. 
+ +## Build Targets + +- `DataTrackThroughputConsumer` +- `DataTrackThroughputProducer` + +## Running + +## Generate Tokens + +```bash +# producer +lk token create \ + --api-key devkey \ + --api-secret secret \ + -i producer \ + --join \ + --valid-for 99999h \ + --room robo_room \ + --grant '{"canPublish":true,"canSubscribe":true,"canPublishData":true}' + +# consumer +lk token create \ + --api-key devkey \ + --api-secret secret \ + -i consumer \ + --join \ + --valid-for 99999h \ + --room robo_room \ + --grant '{"canPublish":true,"canSubscribe":true,"canPublishData":true}' +``` + +Start the local server: +```bash +LIVEKIT_CONFIG="enable_data_tracks: true" livekit-server --dev +``` + +Start the consumer first: + +```bash +./build/DataTrackThroughputConsumer +``` + +Then start the producer: + +```bash +./build/DataTrackThroughputProducer --consumer consumer +``` + +If you omit `--consumer`, the producer expects exactly one remote participant +to already be in the room. + +## Single Scenario + +Instead of the full sweep, you can run one scenario: + +```bash +./build/DataTrackThroughputProducer \ + \ + --consumer \ + --rate-hz 50 \ + --packet-size 1mb \ + --num-msgs 25 +``` + +## Plotting + +Generate plots from a benchmark output directory: + +```bash +python3 scripts/plot_throughput.py data_track_throughput_results +``` + +By default the script writes PNGs into `data_track_throughput_results/plots/`. +Pass `--output-dir ` to override the output location. + +All derived metrics (throughput, latency percentiles, delivery ratio, receive +rate, interarrival times) are computed from the raw CSV timestamps and counts +at plot time. + +### Generated Plots + +From `throughput_summary.csv` + `throughput_messages.csv`: + +| File | Description | +|------|-------------| +| `expected_vs_actual_throughput.png` | Scatter plot comparing expected vs actual receive throughput (Mbps). Points are colored by desired publish rate and sized by payload. 
An ideal y=x reference line is overlaid. | +| `dropped_messages_vs_expected_throughput.png` | Scatter plot of missed/dropped message count vs expected throughput, colored by payload size (log scale). | +| `actual_throughput_heatmap.png` | Heatmap of actual receive throughput (Mbps) with payload size on the y-axis and desired rate on the x-axis. | +| `delivery_ratio_heatmap.png` | Heatmap of delivery ratio (received / requested) over the same payload-size x rate grid. | +| `p50_latency_heatmap.png` | Heatmap of median (P50) send-to-receive latency (ms) over the same grid. | +| `p95_latency_heatmap.png` | Heatmap of P95 send-to-receive latency (ms) over the same grid. | +| `message_latency_histogram.png` | Histogram of per-message latency (ms) across all received frames. | +| `message_interarrival_series.png` | Time-series line plot of inter-arrival gaps (ms) for every received message, ordered by run then arrival time. | diff --git a/benchmarks/data_track_throughput/common.h b/benchmarks/data_track_throughput/common.h new file mode 100644 index 0000000..a8852f7 --- /dev/null +++ b/benchmarks/data_track_throughput/common.h @@ -0,0 +1,396 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace data_track_throughput { + +using json = nlohmann::json; + +constexpr const char *kDefaultTrackName = "data-track-throughput"; +constexpr const char *kPrepareRpcMethod = "throughput.prepare"; +constexpr const char *kFinishRpcMethod = "throughput.finish"; +constexpr std::size_t kMinimumPayloadBytes = 1024; +constexpr std::size_t kMaximumPayloadBytes = 256ull * 1024ull * 1024ull; +constexpr double kMinimumRateHz = 1.0; +constexpr double kMaximumRateHz = 50000.0; +constexpr double kMaximumDataRateBytesPerSec = 10'000'000'000.0; // 10 GBps +constexpr std::uint32_t kFrameMagic = 0x31545444u; // "DTT1" +constexpr std::uint32_t kFrameVersion = 1; +constexpr std::size_t kFrameHeaderBytes = 56; + +inline std::string getenvOrEmpty(const char *name) { + const char *value = std::getenv(name); + return value == nullptr ? 
std::string{} : std::string(value); +} + +inline std::uint64_t nowSystemUs() { + using namespace std::chrono; + return static_cast( + duration_cast(system_clock::now().time_since_epoch()) + .count()); +} + +inline std::string toLower(std::string value) { + std::transform( + value.begin(), value.end(), value.begin(), + [](unsigned char c) { return static_cast(std::tolower(c)); }); + return value; +} + +inline std::string trim(std::string value) { + auto is_space = [](unsigned char c) { return std::isspace(c) != 0; }; + value.erase(value.begin(), + std::find_if(value.begin(), value.end(), + [&](unsigned char c) { return !is_space(c); })); + value.erase(std::find_if(value.rbegin(), value.rend(), + [&](unsigned char c) { return !is_space(c); }) + .base(), + value.end()); + return value; +} + +inline std::size_t parseByteSize(const std::string &text) { + const std::string lowered = toLower(trim(text)); + if (lowered.empty()) { + throw std::runtime_error("Empty byte size"); + } + + std::size_t split = 0; + while (split < lowered.size() && + (std::isdigit(static_cast(lowered[split])) != 0 || + lowered[split] == '.')) { + ++split; + } + + const double number = std::stod(lowered.substr(0, split)); + const std::string unit = trim(lowered.substr(split)); + + double multiplier = 1.0; + if (unit.empty() || unit == "b") { + multiplier = 1.0; + } else if (unit == "kb" || unit == "kib" || unit == "k") { + multiplier = 1024.0; + } else if (unit == "mb" || unit == "mib" || unit == "m") { + multiplier = 1024.0 * 1024.0; + } else if (unit == "gb" || unit == "gib" || unit == "g") { + multiplier = 1024.0 * 1024.0 * 1024.0; + } else { + throw std::runtime_error("Unsupported size unit: " + text); + } + + return static_cast(std::llround(number * multiplier)); +} + +inline std::string humanBytes(std::size_t bytes) { + static constexpr const char *kUnits[] = {"B", "KiB", "MiB", "GiB"}; + double value = static_cast(bytes); + std::size_t unit_index = 0; + while (value >= 1024.0 && unit_index + 
1 < std::size(kUnits)) { + value /= 1024.0; + ++unit_index; + } + + std::ostringstream oss; + oss << std::fixed << std::setprecision(unit_index == 0 ? 0 : 2) << value + << kUnits[unit_index]; + return oss.str(); +} + +inline std::string csvEscape(const std::string &value) { + if (value.find_first_of(",\"\n") == std::string::npos) { + return value; + } + + std::string escaped = "\""; + for (char ch : value) { + if (ch == '"') { + escaped += "\"\""; + } else { + escaped.push_back(ch); + } + } + escaped.push_back('"'); + return escaped; +} + +inline void ensureCsvHeader(const std::filesystem::path &path, + const std::string &header) { + if (std::filesystem::exists(path)) { + return; + } + + std::ofstream out(path); + if (!out) { + throw std::runtime_error("Failed to create CSV: " + path.string()); + } + out << header << '\n'; +} + +struct ScenarioRequest { + std::uint64_t run_id = 0; + std::string scenario_name; + std::string producer_identity; + std::string track_name = kDefaultTrackName; + double desired_rate_hz = 0.0; + std::size_t packet_size_bytes = 0; + std::size_t message_count = 0; +}; + +struct ProducerStats { + std::uint64_t run_id = 0; + std::string scenario_name; + std::size_t attempted_count = 0; + std::size_t enqueue_success_count = 0; + std::size_t enqueue_failure_count = 0; + std::uint64_t attempted_bytes = 0; + std::uint64_t enqueued_bytes = 0; + std::uint64_t first_send_time_us = 0; + std::uint64_t last_send_time_us = 0; +}; + +struct FrameHeader { + std::uint64_t run_id = 0; + std::uint64_t sequence = 0; + std::uint64_t message_count = 0; + std::uint64_t payload_size_bytes = 0; + std::uint64_t packet_size_bytes = 0; + std::uint64_t send_time_us = 0; +}; + +inline void writeLe32(std::vector &buffer, std::size_t offset, + std::uint32_t value) { + for (int idx = 0; idx < 4; ++idx) { + buffer[offset + idx] = + static_cast((value >> (idx * 8)) & 0xFF); + } +} + +inline void writeLe64(std::vector &buffer, std::size_t offset, + std::uint64_t value) { + for 
(int idx = 0; idx < 8; ++idx) { + buffer[offset + idx] = + static_cast((value >> (idx * 8)) & 0xFF); + } +} + +inline std::uint32_t readLe32(const std::vector &buffer, + std::size_t offset) { + std::uint32_t value = 0; + for (int idx = 0; idx < 4; ++idx) { + value |= static_cast(buffer[offset + idx]) << (idx * 8); + } + return value; +} + +inline std::uint64_t readLe64(const std::vector &buffer, + std::size_t offset) { + std::uint64_t value = 0; + for (int idx = 0; idx < 8; ++idx) { + value |= static_cast(buffer[offset + idx]) << (idx * 8); + } + return value; +} + +inline std::vector makePayload(const ScenarioRequest &request, + std::uint64_t sequence, + std::uint64_t send_time_us) { + if (request.packet_size_bytes < kFrameHeaderBytes) { + throw std::runtime_error("Payload too small for frame header"); + } + + std::vector payload(request.packet_size_bytes, + static_cast(sequence & 0xFF)); + writeLe32(payload, 0, kFrameMagic); + writeLe32(payload, 4, kFrameVersion); + writeLe64(payload, 8, request.run_id); + writeLe64(payload, 16, sequence); + writeLe64(payload, 24, static_cast(request.message_count)); + writeLe64(payload, 32, static_cast(request.packet_size_bytes)); + writeLe64(payload, 40, static_cast(request.packet_size_bytes)); + writeLe64(payload, 48, send_time_us); + return payload; +} + +inline std::optional +parseHeader(const std::vector &payload) { + if (payload.size() < kFrameHeaderBytes) { + return std::nullopt; + } + if (readLe32(payload, 0) != kFrameMagic || + readLe32(payload, 4) != kFrameVersion) { + return std::nullopt; + } + + FrameHeader header; + header.run_id = readLe64(payload, 8); + header.sequence = readLe64(payload, 16); + header.message_count = readLe64(payload, 24); + header.payload_size_bytes = readLe64(payload, 32); + header.packet_size_bytes = readLe64(payload, 40); + header.send_time_us = readLe64(payload, 48); + return header; +} + +inline std::string defaultScenarioName(std::size_t packet_size_bytes, + double rate_hz) { + 
std::ostringstream oss; + oss << humanBytes(packet_size_bytes); + oss << "_" << std::fixed << std::setprecision(0) << rate_hz << "Hz"; + std::string name = oss.str(); + std::replace(name.begin(), name.end(), '.', '_'); + return name; +} + +inline json toJson(const ScenarioRequest &request) { + return json{ + {"run_id", request.run_id}, + {"scenario_name", request.scenario_name}, + {"producer_identity", request.producer_identity}, + {"track_name", request.track_name}, + {"desired_rate_hz", request.desired_rate_hz}, + {"packet_size_bytes", request.packet_size_bytes}, + {"message_count", request.message_count}, + }; +} + +inline ScenarioRequest scenarioRequestFromJson(const json &value) { + ScenarioRequest request; + request.run_id = value.at("run_id").get(); + request.scenario_name = value.at("scenario_name").get(); + request.producer_identity = value.at("producer_identity").get(); + request.track_name = value.at("track_name").get(); + request.desired_rate_hz = value.at("desired_rate_hz").get(); + request.packet_size_bytes = value.at("packet_size_bytes").get(); + request.message_count = value.at("message_count").get(); + return request; +} + +inline json toJson(const ProducerStats &stats) { + return json{ + {"run_id", stats.run_id}, + {"scenario_name", stats.scenario_name}, + {"attempted_count", stats.attempted_count}, + {"enqueue_success_count", stats.enqueue_success_count}, + {"enqueue_failure_count", stats.enqueue_failure_count}, + {"attempted_bytes", stats.attempted_bytes}, + {"enqueued_bytes", stats.enqueued_bytes}, + {"first_send_time_us", stats.first_send_time_us}, + {"last_send_time_us", stats.last_send_time_us}, + }; +} + +inline ProducerStats producerStatsFromJson(const json &value) { + ProducerStats stats; + stats.run_id = value.at("run_id").get(); + stats.scenario_name = value.at("scenario_name").get(); + stats.attempted_count = value.at("attempted_count").get(); + stats.enqueue_success_count = + value.at("enqueue_success_count").get(); + 
stats.enqueue_failure_count = + value.at("enqueue_failure_count").get(); + stats.attempted_bytes = value.at("attempted_bytes").get(); + stats.enqueued_bytes = value.at("enqueued_bytes").get(); + stats.first_send_time_us = + value.at("first_send_time_us").get(); + stats.last_send_time_us = value.at("last_send_time_us").get(); + return stats; +} + +inline bool exceedsDataRateBudget(const ScenarioRequest &request) { + return static_cast(request.packet_size_bytes) * + request.desired_rate_hz > + kMaximumDataRateBytesPerSec; +} + +inline void validateScenario(const ScenarioRequest &request) { + if (request.message_count == 0) { + throw std::runtime_error("message_count must be greater than zero"); + } + if (request.desired_rate_hz < kMinimumRateHz || + request.desired_rate_hz > kMaximumRateHz) { + throw std::runtime_error("desired_rate_hz must be between 1 and 50000"); + } + if (request.packet_size_bytes < kMinimumPayloadBytes || + request.packet_size_bytes > kMaximumPayloadBytes) { + throw std::runtime_error( + "packet_size_bytes must be between 1 KiB and 256 MiB"); + } + if (exceedsDataRateBudget(request)) { + throw std::runtime_error("scenario exceeds data-rate budget of 10 Gbps"); + } +} + +inline std::vector +makeDefaultScenarioPlan(const std::string &producer_identity, + const std::string &track_name, + std::size_t messages_per_scenario) { + const std::vector payload_sizes = {1ull * 1024ull, + 4ull * 1024ull, + 16ull * 1024ull, + 64ull * 1024ull, + 128ull * 1024ull, + 256ull * 1024ull, + 512ull * 1024ull, + 1ull * 1024ull * 1024ull, + 2ull * 1024ull * 1024ull, + 4ull * 1024ull * 1024ull, + 16ull * 1024ull * 1024ull, + 64ull * 1024ull * 1024ull, + 256ull * 1024ull * 1024ull}; + const std::vector rates = {1.0, 5.0, 10.0, 25.0, 50.0, + 100.0, 200.0, 500.0, 1000.0, 5000.0, + 10000.0, 20000.0, 50000.0}; + + std::vector scenarios; + std::uint64_t run_id = 1; + + for (std::size_t payload_size : payload_sizes) { + for (double rate_hz : rates) { + ScenarioRequest request; 
+ request.run_id = run_id++; + request.scenario_name = defaultScenarioName(payload_size, rate_hz); + request.producer_identity = producer_identity; + request.track_name = track_name; + request.desired_rate_hz = rate_hz; + request.packet_size_bytes = payload_size; + request.message_count = messages_per_scenario; + if (exceedsDataRateBudget(request)) { + continue; + } + scenarios.push_back(std::move(request)); + } + } + + return scenarios; +} + +} // namespace data_track_throughput diff --git a/benchmarks/data_track_throughput/consumer.cpp b/benchmarks/data_track_throughput/consumer.cpp new file mode 100644 index 0000000..9d22e8a --- /dev/null +++ b/benchmarks/data_track_throughput/consumer.cpp @@ -0,0 +1,537 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "common.h" + +#include "livekit/livekit.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace livekit; +using namespace data_track_throughput; +using namespace std::chrono_literals; + +namespace { + +std::atomic g_running{true}; + +struct ConsumerOptions { + std::string url; + std::string token; + std::filesystem::path output_dir = "data_track_throughput_results"; + std::string track_name = kDefaultTrackName; + std::chrono::milliseconds quiet_period{2000}; + std::chrono::milliseconds max_finish_wait{15000}; +}; + +struct MessageObservation { + std::uint64_t sequence = 0; + std::size_t payload_bytes = 0; + std::uint64_t send_time_us = 0; + std::uint64_t arrival_time_us = 0; + bool duplicate = false; +}; + +struct RunState { + ScenarioRequest request; + std::vector observations; + std::unordered_set seen_sequences; + ProducerStats producer_stats; + std::size_t duplicate_count = 0; + std::uint64_t first_arrival_us = 0; + std::uint64_t last_arrival_us = 0; + bool producer_finished = false; +}; + +void handleSignal(int) { g_running.store(false); } + +void printUsage(const char *prog) { + std::cout << "Usage:\n" + << " " << prog << " \n" + << " " << prog << " --url --token \n\n" + << "Optional flags:\n" + << " --output-dir CSV output directory. Default: " + "data_track_throughput_results\n" + << " --track-name Data track name. Default: " + << kDefaultTrackName << "\n" + << " --quiet-period-ms No-new-message window before " + "finalizing. Default: 2000\n" + << " --max-finish-wait-ms Hard cap for run finalization. 
" + "Default: 15000\n\n" + << "Env fallbacks:\n" + << " LIVEKIT_URL, LIVEKIT_TOKEN" << std::endl; +} + +bool parseArgs(int argc, char *argv[], ConsumerOptions &options) { + auto readFlagValue = [&](const std::string &flag, int &index) -> std::string { + const std::string arg = argv[index]; + const std::string prefix = flag + "="; + if (arg == flag && index + 1 < argc) { + return argv[++index]; + } + if (arg.rfind(prefix, 0) == 0) { + return arg.substr(prefix.size()); + } + return {}; + }; + + for (int index = 1; index < argc; ++index) { + const std::string arg = argv[index]; + if (arg == "-h" || arg == "--help") { + return false; + } + + if (arg.rfind("--url", 0) == 0) { + options.url = readFlagValue("--url", index); + } else if (arg.rfind("--token", 0) == 0) { + options.token = readFlagValue("--token", index); + } else if (arg.rfind("--output-dir", 0) == 0) { + options.output_dir = readFlagValue("--output-dir", index); + } else if (arg.rfind("--track-name", 0) == 0) { + options.track_name = readFlagValue("--track-name", index); + } else if (arg.rfind("--quiet-period-ms", 0) == 0) { + options.quiet_period = std::chrono::milliseconds( + std::stoll(readFlagValue("--quiet-period-ms", index))); + } else if (arg.rfind("--max-finish-wait-ms", 0) == 0) { + options.max_finish_wait = std::chrono::milliseconds( + std::stoll(readFlagValue("--max-finish-wait-ms", index))); + } + } + + std::vector positional; + for (int index = 1; index < argc; ++index) { + const std::string arg = argv[index]; + if (arg.rfind("--", 0) == 0) { + if (arg.find('=') == std::string::npos && index + 1 < argc) { + ++index; + } + continue; + } + positional.push_back(arg); + } + + if (options.url.empty() && positional.size() >= 1) { + options.url = positional[0]; + } + if (options.token.empty() && positional.size() >= 2) { + options.token = positional[1]; + } + + if (options.url.empty()) { + options.url = getenvOrEmpty("LIVEKIT_URL"); + } + if (options.token.empty()) { + options.token = 
getenvOrEmpty("LIVEKIT_TOKEN"); + } + + return !(options.url.empty() || options.token.empty()); +} + +class ThroughputConsumer { +public: + explicit ThroughputConsumer(const ConsumerOptions &options) + : options_(options), + summary_csv_path_(options.output_dir / "throughput_summary.csv"), + message_csv_path_(options.output_dir / "throughput_messages.csv") { + std::filesystem::create_directories(options_.output_dir); + ensureCsvHeader( + summary_csv_path_, + "run_id,scenario_name,desired_rate_hz,packet_size_bytes," + "messages_requested,messages_attempted,messages_enqueued," + "messages_enqueue_failed,messages_received,messages_missed," + "duplicate_messages,attempted_bytes,enqueued_bytes,received_bytes," + "first_send_time_us,last_send_time_us," + "first_arrival_time_us,last_arrival_time_us"); + ensureCsvHeader(message_csv_path_, + "run_id,sequence,payload_bytes,send_time_us," + "arrival_time_us,is_duplicate"); + } + + ~ThroughputConsumer() { + if (room_ != nullptr && + callback_id_ != std::numeric_limits::max()) { + room_->removeOnDataFrameCallback(callback_id_); + } + } + + void setRoom(Room &room) { room_ = &room; } + + void ensureDataCallbackRegistered(const ScenarioRequest &request) { + if (room_ == nullptr) { + throw std::runtime_error("Room not attached to throughput consumer"); + } + + if (callback_id_ != std::numeric_limits::max() && + callback_producer_identity_ == request.producer_identity && + callback_track_name_ == request.track_name) { + return; + } + + if (callback_id_ != std::numeric_limits::max()) { + room_->removeOnDataFrameCallback(callback_id_); + callback_id_ = std::numeric_limits::max(); + } + + callback_id_ = room_->addOnDataFrameCallback( + request.producer_identity, request.track_name, + [this](const std::vector &payload, + std::optional user_timestamp) { + this->onDataFrame(payload, user_timestamp); + }); + if (callback_id_ == std::numeric_limits::max()) { + throw std::runtime_error("Failed to register data frame callback"); + } + + 
callback_producer_identity_ = request.producer_identity; + callback_track_name_ = request.track_name; + + std::cout << "Listening for throughput data track '" << request.track_name + << "' from " << request.producer_identity << std::endl; + } + + std::optional + handlePrepareRpc(const RpcInvocationData &invocation) { + const ScenarioRequest request = + scenarioRequestFromJson(json::parse(invocation.payload)); + validateScenario(request); + ensureDataCallbackRegistered(request); + + std::unique_lock lock(mutex_); + RunState state; + state.request = request; + runs_[request.run_id] = std::move(state); + + std::cout << "Prepared " << request.scenario_name + << " rate=" << request.desired_rate_hz << "Hz" + << " packet_size=" << request.packet_size_bytes + << " messages=" << request.message_count << std::endl; + + return json{{"ready", true}, + {"scenario_name", request.scenario_name}, + {"track_name", options_.track_name}} + .dump(); + } + + std::optional + handleFinishRpc(const RpcInvocationData &invocation) { + const auto payload = json::parse(invocation.payload); + const ProducerStats stats = producerStatsFromJson(payload.at("stats")); + + { + std::lock_guard lock(mutex_); + auto it = runs_.find(stats.run_id); + if (it == runs_.end()) { + return json{{"ready", false}, + {"message", "Unknown run_id in finish RPC"}, + {"run_id", stats.run_id}} + .dump(); + } + + it->second.producer_stats = stats; + it->second.producer_finished = true; + cv_.notify_all(); + } + + return finalizeRun(stats.run_id); + } + +private: + void onDataFrame(const std::vector &payload, + std::optional user_timestamp) { + const std::uint64_t arrival_time_us = nowSystemUs(); + const auto header = parseHeader(payload); + if (!header) { + std::cerr << "Ignored frame with invalid throughput header" << std::endl; + return; + } + + std::lock_guard lock(mutex_); + auto run_it = runs_.find(header->run_id); + if (run_it == runs_.end()) { + std::cerr << "Ignored frame for unknown run_id=" << header->run_id 
+ << std::endl; + return; + } + + RunState &run = run_it->second; + const bool duplicate = !run.seen_sequences.insert(header->sequence).second; + + if (duplicate) { + ++run.duplicate_count; + } + + if (payload.size() != run.request.packet_size_bytes) { + std::cerr << "WARN: payload size mismatch run_id=" << header->run_id + << " seq=" << header->sequence + << " expected=" << run.request.packet_size_bytes + << " got=" << payload.size() << std::endl; + } + if (header->payload_size_bytes != payload.size() || + header->message_count != run.request.message_count || + header->packet_size_bytes != run.request.packet_size_bytes) { + std::cerr << "WARN: header field mismatch run_id=" << header->run_id + << " seq=" << header->sequence << std::endl; + } + if (user_timestamp.has_value() && *user_timestamp != header->send_time_us) { + std::cerr << "WARN: timestamp mismatch run_id=" << header->run_id + << " seq=" << header->sequence << std::endl; + } + if (header->sequence >= run.request.message_count) { + std::cerr << "WARN: unexpected sequence run_id=" << header->run_id + << " seq=" << header->sequence + << " max=" << run.request.message_count << std::endl; + } + + if (run.first_arrival_us == 0) { + run.first_arrival_us = arrival_time_us; + } + run.last_arrival_us = arrival_time_us; + + MessageObservation observation; + observation.sequence = header->sequence; + observation.payload_bytes = payload.size(); + observation.send_time_us = user_timestamp.value_or(header->send_time_us); + observation.arrival_time_us = arrival_time_us; + observation.duplicate = duplicate; + run.observations.push_back(std::move(observation)); + } + + std::string finalizeRun(std::uint64_t run_id) { + RunState completed; + { + std::unique_lock lock(mutex_); + auto it = runs_.find(run_id); + if (it == runs_.end()) { + throw std::runtime_error("Run missing during finalization"); + } + + const auto deadline = + std::chrono::steady_clock::now() + options_.max_finish_wait; + while (g_running.load()) { + auto 
now = std::chrono::steady_clock::now(); + if (it->second.seen_sequences.size() >= + it->second.request.message_count) { + break; + } + if (now >= deadline) { + break; + } + if (it->second.producer_finished) { + const std::uint64_t reference_time_us = + it->second.last_arrival_us != 0 + ? it->second.last_arrival_us + : it->second.producer_stats.last_send_time_us; + if (reference_time_us != 0 && + nowSystemUs() > + reference_time_us + static_cast( + options_.quiet_period.count()) * + 1000ull) { + break; + } + } + + cv_.wait_for(lock, 100ms); + it = runs_.find(run_id); + if (it == runs_.end()) { + throw std::runtime_error("Run disappeared during finalization"); + } + } + + completed = std::move(it->second); + runs_.erase(it); + } + + auto summary = buildSummaryJson(completed); + appendMessageRows(completed); + appendSummaryRow(summary); + + std::cout << "Wrote scenario " << completed.request.scenario_name << " to " + << summary_csv_path_.string() << " and " + << message_csv_path_.string() << std::endl; + return summary.dump(); + } + + json buildSummaryJson(const RunState &run) const { + std::size_t unique_count = 0; + std::uint64_t received_bytes = 0; + for (const auto &obs : run.observations) { + if (!obs.duplicate) { + ++unique_count; + received_bytes += obs.payload_bytes; + } + } + + const std::size_t messages_missed = + run.request.message_count > unique_count + ? 
run.request.message_count - unique_count + : 0; + + json summary; + summary["run_id"] = run.request.run_id; + summary["scenario_name"] = run.request.scenario_name; + summary["desired_rate_hz"] = run.request.desired_rate_hz; + summary["packet_size_bytes"] = run.request.packet_size_bytes; + summary["messages_requested"] = run.request.message_count; + summary["messages_attempted"] = run.producer_stats.attempted_count; + summary["messages_enqueued"] = run.producer_stats.enqueue_success_count; + summary["messages_enqueue_failed"] = + run.producer_stats.enqueue_failure_count; + summary["messages_received"] = unique_count; + summary["messages_missed"] = messages_missed; + summary["duplicate_messages"] = run.duplicate_count; + summary["attempted_bytes"] = run.producer_stats.attempted_bytes; + summary["enqueued_bytes"] = run.producer_stats.enqueued_bytes; + summary["received_bytes"] = received_bytes; + summary["first_send_time_us"] = run.producer_stats.first_send_time_us; + summary["last_send_time_us"] = run.producer_stats.last_send_time_us; + summary["first_arrival_time_us"] = run.first_arrival_us; + summary["last_arrival_time_us"] = run.last_arrival_us; + return summary; + } + + void appendSummaryRow(const json &summary) const { + std::ofstream out(summary_csv_path_, std::ios::app); + if (!out) { + throw std::runtime_error("Failed to open summary CSV for append"); + } + + out << summary.at("run_id").get() << ',' + << csvEscape(summary.at("scenario_name").get()) << ',' + << summary.at("desired_rate_hz").get() << ',' + << summary.at("packet_size_bytes").get() << ',' + << summary.at("messages_requested").get() << ',' + << summary.at("messages_attempted").get() << ',' + << summary.at("messages_enqueued").get() << ',' + << summary.at("messages_enqueue_failed").get() << ',' + << summary.at("messages_received").get() << ',' + << summary.at("messages_missed").get() << ',' + << summary.at("duplicate_messages").get() << ',' + << summary.at("attempted_bytes").get() << ',' + << 
summary.at("enqueued_bytes").get() << ',' + << summary.at("received_bytes").get() << ',' + << summary.at("first_send_time_us").get() << ',' + << summary.at("last_send_time_us").get() << ',' + << summary.at("first_arrival_time_us").get() << ',' + << summary.at("last_arrival_time_us").get() << '\n'; + } + + void appendMessageRows(const RunState &run) const { + std::vector observations = run.observations; + std::sort(observations.begin(), observations.end(), + [](const MessageObservation &lhs, const MessageObservation &rhs) { + return lhs.arrival_time_us < rhs.arrival_time_us; + }); + + std::ofstream out(message_csv_path_, std::ios::app); + if (!out) { + throw std::runtime_error("Failed to open message CSV for append"); + } + + for (const auto &message : observations) { + out << run.request.run_id << ',' << message.sequence << ',' + << message.payload_bytes << ',' << message.send_time_us << ',' + << message.arrival_time_us << ',' << (message.duplicate ? 1 : 0) + << '\n'; + } + } + + ConsumerOptions options_; + std::filesystem::path summary_csv_path_; + std::filesystem::path message_csv_path_; + mutable std::mutex mutex_; + std::condition_variable cv_; + Room *room_ = nullptr; + DataFrameCallbackId callback_id_ = + std::numeric_limits::max(); + std::string callback_producer_identity_; + std::string callback_track_name_; + std::unordered_map runs_; +}; + +} // namespace + +int main(int argc, char *argv[]) { + ConsumerOptions options; + if (!parseArgs(argc, argv, options)) { + printUsage(argv[0]); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + + try { + ThroughputConsumer consumer(options); + Room room; + consumer.setRoom(room); + + RoomOptions room_options; + room_options.auto_subscribe = true; + room_options.dynacast = false; + + std::cout << "Connecting to " << options.url << std::endl; + if (!room.Connect(options.url, 
options.token, room_options)) { + throw std::runtime_error("Failed to connect to LiveKit room"); + } + + auto *local_participant = room.localParticipant(); + if (local_participant == nullptr) { + throw std::runtime_error("Local participant unavailable after connect"); + } + + local_participant->registerRpcMethod( + kPrepareRpcMethod, [&consumer](const RpcInvocationData &invocation) { + return consumer.handlePrepareRpc(invocation); + }); + local_participant->registerRpcMethod( + kFinishRpcMethod, [&consumer](const RpcInvocationData &invocation) { + return consumer.handleFinishRpc(invocation); + }); + + std::cout << "Ready. CSV output directory: " + << std::filesystem::absolute(options.output_dir).string() + << std::endl; + + while (g_running.load()) { + std::this_thread::sleep_for(250ms); + } + } catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + livekit::shutdown(); + return 1; + } + + livekit::shutdown(); + return 0; +} diff --git a/benchmarks/data_track_throughput/producer.cpp b/benchmarks/data_track_throughput/producer.cpp new file mode 100644 index 0000000..3845d67 --- /dev/null +++ b/benchmarks/data_track_throughput/producer.cpp @@ -0,0 +1,347 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "common.h" + +#include "livekit/livekit.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace livekit; +using namespace data_track_throughput; + +namespace { + +std::atomic g_running{true}; + +struct ProducerOptions { + std::string url; + std::string token; + std::string consumer_identity; + std::string track_name = kDefaultTrackName; + std::size_t messages_per_scenario = 30; + std::optional single_rate_hz; + std::optional single_packet_size_bytes; + std::optional single_message_count; +}; + +void handleSignal(int) { g_running.store(false); } + +void printUsage(const char *prog) { + std::cout + << "Usage:\n" + << " " << prog << " [--consumer ]\n" + << " " << prog + << " --url --token [--consumer ]\n\n" + << "Optional flags:\n" + << " --track-name Data track name. Default: " + << kDefaultTrackName << "\n" + << " --messages-per-scenario Default sweep message count. " + "Default: 30\n" + << " --rate-hz Run a single scenario instead of " + "the full sweep\n" + << " --packet-size Single-scenario packet size, e.g. 
" + "1mb\n" + << " --num-msgs Single-scenario message count\n\n" + << "Env fallbacks:\n" + << " LIVEKIT_URL, LIVEKIT_TOKEN" << std::endl; +} + +bool parseArgs(int argc, char *argv[], ProducerOptions &options) { + auto readFlagValue = [&](const std::string &flag, int &index) -> std::string { + const std::string arg = argv[index]; + const std::string prefix = flag + "="; + if (arg == flag && index + 1 < argc) { + return argv[++index]; + } + if (arg.rfind(prefix, 0) == 0) { + return arg.substr(prefix.size()); + } + return {}; + }; + + for (int index = 1; index < argc; ++index) { + const std::string arg = argv[index]; + if (arg == "-h" || arg == "--help") { + return false; + } + + if (arg.rfind("--url", 0) == 0) { + options.url = readFlagValue("--url", index); + } else if (arg.rfind("--token", 0) == 0) { + options.token = readFlagValue("--token", index); + } else if (arg.rfind("--consumer", 0) == 0) { + options.consumer_identity = readFlagValue("--consumer", index); + } else if (arg.rfind("--track-name", 0) == 0) { + options.track_name = readFlagValue("--track-name", index); + } else if (arg.rfind("--messages-per-scenario", 0) == 0) { + options.messages_per_scenario = static_cast( + std::stoull(readFlagValue("--messages-per-scenario", index))); + } else if (arg.rfind("--rate-hz", 0) == 0) { + options.single_rate_hz = std::stod(readFlagValue("--rate-hz", index)); + } else if (arg.rfind("--packet-size", 0) == 0) { + options.single_packet_size_bytes = + parseByteSize(readFlagValue("--packet-size", index)); + } else if (arg.rfind("--num-msgs", 0) == 0) { + options.single_message_count = static_cast( + std::stoull(readFlagValue("--num-msgs", index))); + } + } + + std::vector positional; + for (int index = 1; index < argc; ++index) { + const std::string arg = argv[index]; + if (arg.rfind("--", 0) == 0) { + if (arg.find('=') == std::string::npos && index + 1 < argc) { + ++index; + } + continue; + } + positional.push_back(arg); + } + + if (options.url.empty() && 
positional.size() >= 1) { + options.url = positional[0]; + } + if (options.token.empty() && positional.size() >= 2) { + options.token = positional[1]; + } + + if (options.url.empty()) { + options.url = getenvOrEmpty("LIVEKIT_URL"); + } + if (options.token.empty()) { + options.token = getenvOrEmpty("LIVEKIT_TOKEN"); + } + + const bool single_mode_requested = + options.single_rate_hz.has_value() || + options.single_packet_size_bytes.has_value() || + options.single_message_count.has_value(); + const bool single_mode_complete = + options.single_rate_hz.has_value() && + options.single_packet_size_bytes.has_value() && + options.single_message_count.has_value(); + if (single_mode_requested && !single_mode_complete) { + throw std::runtime_error("Single-scenario mode requires --rate-hz, " + "--packet-size, and --num-msgs"); + } + + return !(options.url.empty() || options.token.empty()); +} + +std::string waitForConsumerIdentity(Room &room, + const std::string &requested_identity, + std::chrono::seconds timeout) { + const auto deadline = std::chrono::steady_clock::now() + timeout; + while (g_running.load() && std::chrono::steady_clock::now() < deadline) { + if (!requested_identity.empty()) { + if (room.remoteParticipant(requested_identity) != nullptr) { + return requested_identity; + } + } else { + const auto participants = room.remoteParticipants(); + if (participants.size() == 1 && participants.front() != nullptr) { + return participants.front()->identity(); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + throw std::runtime_error( + requested_identity.empty() + ? 
"Timed out waiting for exactly one remote consumer participant" + : "Timed out waiting for consumer identity: " + requested_identity); +} + +std::vector +buildScenarioPlan(const ProducerOptions &options, + const std::string &producer_identity) { + if (options.single_rate_hz.has_value()) { + ScenarioRequest request; + request.run_id = 1; + request.scenario_name = defaultScenarioName( + *options.single_packet_size_bytes, *options.single_rate_hz); + request.producer_identity = producer_identity; + request.track_name = options.track_name; + request.desired_rate_hz = *options.single_rate_hz; + request.packet_size_bytes = *options.single_packet_size_bytes; + request.message_count = *options.single_message_count; + validateScenario(request); + return {request}; + } + + auto scenarios = makeDefaultScenarioPlan( + producer_identity, options.track_name, options.messages_per_scenario); + for (const auto &scenario : scenarios) { + validateScenario(scenario); + } + return scenarios; +} + +ProducerStats runScenario(LocalDataTrack &track, + const ScenarioRequest &request) { + ProducerStats stats; + stats.run_id = request.run_id; + stats.scenario_name = request.scenario_name; + + const auto frame_interval = + std::chrono::duration(1.0 / request.desired_rate_hz); + auto next_send = std::chrono::steady_clock::now(); + + for (std::size_t index = 0; index < request.message_count && g_running.load(); + ++index) { + const std::uint64_t send_time_us = nowSystemUs(); + + if (stats.first_send_time_us == 0) { + stats.first_send_time_us = send_time_us; + } + stats.last_send_time_us = send_time_us; + ++stats.attempted_count; + stats.attempted_bytes += request.packet_size_bytes; + + auto payload = + makePayload(request, static_cast(index), send_time_us); + auto push_result = track.tryPush(std::move(payload), send_time_us); + if (push_result) { + ++stats.enqueue_success_count; + stats.enqueued_bytes += request.packet_size_bytes; + } else { + ++stats.enqueue_failure_count; + std::cerr << 
"tryPush failed for scenario " << request.scenario_name + << " seq=" << index << " reason=" << push_result.error().message + << std::endl; + } + + next_send += + std::chrono::duration_cast( + frame_interval); + std::this_thread::sleep_until(next_send); + } + + return stats; +} + +void printScenarioSummary(const json &summary) { + std::cout << "Summary " << summary.value("scenario_name", "") + << ": received=" << summary.value("messages_received", 0) << "/" + << summary.value("messages_requested", 0) + << " missed=" << summary.value("messages_missed", 0) + << " received_bytes=" << summary.value("received_bytes", 0) + << std::endl; +} + +} // namespace + +int main(int argc, char *argv[]) { + ProducerOptions options; + try { + if (!parseArgs(argc, argv, options)) { + printUsage(argv[0]); + return 1; + } + } catch (const std::exception &e) { + std::cerr << "Argument error: " << e.what() << std::endl; + printUsage(argv[0]); + return 1; + } + + std::signal(SIGINT, handleSignal); +#ifdef SIGTERM + std::signal(SIGTERM, handleSignal); +#endif + + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + + try { + Room room; + RoomOptions room_options; + room_options.auto_subscribe = true; + room_options.dynacast = false; + + std::cout << "Connecting to " << options.url << std::endl; + if (!room.Connect(options.url, options.token, room_options)) { + throw std::runtime_error("Failed to connect to LiveKit room"); + } + + auto *local_participant = room.localParticipant(); + if (local_participant == nullptr) { + throw std::runtime_error("Local participant unavailable after connect"); + } + + const auto publish_result = + local_participant->publishDataTrack(options.track_name); + if (!publish_result) { + throw std::runtime_error("Failed to publish data track: " + + publish_result.error().message); + } + auto track = publish_result.value(); + + const std::string consumer_identity = waitForConsumerIdentity( + room, options.consumer_identity, 
std::chrono::seconds(30)); + const auto scenarios = + buildScenarioPlan(options, local_participant->identity()); + + std::cout << "Target consumer: " << consumer_identity << std::endl; + std::cout << "Running " << scenarios.size() << " scenario(s)" << std::endl; + + for (const auto &scenario : scenarios) { + if (!g_running.load()) { + break; + } + + std::cout << "Preparing " << scenario.scenario_name + << " rate=" << scenario.desired_rate_hz << "Hz" + << " packet_size=" << humanBytes(scenario.packet_size_bytes) + << " messages=" << scenario.message_count << std::endl; + + const auto prepare_payload = toJson(scenario).dump(); + const std::string prepare_response = local_participant->performRpc( + consumer_identity, kPrepareRpcMethod, prepare_payload, 30.0); + const json prepare_json = json::parse(prepare_response); + if (!prepare_json.value("ready", false)) { + throw std::runtime_error( + "Consumer rejected scenario " + scenario.scenario_name + ": " + + prepare_json.value("message", "unknown error")); + } + + const ProducerStats stats = runScenario(*track, scenario); + json finish_payload; + finish_payload["stats"] = toJson(stats); + const std::string finish_response = local_participant->performRpc( + consumer_identity, kFinishRpcMethod, finish_payload.dump(), 60.0); + const json summary = json::parse(finish_response); + printScenarioSummary(summary); + } + + track->unpublishDataTrack(); + } catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + livekit::shutdown(); + return 1; + } + + livekit::shutdown(); + return 0; +} diff --git a/benchmarks/data_track_throughput/scripts/plot_throughput.py b/benchmarks/data_track_throughput/scripts/plot_throughput.py new file mode 100644 index 0000000..066c4d7 --- /dev/null +++ b/benchmarks/data_track_throughput/scripts/plot_throughput.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python3 + +# Copyright 2026 LiveKit, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import csv +import math +from pathlib import Path +from typing import Dict, List, Optional, Sequence, Tuple + +import matplotlib + +matplotlib.use("Agg") +import matplotlib.pyplot as plt +from matplotlib import colors +from matplotlib.ticker import FormatStrFormatter + + +SUMMARY_FILE = "throughput_summary.csv" +MESSAGES_FILE = "throughput_messages.csv" + +INT_FIELDS = { + "run_id", + "packet_size_bytes", + "messages_requested", + "messages_attempted", + "messages_enqueued", + "messages_enqueue_failed", + "messages_received", + "messages_missed", + "duplicate_messages", + "attempted_bytes", + "enqueued_bytes", + "received_bytes", + "first_send_time_us", + "last_send_time_us", + "first_arrival_time_us", + "last_arrival_time_us", + "sequence", + "payload_bytes", + "send_time_us", + "arrival_time_us", + "is_duplicate", +} + +FLOAT_FIELDS = { + "desired_rate_hz", +} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Generate throughput benchmark plots from data_track_throughput CSV output." + ) + parser.add_argument( + "path", + help="Path to the benchmark output directory, throughput_summary.csv, or throughput_messages.csv", + ) + parser.add_argument( + "--output-dir", + help="Directory to write plots into. 
Default: /plots", + ) + return parser.parse_args() + + +def resolve_paths(path_arg: str) -> Tuple[Path, Path, Path]: + input_path = Path(path_arg).expanduser().resolve() + if input_path.is_dir(): + csv_dir = input_path + elif input_path.is_file(): + csv_dir = input_path.parent + else: + raise FileNotFoundError(f"Input path does not exist: {input_path}") + + summary_path = csv_dir / SUMMARY_FILE + messages_path = csv_dir / MESSAGES_FILE + if not summary_path.exists(): + raise FileNotFoundError(f"Missing required summary CSV: {summary_path}") + return csv_dir, summary_path, messages_path + + +def convert_value(key: str, value: str): + if value == "": + return None + if key in INT_FIELDS: + return int(value) + if key in FLOAT_FIELDS: + return float(value) + return value + + +def load_csv_rows(path: Path) -> List[Dict[str, object]]: + with path.open(newline="") as handle: + reader = csv.DictReader(handle) + return [{key: convert_value(key, value) for key, value in row.items()} for row in reader] + + +def payload_label(payload_bytes: int) -> str: + units = ["B", "KiB", "MiB", "GiB"] + value = float(payload_bytes) + unit_index = 0 + while value >= 1024.0 and unit_index + 1 < len(units): + value /= 1024.0 + unit_index += 1 + formatted = f"{value:g}" + return f"{formatted} {units[unit_index]}" + + +def rate_label(rate_hz: float) -> str: + if rate_hz >= 1000.0: + value = rate_hz / 1000.0 + return f"{value:g}k Hz" + return f"{rate_hz:g} Hz" + + +def throughput_label(mbps: float) -> str: + if mbps >= 1000.0: + return f"{mbps / 1000.0:.2f} Gbps" + if mbps >= 100.0: + return f"{mbps:.0f} Mbps" + if mbps >= 10.0: + return f"{mbps:.1f} Mbps" + return f"{mbps:.2f} Mbps" + + +def save_plot(fig: plt.Figure, output_dir: Path, filename: str) -> None: + output_dir.mkdir(parents=True, exist_ok=True) + fig.tight_layout() + fig.savefig(output_dir / filename, dpi=180, bbox_inches="tight") + plt.close(fig) + + +# --------------------------------------------------------------------------- +# 
Derived-value helpers (computed from raw CSV fields at analysis time) +# --------------------------------------------------------------------------- + +def safe_duration_s(first_us: int, last_us: int) -> float: + if first_us == 0 or last_us <= first_us: + return 0.0 + return (last_us - first_us) / 1_000_000.0 + + +def throughput_mbps(total_bytes: int, duration_s: float) -> float: + if duration_s <= 0.0: + return 0.0 + return (total_bytes * 8.0) / duration_s / 1_000_000.0 + + +def percentile(values: List[float], p: float) -> float: + if not values: + return 0.0 + s = sorted(values) + idx = (p / 100.0) * (len(s) - 1) + lower = int(math.floor(idx)) + upper = int(math.ceil(idx)) + if lower == upper: + return s[lower] + frac = idx - lower + return s[lower] + (s[upper] - s[lower]) * frac + + +def enrich_summary_row(row: Dict[str, object]) -> Dict[str, object]: + """Add derived fields to a summary row from its raw fields.""" + msg_req = row["messages_requested"] + rate_hz = row["desired_rate_hz"] + expected_duration = msg_req / rate_hz if rate_hz > 0 else 0.0 + + send_duration = safe_duration_s(row["first_send_time_us"], row["last_send_time_us"]) + receive_window = safe_duration_s(row["first_arrival_time_us"], row["last_arrival_time_us"]) + + row["expected_duration_s"] = expected_duration + row["send_duration_s"] = send_duration + row["receive_window_s"] = receive_window + row["delivery_ratio"] = row["messages_received"] / msg_req if msg_req > 0 else 0.0 + row["expected_throughput_mbps"] = throughput_mbps(row["attempted_bytes"], expected_duration) + row["enqueue_throughput_mbps"] = throughput_mbps(row["enqueued_bytes"], send_duration) + row["actual_throughput_mbps"] = throughput_mbps(row["received_bytes"], receive_window) + received = row["messages_received"] + row["average_receive_rate_hz"] = (received - 1) / receive_window if received > 1 and receive_window > 0 else 0.0 + return row + + +def enrich_message_rows(rows: Sequence[Dict[str, object]]) -> List[Dict[str, 
object]]: + """Add latency_ms and interarrival_ms to per-message rows (grouped by run_id).""" + sorted_rows = sorted(rows, key=lambda r: (r["run_id"], r["arrival_time_us"])) + prev_arrival_by_run: Dict[int, int] = {} + enriched = [] + for row in sorted_rows: + rid = row["run_id"] + send_us = row["send_time_us"] + arrival_us = row["arrival_time_us"] + row["latency_ms"] = (arrival_us - send_us) / 1000.0 if send_us > 0 and arrival_us >= send_us else None + prev = prev_arrival_by_run.get(rid) + row["interarrival_ms"] = (arrival_us - prev) / 1000.0 if prev is not None and arrival_us >= prev else None + prev_arrival_by_run[rid] = arrival_us + enriched.append(row) + return enriched + + +def compute_latency_stats(message_rows: Sequence[Dict[str, object]]) -> Dict[int, Dict[str, float]]: + """Compute per-run latency statistics from per-message rows.""" + by_run: Dict[int, List[float]] = {} + for row in message_rows: + if row.get("is_duplicate", 0) == 1: + continue + lat = row.get("latency_ms") + if lat is not None: + by_run.setdefault(row["run_id"], []).append(lat) + + stats: Dict[int, Dict[str, float]] = {} + for rid, lats in by_run.items(): + stats[rid] = { + "average_latency_ms": sum(lats) / len(lats) if lats else 0.0, + "p50_latency_ms": percentile(lats, 50.0), + "p95_latency_ms": percentile(lats, 95.0), + "max_latency_ms": max(lats) if lats else 0.0, + } + return stats + + +# --------------------------------------------------------------------------- +# Plots +# --------------------------------------------------------------------------- + +def create_expected_vs_actual_plot(summary_rows: Sequence[Dict[str, object]], output_dir: Path) -> None: + fig, ax = plt.subplots(figsize=(9, 6)) + expected = [row["expected_throughput_mbps"] for row in summary_rows] + actual = [row["actual_throughput_mbps"] for row in summary_rows] + rates = [row["desired_rate_hz"] for row in summary_rows] + sizes = [row["packet_size_bytes"] for row in summary_rows] + point_sizes = [40 + 
(math.log10(max(1, size)) * 14) for size in sizes] + + scatter = ax.scatter(expected, actual, c=rates, s=point_sizes, cmap="viridis", alpha=0.85) + max_value = max(expected + actual) if expected or actual else 1.0 + ax.plot([0, max_value], [0, max_value], linestyle="--", color="gray", linewidth=1.0, label="ideal") + use_gbps = max_value >= 1000.0 + if use_gbps: + ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda v, _: f"{v / 1000:.1f}")) + ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda v, _: f"{v / 1000:.1f}")) + ax.set_title("Expected vs Actual Throughput") + unit = "Gbps" if use_gbps else "Mbps" + ax.set_xlabel(f"Expected throughput ({unit})") + ax.set_ylabel(f"Actual receive throughput ({unit})") + ax.grid(alpha=0.25) + ax.legend() + colorbar = fig.colorbar(scatter, ax=ax) + colorbar.set_label("Desired rate (Hz)") + tick_rates = sorted({r for r in rates}) + if len(tick_rates) > 8: + tick_rates = tick_rates[::max(1, len(tick_rates) // 8)] + colorbar.set_ticks(tick_rates) + colorbar.set_ticklabels([rate_label(r) for r in tick_rates]) + save_plot(fig, output_dir, "expected_vs_actual_throughput.png") + + +def create_drops_vs_throughput_plot(summary_rows: Sequence[Dict[str, object]], output_dir: Path) -> None: + fig, ax = plt.subplots(figsize=(9, 6)) + x_values = [row["expected_throughput_mbps"] for row in summary_rows] + y_values = [row["messages_missed"] for row in summary_rows] + colors_data = [row["packet_size_bytes"] for row in summary_rows] + + vmin = max(1, min(colors_data)) + vmax = max(colors_data) if colors_data else 1 + scatter = ax.scatter( + x_values, + y_values, + c=colors_data, + cmap="plasma", + norm=colors.LogNorm(vmin=vmin, vmax=vmax), + s=85, + alpha=0.85, + ) + max_expected = max(x_values) if x_values else 1.0 + use_gbps = max_expected >= 1000.0 + if use_gbps: + ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda v, _: f"{v / 1000:.1f}")) + ax.set_title("Dropped Messages vs Expected Throughput") + unit = "Gbps" if use_gbps else "Mbps" 
+ ax.set_xlabel(f"Expected throughput ({unit})") + ax.set_ylabel("Dropped / missed messages") + ax.grid(alpha=0.25) + colorbar = fig.colorbar(scatter, ax=ax) + colorbar.set_label("Payload size (bytes)") + tick_values = sorted({v for v in colors_data if vmin <= v <= vmax}) + if len(tick_values) > 10: + tick_values = tick_values[::max(1, len(tick_values) // 8)] + colorbar.set_ticks(tick_values) + colorbar.set_ticklabels([payload_label(int(v)) for v in tick_values]) + save_plot(fig, output_dir, "dropped_messages_vs_expected_throughput.png") + + +def build_heatmap( + rows: Sequence[Dict[str, object]], + value_key: str, + title: str, + colorbar_label: str, + filename: str, + output_dir: Path, + annotation_fn=None, + colorbar_fmt: Optional[str] = None, +) -> None: + if not rows: + return + + rates = sorted({row["desired_rate_hz"] for row in rows}) + payloads = sorted({row["packet_size_bytes"] for row in rows}) + grid = [[math.nan for _ in rates] for _ in payloads] + + grouped: Dict[Tuple[int, float], List[float]] = {} + for row in rows: + key = (row["packet_size_bytes"], row["desired_rate_hz"]) + grouped.setdefault(key, []).append(float(row[value_key])) + + for payload_idx, payload in enumerate(payloads): + for rate_idx, rate in enumerate(rates): + values = grouped.get((payload, rate)) + if values: + grid[payload_idx][rate_idx] = sum(values) / len(values) + + fig, ax = plt.subplots(figsize=(10, 6)) + image = ax.imshow(grid, aspect="auto", origin="lower", cmap="viridis") + ax.set_title(title) + ax.set_xlabel("Desired rate (Hz)") + ax.set_ylabel("Payload size") + ax.set_xticks(range(len(rates))) + ax.set_xticklabels([rate_label(rate) for rate in rates], rotation=45, ha="right") + ax.set_yticks(range(len(payloads))) + ax.set_yticklabels([payload_label(payload) for payload in payloads]) + + for payload_idx, payload in enumerate(payloads): + for rate_idx, rate in enumerate(rates): + value = grid[payload_idx][rate_idx] + if math.isnan(value): + continue + label = 
def _rows_with_latency_stats(
    summary_rows: Sequence[Dict[str, object]],
    latency_stats: Dict[int, Dict[str, float]],
) -> List[Dict[str, object]]:
    """Join per-run latency stats onto summary rows; runs without stats are dropped."""
    merged: List[Dict[str, object]] = []
    for row in summary_rows:
        stats = latency_stats.get(row["run_id"])
        if stats:
            merged.append({**row, **stats})
    return merged


def create_delivery_ratio_plot(summary_rows: Sequence[Dict[str, object]], output_dir: Path) -> None:
    """Heatmap of delivery ratio per (payload size, desired rate) cell."""
    build_heatmap(
        summary_rows,
        "delivery_ratio",
        "Delivery Ratio Heatmap",
        "Delivery ratio",
        "delivery_ratio_heatmap.png",
        output_dir,
        annotation_fn=lambda v: f"{v:.2f}",
        colorbar_fmt="%.2f",
    )


def create_actual_throughput_plot(summary_rows: Sequence[Dict[str, object]], output_dir: Path) -> None:
    """Heatmap of measured receive throughput per (payload size, desired rate) cell."""
    build_heatmap(
        summary_rows,
        "actual_throughput_mbps",
        "Actual Throughput Heatmap",
        "Actual receive throughput (Mbps)",
        "actual_throughput_heatmap.png",
        output_dir,
        annotation_fn=throughput_label,
    )


def create_latency_heatmap(
    summary_rows: Sequence[Dict[str, object]],
    latency_stats: Dict[int, Dict[str, float]],
    output_dir: Path,
) -> None:
    """Heatmap of per-run P95 latency; runs lacking latency stats are omitted."""
    build_heatmap(
        _rows_with_latency_stats(summary_rows, latency_stats),
        "p95_latency_ms",
        "P95 Latency Heatmap",
        "P95 latency (ms)",
        "p95_latency_heatmap.png",
        output_dir,
    )


def create_median_latency_heatmap(
    summary_rows: Sequence[Dict[str, object]],
    latency_stats: Dict[int, Dict[str, float]],
    output_dir: Path,
) -> None:
    """Heatmap of per-run median (P50) latency; runs lacking stats are omitted."""
    build_heatmap(
        _rows_with_latency_stats(summary_rows, latency_stats),
        "p50_latency_ms",
        "Median Latency Heatmap",
        "Median latency (ms)",
        "p50_latency_heatmap.png",
        output_dir,
    )
def create_message_latency_histogram(message_rows: Sequence[Dict[str, object]], output_dir: Path) -> None:
    """Histogram of one-way latency over unique (non-duplicate) received messages."""
    latencies = [
        row["latency_ms"]
        for row in message_rows
        if row.get("latency_ms") is not None and row.get("is_duplicate", 0) != 1
    ]
    if not latencies:
        return

    # Bin count grows with sqrt(sample size), clamped to [10, 60].
    bin_count = min(60, max(10, int(math.sqrt(len(latencies)))))
    fig, ax = plt.subplots(figsize=(9, 6))
    ax.hist(latencies, bins=bin_count, color="#3c7dd9", alpha=0.9)
    ax.set_title("Message Latency Distribution")
    ax.set_xlabel("Latency (ms)")
    ax.set_ylabel("Message count")
    ax.grid(alpha=0.2)
    save_plot(fig, output_dir, "message_latency_histogram.png")


def create_message_interarrival_plot(message_rows: Sequence[Dict[str, object]], output_dir: Path) -> None:
    """Line plot of inter-arrival gaps across all received non-duplicate messages.

    Messages are ordered by (run_id, arrival_time_us); the x axis is the
    resulting message index, not wall-clock time.
    """
    usable = [
        row
        for row in message_rows
        if row.get("interarrival_ms") is not None and row.get("is_duplicate", 0) != 1
    ]
    if not usable:
        return

    ordered = sorted(usable, key=lambda row: (row["run_id"], row["arrival_time_us"]))
    gaps = [row["interarrival_ms"] for row in ordered]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(range(len(ordered)), gaps, linewidth=1.0, alpha=0.9)
    ax.set_title("Inter-arrival Time Across Received Messages")
    ax.set_xlabel("Received message index")
    ax.set_ylabel("Inter-arrival time (ms)")
    ax.grid(alpha=0.2)
    save_plot(fig, output_dir, "message_interarrival_series.png")


def generate_plots(
    summary_rows: Sequence[Dict[str, object]],
    message_rows: Sequence[Dict[str, object]],
    latency_stats: Dict[int, Dict[str, float]],
    output_dir: Path,
) -> None:
    """Emit the full set of summary plots, plus message-level plots when available."""
    create_expected_vs_actual_plot(summary_rows, output_dir)
    create_drops_vs_throughput_plot(summary_rows, output_dir)
    create_actual_throughput_plot(summary_rows, output_dir)
    create_delivery_ratio_plot(summary_rows, output_dir)
    create_latency_heatmap(summary_rows, latency_stats, output_dir)
    create_median_latency_heatmap(summary_rows, latency_stats, output_dir)
    # Message-level plots require the (optional) per-message CSV.
    if message_rows:
        create_message_latency_histogram(message_rows, output_dir)
        create_message_interarrival_plot(message_rows, output_dir)
def main() -> int:
    """Entry point: load benchmark CSVs, derive latency stats, and write all plots.

    Returns 0 on success; raises SystemExit when the summary CSV has no rows.
    """
    args = parse_args()
    csv_dir, summary_path, messages_path = resolve_paths(args.path)

    # Default plot destination is a "plots" folder next to the CSVs.
    if args.output_dir:
        output_dir = Path(args.output_dir).expanduser().resolve()
    else:
        output_dir = csv_dir / "plots"

    summary_rows = load_csv_rows(summary_path)
    if not summary_rows:
        raise SystemExit(f"No rows found in {summary_path}")
    summary_rows = [enrich_summary_row(row) for row in summary_rows]

    # The per-message CSV is optional; latency stats exist only when it does.
    message_rows = load_csv_rows(messages_path) if messages_path.exists() else []
    message_rows = enrich_message_rows(message_rows) if message_rows else []
    latency_stats = compute_latency_stats(message_rows) if message_rows else {}

    generate_plots(summary_rows, message_rows, latency_stats, output_dir)

    print(f"Wrote plots to {output_dir}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())