Add durable control-plane storage #185

Open
webhop123 wants to merge 4 commits into XortexAI:main from webhop123:feature/control-plane-storage

@webhop123

Summary

  • add a Mongo-backed control-plane store for single-use MCP temp tokens, OAuth authorization codes, admin sessions, and shared rate-limit counters
  • move auth/admin/rate-limit paths off process-local dictionaries while keeping the development/test in-memory fallback
  • add coverage for single-use token consumption, admin sessions, shared rate-limit counters, and the rate-limit dependency path

Refs #161

Tests

  • .\.venv\Scripts\python -m ruff check src\database\control_plane_store.py src\api\routes\auth.py src\api\routes\admin.py src\api\dependencies.py tests\unit\test_control_plane_store.py tests\api\test_dependencies_and_routes.py
  • .\.venv\Scripts\python -m pytest tests\unit\test_control_plane_store.py tests\unit\test_database_stores.py tests\api\test_dependencies_and_routes.py -q
  • git diff --check

gemini-code-assist Bot (Contributor) left a comment

Code Review

This pull request introduces a centralized ControlPlaneStore to manage short-lived state, including OAuth tokens, admin sessions, and rate limits, replacing previous in-memory dictionaries. The new store uses MongoDB for persistent storage with an in-memory fallback for non-production environments. Key feedback points out that the check_rate_limit method performs blocking synchronous I/O despite being marked async, which may block the FastAPI event loop. Other suggestions include moving network-dependent initialization out of the class constructor and removing redundant code from the dependencies module.

Comment thread src/database/control_plane_store.py Outdated
Comment on lines +213 to +240
async def check_rate_limit(
    self,
    identity: str,
    max_requests: int,
    window_seconds: int,
) -> tuple[bool, int]:
    now = time.time()
    cutoff = now - window_seconds

    if self._in_memory:
        with _memory_lock:
            hits = [hit for hit in _memory_rate_limits[identity] if hit > cutoff]
            if len(hits) >= max_requests:
                _memory_rate_limits[identity] = hits
                return False, 0
            hits.append(now)
            _memory_rate_limits[identity] = hits
            return True, max(max_requests - len(hits), 0)

    record = self.rate_limits.find_one({"identity": identity}) or {}
    hits = [float(hit) for hit in record.get("hits", []) if float(hit) > cutoff]
    if len(hits) >= max_requests:
        self._save_rate_limit(identity, hits, window_seconds)
        return False, 0

    hits.append(now)
    self._save_rate_limit(identity, hits, window_seconds)
    return True, max(max_requests - len(hits), 0)

high

This method is defined as async but performs synchronous, blocking MongoDB I/O using pymongo. In a FastAPI application, this will block the event loop and prevent other requests from being handled concurrently. Additionally, the read-modify-write pattern for the hits list is not atomic and can lead to race conditions in a distributed environment where multiple workers are processing requests for the same identity. Consider using an asynchronous driver like motor or wrapping these calls in a thread pool using loop.run_in_executor. For atomicity, you could use MongoDB's $push with $slice or a find_one_and_update with an aggregation pipeline.
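
As a minimal sketch of the thread-offloading option mentioned above (not the PR's code): `blocking_lookup` is a hypothetical stand-in for a synchronous pymongo call, simulated here with `time.sleep`, and `lookup` is an assumed wrapper name.

```python
import asyncio
import time


def blocking_lookup(identity: str) -> dict:
    """Hypothetical stand-in for a synchronous pymongo call."""
    time.sleep(0.2)  # simulates network latency; blocks only the calling thread
    return {"identity": identity, "hits": []}


async def lookup(identity: str) -> dict:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_lookup, identity)


async def main() -> float:
    start = time.perf_counter()
    # The two lookups overlap because neither one blocks the event loop.
    await asyncio.gather(lookup("user-a"), lookup("user-b"))
    return time.perf_counter() - start


elapsed = asyncio.run(main())
print(f"two lookups took {elapsed:.2f}s")
```

Calling `blocking_lookup` directly inside the coroutine would instead serialize the two lookups, because each call would hold the event loop for its full duration.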

Comment thread src/database/control_plane_store.py Outdated
self.rate_limits = None
self._connected = False
self._in_memory = False
self._try_connect()

medium

Performing a blocking network operation like ping inside __init__ is discouraged as it can hang the application during module import or startup. Consider moving the connection logic to a dedicated initialization method that can be called during the application's startup sequence (e.g., using a FastAPI lifespan handler).
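
A rough illustration of the deferred-connection idea, under stated assumptions: `LazyStore`, `fake_connect`, and `ping` are hypothetical names, and the injected `connect` callable stands in for whatever opens the real MongoDB connection. The constructor does no I/O; the first call that needs the backend triggers the connection.

```python
import threading


class LazyStore:
    """Hypothetical store that defers its connection until first use."""

    def __init__(self, connect):
        # The constructor only records how to connect; it performs no I/O.
        self._connect = connect
        self._client = None
        self._lock = threading.Lock()

    def _ensure_ready(self):
        # Double-checked locking: connect at most once, even across threads.
        if self._client is None:
            with self._lock:
                if self._client is None:
                    self._client = self._connect()
        return self._client

    def ping(self) -> str:
        return self._ensure_ready()["status"]


calls = []


def fake_connect() -> dict:
    calls.append("connect")  # records each connection attempt
    return {"status": "ok"}


store = LazyStore(fake_connect)
calls_after_init = list(calls)  # still empty: construction did not connect
store.ping()
store.ping()
print(calls_after_init, calls)
```

The same `_ensure_ready` call could equally be invoked once from an application startup hook, which keeps connection failures visible at startup rather than on the first request.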

Comment thread src/api/dependencies.py

  # ═══════════════════════════════════════════════════════════════════════════
- # Sliding-window rate limiter (in-process, per-key)
+ # Sliding-window rate limiter

medium

The _SlidingWindowRateLimiter class and its instance _rate_limiter appear to be redundant now that rate limiting logic has been moved to ControlPlaneStore. Consider removing them and updating the associated tests to avoid maintaining dead code.

@webhop123
Author

Addressed the review feedback in b9a5e1a:

  • moved the control-plane Mongo connection out of construction and made it lazy
  • moved rate-limit Mongo work off the event loop and replaced the read/modify/write path with a single find_one_and_update pipeline
  • removed the old in-process limiter and its test
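
The comment does not show the pipeline itself. As a rough sketch of what a single prune, check, and append update could look like, the function below builds an aggregation-pipeline update document. The field names `hits` and `allowed` follow the snippets quoted in this thread; the function name `build_rate_limit_update` and the three-stage layout are assumptions, not the author's code.

```python
def build_rate_limit_update(now: float, cutoff: float, max_requests: int) -> list[dict]:
    """Build an update pipeline that prunes, checks, then conditionally appends."""
    pruned_hits = {
        "$filter": {
            "input": {"$ifNull": ["$hits", []]},  # a freshly upserted document has no hits
            "as": "hit",
            "cond": {"$gt": ["$$hit", cutoff]},
        }
    }
    return [
        # Stage 1: drop hits that have fallen out of the window.
        {"$set": {"hits": pruned_hits}},
        # Stage 2: decide whether this request fits under the limit.
        {"$set": {"allowed": {"$lt": [{"$size": "$hits"}, max_requests]}}},
        # Stage 3: record the new hit only when the request was allowed.
        {
            "$set": {
                "hits": {
                    "$cond": ["$allowed", {"$concatArrays": ["$hits", [now]]}, "$hits"]
                }
            }
        },
    ]


pipeline = build_rate_limit_update(now=100.0, cutoff=40.0, max_requests=5)
print(len(pipeline))
```

With pymongo, a pipeline like this would be passed as the update argument to `find_one_and_update` together with `upsert=True` and `return_document=ReturnDocument.AFTER`, so the caller can read `allowed` and `hits` from the returned document. Pipeline-style updates require MongoDB 4.2 or later. Each stage is separate because a `$set` stage cannot see fields assigned within the same stage.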

Local checks:

  • ruff check src/database/control_plane_store.py src/api/dependencies.py tests/unit/test_control_plane_store.py tests/api/test_dependencies_and_routes.py
  • pytest tests/unit/test_control_plane_store.py tests/unit/test_database_stores.py tests/api/test_dependencies_and_routes.py -q

Ankit-Kotnala (Collaborator) left a comment

@webhop123
Thanks for the PR; this looks good. Moving temp tokens, OAuth codes, admin sessions, and rate limits out of local memory definitely makes sense.

Before merging, can you please check a few things:

  • Bandit and staging deploy are failing right now, so I’d prefer to wait for those to be successful.
  • Most of the new tests seem to cover the in-memory fallback. Since the main change is Mongo-backed storage, can we add some coverage for that path too, especially the rate-limit update logic?
  • Rate limiting was moved off the event loop, but a few auth/admin paths still call the sync store methods from async routes. Can we wrap those as well or expose async methods?

Once these are handled, I think this should be good to merge.

@webhop123
Author

Addressed the requested items in c955b9b:

  • added async control-plane store wrappers and switched auth/admin routes to await them
  • added Mongo-path coverage for single-use tokens and the atomic rate-limit update/pruning logic
  • fixed the Bandit SARIF setup and guarded privileged Danger/staging jobs for fork PRs where write permissions/secrets are unavailable

Current checks: Bandit, dependency audit, tests, e2e, Docker, API diff, and label are passing. Danger/staging are skipped on this fork PR; staging can still run from a trusted same-repo branch.

@webhop123 webhop123 requested a review from Ankit-Kotnala May 18, 2026 12:53
@greptile-apps
Contributor

greptile-apps Bot commented May 23, 2026

Greptile Summary

This PR introduces ControlPlaneStore, a MongoDB-backed singleton that replaces all in-process dictionaries used for MCP temp tokens, OAuth auth codes, admin sessions, and per-user rate-limit counters. An in-memory fallback is retained for development and test, and production is hardened to refuse the fallback.

  • Single-use tokens and admin sessions are stored as hashed records in a control_plane_records collection with a TTL index; consumption is atomic via find_one_and_delete.
  • Rate limiting uses a MongoDB aggregation pipeline (find_one_and_update with upsert) that prunes, counts, and appends hits atomically; the sync wrapper catches MongoDB exceptions and fails open for the rate-limit path only.
  • _verify_admin_token is promoted to async def to await the store call, and all auth/admin helpers are converted to async accordingly.
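
To illustrate the single-use semantics described in the first bullet, here is a small in-memory sketch. It is not the PR's implementation: `TokenStore` and its method names are hypothetical, SHA-256 is an assumed hash, and `dict.pop` under a lock stands in for the atomic `find_one_and_delete`.

```python
import hashlib
import secrets
import threading
import time
from typing import Optional


class TokenStore:
    """Hypothetical in-memory analogue of a hashed, single-use token store."""

    def __init__(self) -> None:
        self._records: dict[str, tuple[str, float]] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _digest(token: str) -> str:
        # Only the hash is stored, so a leaked record does not reveal the token.
        return hashlib.sha256(token.encode()).hexdigest()

    def create(self, user_id: str, ttl_seconds: float) -> str:
        token = secrets.token_urlsafe(32)
        with self._lock:
            self._records[self._digest(token)] = (user_id, time.time() + ttl_seconds)
        return token

    def consume(self, token: str) -> Optional[str]:
        # Look up and delete in one step so a token can be redeemed only once.
        with self._lock:
            record = self._records.pop(self._digest(token), None)
        if record is None:
            return None
        user_id, expires_at = record
        return user_id if expires_at > time.time() else None


store = TokenStore()
token = store.create("user-1", ttl_seconds=60)
first = store.consume(token)
second = store.consume(token)
print(first, second)  # prints "user-1 None"
```

A separate read followed by a delete would leave a window in which two concurrent requests could both redeem the same token; combining them into one operation is what closes that window.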

Confidence Score: 4/5

Safe to merge with one area worth addressing: the token and session MongoDB paths have no exception handling for transient post-connect failures, unlike the rate-limit path.

The rate-limit path now wraps MongoDB calls and fails open, which was the main concern raised previously. However, consume_single_use_token, get_admin_session, and create_admin_session let PyMongo exceptions propagate directly. A brief MongoDB hiccup after startup would make all admin endpoints and token-exchange requests return HTTP 500 for its duration.

src/database/control_plane_store.py — specifically the synchronous token and session methods that invoke MongoDB without a try/except.
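
One possible way to close this gap is to translate driver errors into a single explicit exception that routes can map to a 503 response. The sketch below is illustrative only: `StoreUnavailable`, `translate_store_errors`, and `flaky_get_admin_session` are hypothetical names, and the built-in `ConnectionError` stands in for the PyMongo exception types.

```python
import functools


class StoreUnavailable(Exception):
    """Hypothetical error meaning the control-plane backend could not be reached."""


def translate_store_errors(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ConnectionError as exc:  # stand-in for the driver's error types
            # Surface one well-defined error instead of letting an
            # arbitrary driver exception escape to the route.
            raise StoreUnavailable(func.__name__) from exc

    return wrapper


@translate_store_errors
def flaky_get_admin_session(token: str) -> dict:
    # Simulates a transient backend failure after startup.
    raise ConnectionError("backend unreachable")


try:
    flaky_get_admin_session("abc")
    outcome = "ok"
except StoreUnavailable as exc:
    outcome = f"unavailable: {exc}"

print(outcome)  # prints "unavailable: flaky_get_admin_session"
```

Unlike the rate-limit path, failing open is probably not appropriate here, since treating an unreachable store as success would let unauthenticated requests through; an explicit error keeps the failure closed but diagnosable.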

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/database/control_plane_store.py | New core module: MongoDB-backed store for single-use tokens, admin sessions, and rate-limit counters with in-memory fallback; atomic aggregation pipeline for rate limiting; _check_rate_limit_sync wraps MongoDB calls in try/except but token/session methods do not. |
| src/api/dependencies.py | Removes in-process _SlidingWindowRateLimiter and replaces _rate_limiter.check() with control_plane_store.check_rate_limit(); straightforward delegation with no logic change. |
| src/api/routes/admin.py | Replaces _admin_sessions dict with control_plane_store; _verify_admin_token converted to async; session creation, lookup, and deletion all delegate to the new store correctly. |
| src/api/routes/auth.py | Replaces _mcp_temp_tokens and _oauth_auth_codes dicts with control_plane_store; all helper functions correctly converted to async and callers updated to await. |
| tests/unit/test_control_plane_store.py | New unit tests covering single-use token consumption, admin sessions, rate-limit counters (memory and Mongo paths), MongoDB failure handling, and production-mode memory fallback refusal. |
| tests/api/test_dependencies_and_routes.py | Old _SlidingWindowRateLimiter unit test replaced with an integration test for enforce_rate_limit via a FakeControlPlaneStore monkeypatch; correctly validates 200 then 429 behaviour. |
| .github/workflows/danger.yml | Adds fork-gate condition to the Danger job, preventing it from running on external PRs where secrets would be unavailable. |

Reviews (2): Last reviewed commit: "Merge main and harden control-plane rate..."

Comment on lines +265 to +284
async def check_rate_limit(
    self,
    identity: str,
    max_requests: int,
    window_seconds: int,
) -> tuple[bool, int]:
    now = time.time()
    cutoff = now - window_seconds

    if self._in_memory:
        return self._check_rate_limit_memory(identity, max_requests, now, cutoff)

    return await asyncio.to_thread(
        self._check_rate_limit_sync,
        identity,
        max_requests,
        window_seconds,
        now,
        cutoff,
    )

P1 MongoDB failure propagates as unhandled 500 in the rate-limit path

check_rate_limit dispatches to _check_rate_limit_sync → _check_rate_limit_mongo, which calls find_one_and_update. If MongoDB becomes transiently unavailable after the initial connect (_connected = True), PyMongo raises an exception that propagates unhandled through asyncio.to_thread → check_rate_limit → enforce_rate_limit → FastAPI, returning HTTP 500 to every rate-limited API caller.

The old _SlidingWindowRateLimiter never threw; it always returned (True/False, int). There is no fallback in enforce_rate_limit, so a brief MongoDB network hiccup would disrupt all protected endpoints. Consider catching the exception and either failing open or raising an explicit HTTPException(status_code=503) rather than an opaque 500.

Comment on lines +265 to +284
async def check_rate_limit(
    self,
    identity: str,
    max_requests: int,
    window_seconds: int,
) -> tuple[bool, int]:
    now = time.time()
    cutoff = now - window_seconds

    if self._in_memory:
        return self._check_rate_limit_memory(identity, max_requests, now, cutoff)

    return await asyncio.to_thread(
        self._check_rate_limit_sync,
        identity,
        max_requests,
        window_seconds,
        now,
        cutoff,
    )

P2 check_rate_limit skips _ensure_ready() before branching on _in_memory

Every other public method (create_single_use_token, get_admin_session, delete_admin_session, etc.) calls self._ensure_ready() first. check_rate_limit checks self._in_memory directly without initialising the store. On first use, when both _connected = False and _in_memory = False, the if self._in_memory: guard is False so the code falls through to the thread path, which correctly calls _ensure_ready() inside _check_rate_limit_sync. The runtime behaviour is correct, but any future refactor that relies on the method being callable in isolation could miss initialization.

Comment on lines +364 to +368
    if not record or not record.get("allowed"):
        return False, 0

    hits = record.get("hits", [])
    return True, max(max_requests - len(hits), 0)

P1 When MongoDB is unavailable after initial connect, find_one_and_update raises a PyMongo exception that propagates unhandled through the call stack, returning HTTP 500 to every rate-limited caller. Wrapping the Mongo call and returning a safe default makes the degradation explicit and controllable.

Suggested change
    if not record or not record.get("allowed"):
        return False, 0
    hits = record.get("hits", [])
    return True, max(max_requests - len(hits), 0)

def _check_rate_limit_mongo_safe(
    self,
    identity: str,
    max_requests: int,
    window_seconds: int,
    now: float,
    cutoff: float,
) -> tuple[bool, int]:
    try:
        return self._check_rate_limit_mongo(
            identity, max_requests, window_seconds, now, cutoff
        )
    except Exception as exc:
        logger.error("Rate-limit MongoDB error for %s: %s", identity, exc)
        # Fail open: allow the request rather than returning an opaque 500.
        return True, 0
