Add durable control-plane storage#185
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a centralized ControlPlaneStore to manage short-lived state, including OAuth tokens, admin sessions, and rate limits, replacing previous in-memory dictionaries. The new store uses MongoDB for persistent storage with an in-memory fallback for non-production environments. Key feedback points out that the check_rate_limit method performs blocking synchronous I/O despite being marked async, which may block the FastAPI event loop. Other suggestions include moving network-dependent initialization out of the class constructor and removing redundant code from the dependencies module.
| async def check_rate_limit( | ||
| self, | ||
| identity: str, | ||
| max_requests: int, | ||
| window_seconds: int, | ||
| ) -> tuple[bool, int]: | ||
| now = time.time() | ||
| cutoff = now - window_seconds | ||
|
|
||
| if self._in_memory: | ||
| with _memory_lock: | ||
| hits = [hit for hit in _memory_rate_limits[identity] if hit > cutoff] | ||
| if len(hits) >= max_requests: | ||
| _memory_rate_limits[identity] = hits | ||
| return False, 0 | ||
| hits.append(now) | ||
| _memory_rate_limits[identity] = hits | ||
| return True, max(max_requests - len(hits), 0) | ||
|
|
||
| record = self.rate_limits.find_one({"identity": identity}) or {} | ||
| hits = [float(hit) for hit in record.get("hits", []) if float(hit) > cutoff] | ||
| if len(hits) >= max_requests: | ||
| self._save_rate_limit(identity, hits, window_seconds) | ||
| return False, 0 | ||
|
|
||
| hits.append(now) | ||
| self._save_rate_limit(identity, hits, window_seconds) | ||
| return True, max(max_requests - len(hits), 0) |
There was a problem hiding this comment.
This method is defined as async but performs synchronous, blocking MongoDB I/O using pymongo. In a FastAPI application, this will block the event loop and prevent other requests from being handled concurrently. Additionally, the read-modify-write pattern for the hits list is not atomic and can lead to race conditions in a distributed environment where multiple workers are processing requests for the same identity. Consider using an asynchronous driver like motor or wrapping these calls in a thread pool using loop.run_in_executor. For atomicity, you could use MongoDB's $push with $slice or a find_one_and_update with an aggregation pipeline.
| self.rate_limits = None | ||
| self._connected = False | ||
| self._in_memory = False | ||
| self._try_connect() |
There was a problem hiding this comment.
Performing a blocking network operation like ping inside __init__ is discouraged as it can hang the application during module import or startup. Consider moving the connection logic to a dedicated initialization method that can be called during the application's startup sequence (e.g., using a FastAPI lifespan handler).
|
|
||
| # ═══════════════════════════════════════════════════════════════════════════ | ||
| # Sliding-window rate limiter (in-process, per-key) | ||
| # Sliding-window rate limiter |
|
Addressed the review feedback in b9a5e1a:
Local checks:
|
Ankit-Kotnala
left a comment
There was a problem hiding this comment.
@webhop123
Thanks for the PR, this seems good. Moving temp tokens, OAuth codes, admin sessions, and rate limits out of local memory definitely makes sense.
Before merging, can you please check a few things:
- Bandit and staging deploy are failing right now, so I’d prefer to wait for those to be successful.
- Most of the new tests seem to cover the in-memory fallback. Since the main change is Mongo-backed storage, can we add some coverage for that path too, especially the rate-limit update logic?
- Rate limiting was moved off the event loop, but a few auth/admin paths still call the sync store methods from async routes. Can we wrap those as well or expose async methods?
Once these are handled, then I think this should be good to merge.
|
Addressed the requested items in c955b9b:
Current checks: Bandit, dependency audit, tests, e2e, Docker, API diff, and label are passing. Danger/staging are skipped on this fork PR; staging can still run from a trusted same-repo branch. |
|
| Filename | Overview |
|---|---|
| src/database/control_plane_store.py | New core module: MongoDB-backed store for single-use tokens, admin sessions, and rate-limit counters with in-memory fallback; atomic aggregation pipeline for rate limiting; _check_rate_limit_sync wraps MongoDB calls in try/except but token/session methods do not. |
| src/api/dependencies.py | Removes in-process _SlidingWindowRateLimiter and replaces _rate_limiter.check() with control_plane_store.check_rate_limit(); straightforward delegation with no logic change. |
| src/api/routes/admin.py | Replaces _admin_sessions dict with control_plane_store; _verify_admin_token converted to async; session creation, lookup, and deletion all delegate to the new store correctly. |
| src/api/routes/auth.py | Replaces _mcp_temp_tokens and _oauth_auth_codes dicts with control_plane_store; all helper functions correctly converted to async and callers updated to await. |
| tests/unit/test_control_plane_store.py | New unit tests covering single-use token consumption, admin sessions, rate-limit counters (memory and Mongo paths), MongoDB failure handling, and production-mode memory fallback refusal. |
| tests/api/test_dependencies_and_routes.py | Old _SlidingWindowRateLimiter unit test replaced with an integration test for enforce_rate_limit via a FakeControlPlaneStore monkeypatch; correctly validates 200 then 429 behaviour. |
| .github/workflows/danger.yml | Adds fork-gate condition to the Danger job, preventing it from running on external PRs where secrets would be unavailable. |
Reviews (2): Last reviewed commit: "Merge main and harden control-plane rate..." | Re-trigger Greptile
| async def check_rate_limit( | ||
| self, | ||
| identity: str, | ||
| max_requests: int, | ||
| window_seconds: int, | ||
| ) -> tuple[bool, int]: | ||
| now = time.time() | ||
| cutoff = now - window_seconds | ||
|
|
||
| if self._in_memory: | ||
| return self._check_rate_limit_memory(identity, max_requests, now, cutoff) | ||
|
|
||
| return await asyncio.to_thread( | ||
| self._check_rate_limit_sync, | ||
| identity, | ||
| max_requests, | ||
| window_seconds, | ||
| now, | ||
| cutoff, | ||
| ) |
There was a problem hiding this comment.
MongoDB failure propagates as unhandled 500 in the rate-limit path
check_rate_limit dispatches to _check_rate_limit_sync → _check_rate_limit_mongo, which calls find_one_and_update. If MongoDB becomes transiently unavailable after the initial connect (_connected = True), PyMongo raises an exception that propagates unhandled through asyncio.to_thread → check_rate_limit → enforce_rate_limit → FastAPI, returning HTTP 500 to every rate-limited API caller.
The old _SlidingWindowRateLimiter never threw; it always returned (True/False, int). There is no fallback in enforce_rate_limit, so a brief MongoDB network hiccup would disrupt all protected endpoints. Consider catching the exception and either failing open or raising an explicit HTTPException(status_code=503) rather than an opaque 500.
| async def check_rate_limit( | ||
| self, | ||
| identity: str, | ||
| max_requests: int, | ||
| window_seconds: int, | ||
| ) -> tuple[bool, int]: | ||
| now = time.time() | ||
| cutoff = now - window_seconds | ||
|
|
||
| if self._in_memory: | ||
| return self._check_rate_limit_memory(identity, max_requests, now, cutoff) | ||
|
|
||
| return await asyncio.to_thread( | ||
| self._check_rate_limit_sync, | ||
| identity, | ||
| max_requests, | ||
| window_seconds, | ||
| now, | ||
| cutoff, | ||
| ) |
There was a problem hiding this comment.
check_rate_limit skips _ensure_ready() before branching on _in_memory
Every other public method (create_single_use_token, get_admin_session, delete_admin_session, etc.) calls self._ensure_ready() first. check_rate_limit checks self._in_memory directly without initialising the store. On first use, when both _connected = False and _in_memory = False, the if self._in_memory: guard is False so the code falls through to the thread path, which correctly calls _ensure_ready() inside _check_rate_limit_sync. The runtime behaviour is correct, but any future refactor that relies on the method being callable in isolation could miss initialization.
| if not record or not record.get("allowed"): | ||
| return False, 0 | ||
|
|
||
| hits = record.get("hits", []) | ||
| return True, max(max_requests - len(hits), 0) |
There was a problem hiding this comment.
When MongoDB is unavailable after initial connect,
find_one_and_update raises a PyMongo exception that propagates unhandled through the call stack, returning HTTP 500 to every rate-limited caller. Wrapping the Mongo call and returning a safe default makes the degradation explicit and controllable.
| if not record or not record.get("allowed"): | |
| return False, 0 | |
| hits = record.get("hits", []) | |
| return True, max(max_requests - len(hits), 0) | |
| if not record or not record.get("allowed"): | |
| return False, 0 | |
| hits = record.get("hits", []) | |
| return True, max(max_requests - len(hits), 0) | |
| def _check_rate_limit_mongo_safe( | |
| self, | |
| identity: str, | |
| max_requests: int, | |
| window_seconds: int, | |
| now: float, | |
| cutoff: float, | |
| ) -> tuple[bool, int]: | |
| try: | |
| return self._check_rate_limit_mongo( | |
| identity, max_requests, window_seconds, now, cutoff | |
| ) | |
| except Exception as exc: | |
| logger.error("Rate-limit MongoDB error for %s: %s", identity, exc) | |
| # Fail open: allow the request rather than returning an opaque 500. | |
| return True, 0 |
Summary
Refs #161
Tests
.\.venv\Scripts\python -m ruff check src\database\control_plane_store.py src\api\routes\auth.py src\api\routes\admin.py src\api\dependencies.py tests\unit\test_control_plane_store.py tests\api\test_dependencies_and_routes.py.\.venv\Scripts\python -m pytest tests\unit\test_control_plane_store.py tests\unit\test_database_stores.py tests\api\test_dependencies_and_routes.py -qgit diff --check