Fix Redis stream leak: MAXLEN on xadd + sliding TTL on stream keys (#339)
Conversation
The SDK's RedisStreamRepository.send_event was calling xadd with no
MAXLEN, so every task:* stream grew unbounded for the lifetime of the
task. The accompanying comment ("Add to Redis stream with a reasonable
max length") suggested the cap was intended but never wired up.
This change matches the agentex server-side adapter, which has had
maxlen=REDIS_STREAM_MAXLEN, approximate=True since Jan 2 (PR #111 in
scaleapi/scale-agentex). Default is 10000 entries, overridable via the
REDIS_STREAM_MAXLEN env var, same as the server.
Note: this caps each stream's size during generation but does not
delete streams when their task completes -- that's a separate fix.
Contributor: on the backend side, I just made a change to pipeline xadd with an expire call to ensure streams get cleaned up by Redis if nothing else. Maybe worth doing something similar here? (ref: scaleapi/scale-agentex#215)
Mirror scaleapi/scale-agentex#215 (server-side adapter): pipeline XADD with EXPIRE so each task:* stream key gets a sliding TTL. Orphaned streams (no writes for the TTL window) self-delete in Redis without needing an explicit cleanup_stream call from the caller.

This is the right shape of fix for the SDK's leak: an explicit DEL on terminal task transitions (an earlier draft of this PR) introduced a race where the server's task_updated event, published to the same topic, could be deleted before a connected frontend SSE consumer read it. EXPIRE sidesteps that: the TTL only fires after inactivity, so an actively-streaming agent or actively-reading consumer keeps the key alive, and the key only ages out once everyone is done with it.

Defaults match the server: REDIS_STREAM_TTL_SECONDS=3600 (1h), overridable via env var. Setting it to 0 short-circuits to plain XADD (no TTL refresh), matching the server's escape hatch.

Implementation notes:

- transaction=False on the pipeline: connection-level batching, no MULTI/EXEC overhead for what's already a fast op.
- raise_on_error=False: an EXPIRE failure after a successful XADD must not surface to the caller. The message has been published; retrying would duplicate it. We log and move on. The next successful XADD will reset the TTL anyway.
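The pipelined XADD+EXPIRE pattern described above can be sketched roughly as follows. This is a hypothetical sketch assuming a redis-py style client; `send_event` here and the fake client are illustrative stand-ins, not the SDK's actual implementation:

```python
TTL_SECONDS = 3600  # REDIS_STREAM_TTL_SECONDS default; 0 disables the TTL

def send_event(client, stream_key, fields, ttl=TTL_SECONDS, maxlen=10_000):
    if ttl <= 0:
        # Escape hatch: plain capped XADD, no TTL refresh.
        return client.xadd(stream_key, fields, maxlen=maxlen, approximate=True)
    # transaction=False: send both commands in one round-trip without
    # wrapping them in MULTI/EXEC.
    pipe = client.pipeline(transaction=False)
    pipe.xadd(stream_key, fields, maxlen=maxlen, approximate=True)
    pipe.expire(stream_key, ttl)  # sliding TTL: every write pushes expiry out
    # raise_on_error=False: an EXPIRE failure must not surface after the
    # XADD already published the message; a retry would duplicate it.
    entry_id, _expire_ok = pipe.execute(raise_on_error=False)
    return entry_id

class FakePipeline:
    """Records queued commands so the sketch runs without a server."""
    def __init__(self, log):
        self.log, self.results = log, []
    def xadd(self, name, fields, maxlen=None, approximate=True):
        self.log.append(("xadd", name, maxlen)); self.results.append("1-0")
    def expire(self, name, ttl):
        self.log.append(("expire", name, ttl)); self.results.append(True)
    def execute(self, raise_on_error=True):
        return list(self.results)

class FakeRedis:
    def __init__(self):
        self.log = []
    def pipeline(self, transaction=True):
        self.log.append(("pipeline", transaction))
        return FakePipeline(self.log)
    def xadd(self, name, fields, maxlen=None, approximate=True):
        self.log.append(("xadd", name, maxlen)); return "1-0"
```

Because EXPIRE is re-issued on every delta, the key's TTL is reset by each write, which is what makes it "sliding" rather than a fixed deadline from stream creation.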
Force-pushed a298f63 to 0a7cfa0.

smoreinis approved these changes on May 5, 2026.
Summary
The `RedisStreamRepository` in this SDK has been leaking memory in production: every `task:{id}` stream grows unbounded for the lifetime of the task and is never deleted. On a long-lived cluster (BP) this manifested as 12k+ orphaned task streams consuming most of a 6GB Redis cache, requiring manual `redis-cli DEL` flushes.

Two complementary fixes, one commit each. Both mirror patterns the agentex server-side adapter (`scaleapi/scale-agentex`, `agentex/src/adapters/streams/adapter_redis.py`) already uses.

Commit 1: `perf(streaming): add MAXLEN to Redis xadd`

`RedisStreamRepository.send_event` was calling `xadd` with no `MAXLEN`, despite a comment claiming "Add to Redis stream with a reasonable max length". The cap was intended but never wired up. Matches the server adapter's pattern from scale-agentex PR #111 (Jan 2). Default 10000 entries, overridable via the `REDIS_STREAM_MAXLEN` env var.

Commit 2: `feat(streaming): add sliding TTL to Redis stream keys`

Mirrors scale-agentex PR #215 (Stas, just merged) on the SDK side. Pipelines `XADD` with `EXPIRE` in one round-trip. Each delta write refreshes the TTL on the stream key, so an actively-written stream stays alive while an orphaned one self-deletes within `REDIS_STREAM_TTL_SECONDS` of its last write.

Default TTL: 3600 seconds (1h), matching the server. Configurable via the `REDIS_STREAM_TTL_SECONDS` env var. Setting it to 0 short-circuits to plain `XADD` (no TTL refresh).

Implementation notes:

- `transaction=False` on the pipeline: connection-level batching, no MULTI/EXEC overhead for a hot path.
- `raise_on_error=False`: if `EXPIRE` fails after `XADD` succeeded, log and move on. We must not propagate the failure to the caller: the message has been published, and a retry would duplicate it. The next successful `XADD` will reset the TTL anyway.

Why this approach instead of cleanup_stream?

An earlier draft of this PR added an explicit `cleanup_stream` call to all terminal task transitions in `TasksService` (complete, fail, cancel, terminate, timeout, delete). Stas pointed out two issues with that approach which the EXPIRE pattern sidesteps:

- Racing `task_updated` events. When the server processes a terminal task transition (POST /tasks/{id}/complete), it publishes a `task_updated` event to the same `task:{id}` stream so connected SSE consumers see the state change. An SDK-side `cleanup_stream` immediately after the API call returns would race that event: a frontend mid-`XREAD` could miss the COMPLETED transition.
- The existing `cleanup_stream`-on-disconnect path nukes the topic for all subscribers when one disconnects. Adding more `cleanup_stream` callsites compounds that latent bug.

EXPIRE avoids both: it only fires after inactivity, so any active reader keeps the key alive long enough to consume final events, and the topic-deletion-on-disconnect bug is rendered moot once the SDK no longer relies on explicit DEL.
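The tolerant error handling around the pipeline can be sketched like this, assuming redis-py semantics, where `execute(raise_on_error=False)` returns exception objects in the results list instead of raising. `execute_tolerant` is an invented helper name for illustration, not the SDK's actual code:

```python
import logging

logger = logging.getLogger("agentex.streams")

def execute_tolerant(pipe):
    # With raise_on_error=False, redis-py places any per-command exception
    # into the results list rather than raising it. The pipeline queued
    # [XADD, EXPIRE], so results arrive in that order.
    results = pipe.execute(raise_on_error=False)
    xadd_result, expire_result = results
    if isinstance(expire_result, Exception):
        # The message is already published; swallow the EXPIRE failure.
        # The next successful XADD will reset the TTL anyway.
        logger.warning("EXPIRE failed, continuing: %r", expire_result)
    return xadd_result
```

The key property is that the caller always gets the entry id from the successful `XADD` back, so nothing upstream is tempted to retry (and duplicate) the publish.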
Validation
End-to-end tested against BP's dev cluster on 2026-05-05 with this branch path-pinned into the agent images. Ran an eval (`-k 2`, 2 examples → 2 orchestrator workflows + 2 tardis subagent workflows = 4 task streams).

Redis state ~3 minutes after the eval finished:

- `task:58baa63e-...`
- `task:251f2dac-...`
- `task:7d87bbd3-...`
- `task:49bb2778-...`

All four streams have active TTLs of ~3400s (~57 min, which is 3600s minus elapsed_since_last_write). They will self-delete from Redis ~57 min after the last delta write, with no explicit cleanup call needed.

Side-by-side baseline (same Redis cluster, same time, but pods running the unmodified `agentex-sdk==0.10.3` from PyPI):

- `task:e2963d73-...`
- `task:f7c9f250-...`
- `task:30526127-...`
- `task:c3dcb6ec-...`

The only difference between the two cohorts is the SDK version pinned in the agent images. The TTL is the difference this PR makes.

This pattern also closes the orphan case our application layer can't fix: long-lived orchestrator workflows (which never call `tasks.complete` because they're designed to stay alive across multiple user messages) used to leak their stream forever. Now their stream auto-cleans within `REDIS_STREAM_TTL_SECONDS` of the last delta, regardless of workflow lifecycle.

Test plan

- Lint passes (`uv run ruff check src/agentex/lib/core/adapters/streams/adapter_redis.py`)
- `RedisStreamRepository()` with no kwargs continues to work; setting `REDIS_STREAM_TTL_SECONDS=0` disables the TTL behavior entirely