diff --git a/src/agentex/lib/core/adapters/streams/adapter_redis.py b/src/agentex/lib/core/adapters/streams/adapter_redis.py
index 7b355ee94..8446d67f1 100644
--- a/src/agentex/lib/core/adapters/streams/adapter_redis.py
+++ b/src/agentex/lib/core/adapters/streams/adapter_redis.py
@@ -15,18 +15,40 @@
 logger = make_logger(__name__)
 
 
+_DEFAULT_STREAM_MAXLEN = 10000
+_DEFAULT_STREAM_TTL_SECONDS = 3600
+
+
 class RedisStreamRepository(StreamRepository):
     """
     A simplified Redis implementation of the EventStreamRepository interface.
     Optimized for text/JSON streaming with SSE.
     """
 
-    def __init__(self, redis_url: str | None = None):
+    def __init__(
+        self,
+        redis_url: str | None = None,
+        stream_maxlen: int | None = None,
+        stream_ttl_seconds: int | None = None,
+    ):
         # Get Redis URL from environment if not provided
         self.redis_url = redis_url or os.environ.get(
             "REDIS_URL", "redis://localhost:6379"
         )
         self.redis = redis.from_url(self.redis_url)
+        self.stream_maxlen = (
+            stream_maxlen
+            if stream_maxlen is not None
+            else int(os.environ.get("REDIS_STREAM_MAXLEN", _DEFAULT_STREAM_MAXLEN))
+        )
+        # 0 disables sliding TTL.
+        self.stream_ttl_seconds = (
+            stream_ttl_seconds
+            if stream_ttl_seconds is not None
+            else int(
+                os.environ.get("REDIS_STREAM_TTL_SECONDS", _DEFAULT_STREAM_TTL_SECONDS)
+            )
+        )
 
     @override
     async def send_event(self, topic: str, event: dict[str, Any]) -> str:
@@ -47,11 +69,40 @@ async def send_event(self, topic: str, event: dict[str, Any]) -> str:
             # # Uncomment to debug
             # logger.info(f"Sending event to Redis stream {topic}: {event_json}")
 
-            # Add to Redis stream with a reasonable max length
-            message_id = await self.redis.xadd(
-                name=topic,
-                fields={"data": event_json},
-            )
+            # Pipeline XADD + EXPIRE in one round-trip so the stream key gets
+            # a sliding TTL — orphaned streams (no writes for the TTL window)
+            # self-delete. Mirrors the server-side adapter (scaleapi/scale-agentex#215).
+            if self.stream_ttl_seconds > 0:
+                async with self.redis.pipeline(transaction=False) as pipe:
+                    pipe.xadd(
+                        name=topic,
+                        fields={"data": event_json},
+                        maxlen=self.stream_maxlen,
+                        approximate=True,
+                    )
+                    pipe.expire(name=topic, time=self.stream_ttl_seconds)
+                    # raise_on_error=False so an EXPIRE failure does not surface
+                    # to the caller after XADD already succeeded — that would
+                    # risk callers retrying and duplicating messages. A failed
+                    # TTL refresh is recoverable: MAXLEN still caps RAM and the
+                    # next write resets the clock.
+                    results = await pipe.execute(raise_on_error=False)
+                # results[0] = xadd message ID (or Exception)
+                # results[1] = expire bool (or Exception)
+                message_id = results[0]
+                if isinstance(message_id, Exception):
+                    raise message_id
+                if isinstance(results[1], Exception):
+                    logger.warning(
+                        f"Failed to refresh TTL on stream {topic}: {results[1]}"
+                    )
+            else:
+                message_id = await self.redis.xadd(
+                    name=topic,
+                    fields={"data": event_json},
+                    maxlen=self.stream_maxlen,
+                    approximate=True,
+                )
             return message_id
 
         except Exception as e: