Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions src/agentex/lib/core/adapters/streams/adapter_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,40 @@
logger = make_logger(__name__)


# Default cap on a stream's length, applied via XADD MAXLEN (approximate),
# so an unread stream cannot grow without bound in Redis memory.
_DEFAULT_STREAM_MAXLEN = 10000
# Default sliding TTL (seconds) refreshed on each write; a value of 0
# disables expiry entirely.
_DEFAULT_STREAM_TTL_SECONDS = 3600


class RedisStreamRepository(StreamRepository):
"""
A simplified Redis implementation of the EventStreamRepository interface.
Optimized for text/JSON streaming with SSE.
"""

def __init__(self, redis_url: str | None = None):
def __init__(
self,
redis_url: str | None = None,
stream_maxlen: int | None = None,
stream_ttl_seconds: int | None = None,
):
# Get Redis URL from environment if not provided
self.redis_url = redis_url or os.environ.get(
"REDIS_URL", "redis://localhost:6379"
)
self.redis = redis.from_url(self.redis_url)
self.stream_maxlen = (
stream_maxlen
if stream_maxlen is not None
else int(os.environ.get("REDIS_STREAM_MAXLEN", _DEFAULT_STREAM_MAXLEN))
)
# 0 disables sliding TTL.
self.stream_ttl_seconds = (
stream_ttl_seconds
if stream_ttl_seconds is not None
else int(
os.environ.get("REDIS_STREAM_TTL_SECONDS", _DEFAULT_STREAM_TTL_SECONDS)
)
)

@override
async def send_event(self, topic: str, event: dict[str, Any]) -> str:
Expand All @@ -47,11 +69,40 @@ async def send_event(self, topic: str, event: dict[str, Any]) -> str:
# # Uncomment to debug
# logger.info(f"Sending event to Redis stream {topic}: {event_json}")

# Add to Redis stream with a reasonable max length
message_id = await self.redis.xadd(
name=topic,
fields={"data": event_json},
)
# Pipeline XADD + EXPIRE in one round-trip so the stream key gets
# a sliding TTL — orphaned streams (no writes for the TTL window)
# self-delete. Mirrors the server-side adapter (scaleapi/scale-agentex#215).
if self.stream_ttl_seconds > 0:
async with self.redis.pipeline(transaction=False) as pipe:
pipe.xadd(
name=topic,
fields={"data": event_json},
maxlen=self.stream_maxlen,
approximate=True,
)
pipe.expire(name=topic, time=self.stream_ttl_seconds)
# raise_on_error=False so an EXPIRE failure does not surface
# to the caller after XADD already succeeded — that would
# risk callers retrying and duplicating messages. A failed
# TTL refresh is recoverable: MAXLEN still caps RAM and the
# next write resets the clock.
results = await pipe.execute(raise_on_error=False)
# results[0] = xadd message ID (or Exception)
# results[1] = expire bool (or Exception)
message_id = results[0]
if isinstance(message_id, Exception):
raise message_id
if isinstance(results[1], Exception):
logger.warning(
f"Failed to refresh TTL on stream {topic}: {results[1]}"
)
else:
message_id = await self.redis.xadd(
name=topic,
fields={"data": event_json},
maxlen=self.stream_maxlen,
approximate=True,
)

return message_id
except Exception as e:
Expand Down
Loading