Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
24fd516
Revert address_hex
j0sh May 15, 2026
2c1dbae
Add LiveRunner registration
j0sh May 7, 2026
33135f6
Add trickle create / remove API
j0sh May 7, 2026
4a49cad
Add decode / demux callbacks
j0sh May 8, 2026
bf1b633
Update to session-based runner auth
j0sh May 11, 2026
a367886
ruff
j0sh May 11, 2026
d442b35
Add output bytes callback, improve callback cleanup
j0sh May 12, 2026
73e65d2
Handle orchestrator restarts in heartbeat
j0sh May 12, 2026
c932843
Refactor runner trickle handling
j0sh May 12, 2026
2adc861
Add methods to start / stop persistent runner sessions
j0sh May 12, 2026
785f165
Async-native json requests
j0sh May 13, 2026
2220cd2
Add runner discovery
j0sh May 13, 2026
a3dfb04
Get rid of code in orchestrator.py
j0sh May 13, 2026
8c39ef9
Add runner selection
j0sh May 13, 2026
4e5edb5
Add echo and text examples
j0sh May 14, 2026
336373f
Add live runner payments.
j0sh May 14, 2026
1a4a024
Rename session_url to runner_url where appropriate
j0sh May 14, 2026
3e27b15
live_runner: unify session and single-shot runner calls
j0sh May 15, 2026
effddc9
Thread body and method through to selection
j0sh May 15, 2026
ce9281d
Preserve payment session, simplify payments flow
j0sh May 15, 2026
9c4a7d0
Make runner discovery async, support orch discovery
j0sh May 15, 2026
f13043f
Port Scope to runners
j0sh May 15, 2026
d3db9aa
Clean up echo examples
j0sh May 15, 2026
c2d59ca
Add ping-pong websocket example
j0sh May 18, 2026
63b59df
Add channel reader callbacks
j0sh May 20, 2026
dae2abd
Runner session callbacks
j0sh May 21, 2026
b0f143b
Rename 'session' mode to 'persistent'
j0sh May 21, 2026
dc57090
Check for 401 instead of message string in heartbeat re-register
j0sh May 21, 2026
0d0e3e7
Handle Scope serverless and runners separately
j0sh May 21, 2026
bf01130
Accept LiveRunnerSessionRequest in stop helper
j0sh May 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions examples/echo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Echo Live Runner Demo

This example demonstrates:

* Runner registration
* Video input - taken from a local file
* Video output - echoed to output with blur applied
* Parameter updates - adjust the amount of blur

Start go-livepeer:

```sh
./livepeer -orchestrator -useLiveRunners -serviceAddr localhost:8935 -v 99 -orchSecret abcdef
```

Start the runner:

```sh
uv run examples/echo/runner.py --orchestrator https://localhost:8935 --orchSecret abcdef
```

Run the client with a local sample input (`~/samples/bbb_720p.mp4`):

```sh
uv run examples/echo/client.py --blur ~/samples/bbb_720p.mp4
```

The resulting file is stored at echo-out.ts. To use a different file
or redirect to stdout for live playback:

```sh
uv run client.py --blur --output - ~/samples/bbb_720p.mp4 | ffplay -
```

The client discovers the `livepeer-sample/echo` runner automatically. To use a
different orchestrator or discovery endpoint:

```sh
uv run examples/echo/client.py --discovery http://localhost:8935/discovery --blur ~/samples/bbb_720p.mp4
```
162 changes: 162 additions & 0 deletions examples/echo/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import asyncio
import sys
import time
from contextlib import nullcontext, suppress
from pathlib import Path

import av

from livepeer_gateway.errors import LivepeerGatewayError
from livepeer_gateway.live_runner import stop_runner_session
from livepeer_gateway.media_output import MediaOutput
from livepeer_gateway.media_publish import MediaPublish
from livepeer_gateway.http import post_json
from livepeer_gateway.selection import reserve_session

DEFAULT_DISCOVERY = "http://localhost:8935/discovery"
ECHO_APP_ID = "livepeer-sample/echo"
DEFAULT_OUTPUT = "echo-out.ts"
BLUR_UPDATE_INTERVAL_S = 0.01
MAX_BLUR_RADIUS = 100


def _log(*args: object) -> None:
print(*args, file=sys.stderr)


def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run the proxied echo Live Runner demo.")
parser.add_argument("input")
parser.add_argument("--discovery", default=DEFAULT_DISCOVERY)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
parser.add_argument("--radius", type=int, default=75)
parser.add_argument("--max-frames", type=int, default=0, help="Stop after this many input video frames (0 = full file).")
parser.add_argument("--blur", action="store_true", help="Sweep blur radius while publishing the sample.")
return parser.parse_args()


def _channel_url(echo_response: dict[str, object], name: str) -> str:
url = echo_response.get(name)
if not isinstance(url, str) or not url:
raise LivepeerGatewayError(f"echo response missing {name!r} url")
return url


async def _publish_video(
input_path: Path,
publish_url: str,
*,
max_frames: int = 0,
app_url: str = "",
blur: bool = False,
) -> None:
input_ = av.open(str(input_path))
try:
if not input_.streams.video:
raise LivepeerGatewayError(f"No video stream found in input file: {input_path}")
publisher = MediaPublish(publish_url)
prev_pts_time: float | None = None
prev_wall: float | None = None
next_update_pts_time: float | None = None
blur_radius = 0
blur_direction = 1

try:
for index, frame in enumerate(input_.decode(video=0), start=1):
if max_frames > 0 and index > max_frames:
break
current_pts_time = None
if frame.pts is not None and frame.time_base is not None:
current_pts_time = float(frame.pts * frame.time_base)
if next_update_pts_time is None:
next_update_pts_time = current_pts_time

while (
blur
and app_url
and current_pts_time is not None
and next_update_pts_time is not None
and current_pts_time >= next_update_pts_time
):
await post_json(f"{app_url.rstrip('/')}/update", {"mode": "blur", "radius": blur_radius})
if blur_radius == MAX_BLUR_RADIUS:
blur_direction = -1
elif blur_radius == 0:
blur_direction = 1
blur_radius += blur_direction
next_update_pts_time += BLUR_UPDATE_INTERVAL_S

if (
prev_pts_time is not None
and prev_wall is not None
and current_pts_time is not None
):
delta_s = current_pts_time - prev_pts_time
elapsed_s = time.monotonic() - prev_wall
sleep_s = max(0.0, delta_s - elapsed_s)
if sleep_s > 0:
await asyncio.sleep(sleep_s)

if current_pts_time is not None:
prev_pts_time = current_pts_time
prev_wall = time.monotonic()

await publisher.write_frame(frame)
finally:
await publisher.close()
finally:
input_.close()


async def main() -> None:
args = _parse_args()
input_path = Path(args.input).expanduser()
output_stdout = args.output.strip().lower() in {"-", "stdout"}
output_path = None if output_stdout else Path(args.output).expanduser()
if not input_path.exists():
raise SystemExit(f"input file does not exist: {input_path}")

session = None

try:
session = await reserve_session(discovery_url=args.discovery, app=ECHO_APP_ID)
_log("runner_url:", session.runner.url if session.runner is not None else session.runner_url)
_log("session_id:", session.session_id)
_log("app_url:", session.app_url)

echo = await post_json(f"{session.app_url.rstrip('/')}/echo", {"radius": args.radius})
in_url = _channel_url(echo, "in")
out_url = _channel_url(echo, "out")
_log("in:", in_url)
_log("out:", out_url)

with nullcontext(sys.stdout.buffer) if output_stdout else output_path.open("wb") as fh:
def _write_chunk(chunk: bytes) -> None:
fh.write(chunk)
if output_stdout:
fh.flush()

async with MediaOutput(out_url, on_bytes=_write_chunk):
await _publish_video(
input_path,
in_url,
max_frames=max(0, args.max_frames),
app_url=session.app_url,
blur=args.blur,
)
_log("publish complete; waiting for output to drain...")
fh.flush()
except LivepeerGatewayError as exc:
raise SystemExit(f"ERROR: {exc}") from exc
finally:
if session is not None:
with suppress(Exception):
await stop_runner_session(session)


if __name__ == "__main__":
asyncio.run(main())
Loading