feat(ws): add public Client.stop() for graceful shutdown #128
Open
ChangjunZhao wants to merge 1 commit into
Currently the WebSocket Client exposes no way to shut down gracefully. The
only public lifecycle method is ``start()``, which blocks forever on
``loop.run_until_complete(_select())``
(``_select`` is ``while True: sleep(3600)``). As a result, callers:
1. Must run ``start()`` on a worker thread because it blocks.
2. Have no clean way to stop it. Terminating the thread mid-flight leaves
   pending ``ping_loop`` / ``receive_message_loop`` tasks, leaks the
   WebSocket connection, and triggers asyncio warnings like:
   * Task was destroyed but it is pending!
   * Task exception was never retrieved
3. Cannot stop the client by closing the underlying conn externally,
   because ``auto_reconnect=True`` makes the SDK reconnect on its own.
Downstream apps end up reaching for private attributes (``_disconnect`` /
``_conn`` / ``_auto_reconnect``) to work around this. That is fragile across
SDK upgrades and an unfortunate API gap.
This PR adds a public ``async def stop()`` that:
- Sets ``_auto_reconnect = False`` so the receive loop's exception handler
doesn't re-establish the connection after we close it
- Cancels three background tasks (ping_loop, receive_message_loop, the new
``_main_task`` that replaces the bare ``_select()`` call) so loop close
doesn't warn about pending tasks
- Calls ``_disconnect()`` to close the underlying WebSocket
- Releases the blocking ``start()`` by cancelling ``_main_task``
- Is idempotent — safe to call multiple times
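The shutdown order described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: ``SketchClient``, ``connect()``, ``_idle()`` and ``_conn_open`` are hypothetical stand-ins, and only the attribute names ``_auto_reconnect``, ``_ping_task``, ``_receive_message_task``, ``_main_task`` and ``_disconnect()`` are taken from the description.

```python
import asyncio


class SketchClient:
    """Hypothetical stand-in for the SDK's ws Client (not the real class)."""

    def __init__(self):
        self._auto_reconnect = True
        self._conn_open = False  # stands in for the real WebSocket
        self._ping_task = None
        self._receive_message_task = None
        self._main_task = None

    async def _idle(self):
        # Placeholder body for the ping / receive / main loops.
        while True:
            await asyncio.sleep(3600)

    async def connect(self):
        # Hypothetical helper: open the "connection" and keep the task handles.
        self._conn_open = True
        self._ping_task = asyncio.ensure_future(self._idle())
        self._receive_message_task = asyncio.ensure_future(self._idle())
        self._main_task = asyncio.ensure_future(self._idle())

    async def _disconnect(self):
        self._conn_open = False

    async def stop(self):
        # 1. Disable reconnects first so nothing reopens the connection.
        self._auto_reconnect = False
        # 2. Cancel the helper tasks. Missing or finished tasks are skipped,
        #    which is what makes repeated calls harmless.
        helpers = [
            t for t in (self._ping_task, self._receive_message_task)
            if t is not None and not t.done()
        ]
        for task in helpers:
            task.cancel()
        if helpers:
            await asyncio.gather(*helpers, return_exceptions=True)
        # 3. Close the connection.
        await self._disconnect()
        # 4. Cancel the main task last; nothing is awaited after this point.
        if self._main_task is not None and not self._main_task.done():
            self._main_task.cancel()


async def demo():
    client = SketchClient()
    await client.connect()
    await client.stop()
    await client.stop()        # second call finds nothing left to do
    await asyncio.sleep(0.01)  # let the main task observe its cancellation
    return client


client = asyncio.run(demo())
print(client._auto_reconnect, client._conn_open, client._main_task.cancelled())
```

Skipping tasks that are ``None`` or already done is one simple way to get idempotency; the actual method may handle this differently.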
Designed for cross-thread use via ``asyncio.run_coroutine_threadsafe`` from
the application's main thread to the worker thread's loop. This is the
typical deployment pattern since ``start()`` blocks.
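A rough sketch of that cross-thread pattern is shown below. ``SketchClient``, its ``ready`` event and its ``stopped`` flag are hypothetical stand-ins so the example runs without the SDK; only ``start()``, ``stop()``, ``_main_task`` and ``asyncio.run_coroutine_threadsafe`` come from the description. In this sketch ``stop()`` cancels the main task as its final step and awaits nothing afterwards, because ``run_until_complete`` stops the loop once that task finishes.

```python
import asyncio
import threading


class SketchClient:
    """Hypothetical stand-in; only the start()/stop() shape matters here."""

    def __init__(self, loop):
        self._loop = loop
        self._main_task = None
        self.stopped = False
        self.ready = threading.Event()  # illustration only: signals start() is set up

    async def _select(self):
        while True:
            await asyncio.sleep(3600)

    def start(self):
        # Blocks the calling thread until stop() cancels the main task.
        self._main_task = self._loop.create_task(self._select())
        self.ready.set()
        try:
            self._loop.run_until_complete(self._main_task)
        except asyncio.CancelledError:
            pass  # graceful exit requested via stop()

    async def stop(self):
        self.stopped = True
        if self._main_task is not None and not self._main_task.done():
            self._main_task.cancel()  # last step, nothing awaited after it


# Worker thread owns the loop and blocks inside start().
loop = asyncio.new_event_loop()
client = SketchClient(loop)
worker = threading.Thread(target=client.start, daemon=True)
worker.start()
client.ready.wait(timeout=5)

# Main thread schedules stop() onto the worker's loop and waits for it.
future = asyncio.run_coroutine_threadsafe(client.stop(), loop)
future.result(timeout=5)
worker.join(timeout=5)
loop.close()
```

``future.result(timeout=...)`` keeps the caller from hanging if the worker loop is no longer running; the timeout value here is arbitrary.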
The only internal changes are to ``start()`` / ``_connect()``, which now save
the task handles on ``self``. ``start()``'s blocking semantics are preserved
(it still blocks on ``run_until_complete(self._main_task)``); only the path
that lets it exit cleanly is new.
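To illustrate why keeping a handle on the main task is enough to unblock ``start()``, here is a small single-threaded sketch. ``blocking_start`` and ``cancel_after`` are hypothetical names used only for the demonstration, and the timer stands in for a later ``stop()`` call.

```python
import asyncio


async def _select():
    # Same shape as the module-level helper: sleep forever.
    while True:
        await asyncio.sleep(3600)


def blocking_start(loop, cancel_after):
    # Wrapping the coroutine in a task yields a handle that can be cancelled.
    main_task = loop.create_task(_select())
    # Stand-in for stop(): cancel the task after a short delay.
    loop.call_later(cancel_after, main_task.cancel)
    try:
        loop.run_until_complete(main_task)  # still blocks, as before
    except asyncio.CancelledError:
        return "stopped"  # the new clean-exit path
    return "finished"


loop = asyncio.new_event_loop()
outcome = blocking_start(loop, 0.05)
loop.close()
print(outcome)
```

With a bare ``run_until_complete(_select())`` the caller never sees the task object, so there is nothing to cancel from outside.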
Backward compatibility:
- Module-level ``_select()`` is kept (some downstream callers may have
monkeypatched it in tests).
- All existing behavior unchanged for callers that don't invoke ``stop()``.
- All existing ws tests still pass.
Tests added (lark_oapi/ws/tests/test_stop.py):
- stop disables auto_reconnect
- stop closes the websocket
- stop cancels ping / receive / main background tasks
- stop is idempotent
- stop handles already-completed tasks
- stop works cross-thread via run_coroutine_threadsafe (real-world pattern)
Somainer pushed a commit to Somainer/lyre that referenced this pull request May 22, 2026

The previous commit fixed the symptom; this fleshes out the comment to make clear it's not us being clever: lark_oapi has a known bug where its WS event loop is a module-level global captured at first import, making it incompatible with apps that already run their own asyncio loop. The workaround in our worker thread (rebinding ``lark_oapi.ws.client.loop``) is the same one the upstream issue reporter uses.

Comment now references:
- larksuite/oapi-sdk-python#119: the bug, with the same workaround
- larksuite/oapi-sdk-python#96: open request for an async start API
- larksuite/oapi-sdk-python#128: open PR adding graceful stop

When any of those land we can drop this block.
Somainer added a commit to Somainer/lyre that referenced this pull request May 22, 2026

* Mypy strict cleanup: 124 errors → 0, ruff stays clean

No behavior change, purely tightening types across the codebase so ``mypy src`` passes under strict mode (and ``ruff check`` already does).

Structural changes:
- ``Repositories`` Protocol gains ``conn: aiosqlite.Connection`` as a documented escape hatch for cross-table SQL (snapshot aggregates, JSON1 path filters); ``SqliteRepositories`` now declares each repo attribute with the Protocol type, fixing the invariance mismatch callers saw against concrete ``SqliteFooRepository`` types.
- ``MailboxRepository`` adds three public methods that previously required reaching into ``_row_to_msg`` from outside: ``find_by_channel_external_id`` (Lark thread resolution), ``find_id_by_external_id`` (scheduler UNIQUE-collision recovery), ``list_pending_channel_publish`` (owner-mail catch-up). Lark / scheduler / enqueuer now go through them.
- Routes share a small typed-accessor module (``routes/__init__.py``) for ``request.app.state.*``. Starlette's ``State`` is untyped, so every read used to leak ``Any`` into otherwise-strict route bodies.
- ``StopReason = Literal[...]`` exported from ``llm_adapter`` and used in ``_FINISH_REASON_MAP`` so adapters yield ``TurnComplete`` with a narrowed type instead of a plain ``str``.

Mechanical:
- Generic-arg fills (``dict``, ``list``, ``Queue``, ``Task``, ``tuple``)
- Function annotations on a handful of internal helpers
- Removed stale ``# type: ignore`` markers warn_unused_ignores caught
- Replaced ``hasattr(x, 'isoformat')`` with ``isinstance(x, datetime)`` so mypy can narrow ``datetime | None``
- ``HTMLResponse`` → ``_TemplateResponse`` returns on routes (the actual Jinja2Templates return type)
- ``types-croniter`` dev dep added; ``croniter`` calls now type-check
- Wrapped a couple of "row from RETURNING / set elsewhere" assert-non-None narrows with explanatory comments

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Send-mail dropdown: show display_name, not persona type

The /send persona dropdown was rendering ``analyst`` / ``dispatcher`` / ``reviewer`` (the abstract role names), when the owner addresses agents by their display_name (``analyst-1``, ``luna``, ``reviewer-1``). _personas_for_form now returns (value, label) pairs: value stays the persona name (backend still composes <persona>/<name>), label uses ``display_name`` from identity.md (falls back to name). Custom display_names propagate automatically: re-onboarding to set display_name="luna" makes the dropdown show "luna".

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Fix Lark integration: thread event-loop + use open_id, not user_id

Two unrelated boot failures the first real Lark hookup turned up.

1. ``RuntimeError: This event loop is already running``
   ``lark_oapi.ws.Client.start()`` calls ``loop.run_until_complete(self._connect())``. The worker thread we spawn never called ``asyncio.set_event_loop``; under Python 3.12 that falls through to the *main* thread's loop, which is already running uvicorn / scheduler. Now the thread sets a fresh loop before ``ws.start()``.
2. ``Access denied. One of the following scopes is required: [contact:user.employee_id:readonly]``
   Outbound sends used ``receive_id_type("user_id")``. Lark equates user_id with employee_id and gates it behind a contact scope that the bot doesn't actually need to message the owner. Switched to ``open_id`` for both outbound ``receive_id_type`` and inbound sender matching (read ``sender.sender_id.open_id`` instead of ``.user_id``).

The ``authorized_user_id`` config field name stays for compatibility but now expects the ``ou_…`` form; docstring updated to call this out. The mailbox metadata key under ``channels.lark`` flipped from ``user_id`` → ``open_id`` to match.

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Fix Lark WS event-loop capture: construct Client inside the thread

Last round's set_event_loop fix wasn't enough: lark-oapi's ws.Client captures ``asyncio.get_event_loop()`` into ``self`` at __init__ time. Building the Client on the main thread therefore froze the main asyncio loop into the SDK; every later ``run_until_complete`` inside ``start()`` re-hit "This event loop is already running" regardless of what the worker thread did. Now both the new loop AND the Client live inside the worker thread, so the SDK captures *this* thread's fresh loop.

Also: bump the unauthorized-sender log from debug to info. During first-time setup the owner needs to find their app-scoped open_id (Lark open_ids are per-app, so an id from one bot won't match another), and the simplest way is "send the bot a message, copy the open_id from the log". Hiding that at debug forced everyone to add print statements; info+hint surfaces it directly.

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Lark WS: monkey-patch module-level event loop in worker thread

The previous fixes all assumed lark_oapi captured the loop on the Client instance. It doesn't: ``lark_oapi/ws/client.py`` binds a **module-level** ``loop`` global at first import via ``asyncio.get_event_loop()``. Because LarkChannel construction imports lark_oapi on the main thread (where uvicorn's loop is running), that global is forever pinned to the main loop. Every ``Client.start()`` / ``_connect()`` / ``_disconnect()`` then calls ``run_until_complete`` on it, and the main loop IS running. RuntimeError.

Fix: in the worker thread, rebind ``lark_oapi.ws.client.loop`` to a fresh loop BEFORE Client construction. This also makes the per-instance ``ExpiringCache`` pick up the worker's loop (it reads ``get_event_loop()`` at __init__), which resolves the "Task was destroyed but it is pending" warning around ``ExpiringCache._start_clear_cron``; that sweeper task had been scheduled on the main loop too.

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Document Lark loop workaround: cite upstream SDK bug + tracking issues

The previous commit fixed the symptom; this fleshes out the comment to make clear it's not us being clever: lark_oapi has a known bug where its WS event loop is a module-level global captured at first import, making it incompatible with apps that already run their own asyncio loop. The workaround in our worker thread (rebinding ``lark_oapi.ws.client.loop``) is the same one the upstream issue reporter uses.

Comment now references:
- larksuite/oapi-sdk-python#119: the bug, with the same workaround
- larksuite/oapi-sdk-python#96: open request for an async start API
- larksuite/oapi-sdk-python#128: open PR adding graceful stop

When any of those land we can drop this block.

---------

Co-authored-by: Claude <noreply@anthropic.com>
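The rebinding workaround described in these commits might look roughly like the sketch below. The attribute path ``lark_oapi.ws.client.loop`` is taken from the commit message; ``fake_ws_client`` is a hypothetical stand-in namespace so the example runs without the SDK installed, and the real module may behave differently.

```python
import asyncio
import threading
import types

# Stand-in for the SDK module: pretend ``loop`` was captured at import time
# on the main thread (hypothetical object, not the real lark_oapi module).
captured_at_import = asyncio.new_event_loop()
fake_ws_client = types.SimpleNamespace(loop=captured_at_import)

results = {}


def worker():
    # Create a loop owned by this thread and make it the thread's current loop.
    fresh_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh_loop)
    # Rebind the module-level global BEFORE any client would be constructed,
    # so later run_until_complete calls use this thread's loop.
    fake_ws_client.loop = fresh_loop
    try:
        results["value"] = fake_ws_client.loop.run_until_complete(
            asyncio.sleep(0, result="ran on worker loop")
        )
        results["rebound"] = fake_ws_client.loop is not captured_at_import
    finally:
        fresh_loop.close()


thread = threading.Thread(target=worker)
thread.start()
thread.join(timeout=5)
captured_at_import.close()
print(results)
```

With the real SDK the rebinding would target the imported module instead of a stand-in namespace, which is the part this sketch cannot verify.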
Problem

The WebSocket ``Client`` exposes no public way to gracefully shut down. The only lifecycle method is ``start()``, which blocks forever on ``loop.run_until_complete(_select())`` (where ``_select`` is ``while True: sleep(3600)``).

Downstream apps that need to stop a running client (e.g. on app shutdown, on config change, on plugin lifecycle hook) currently have to reach for private attributes (``_disconnect()``, ``_conn``, ``_auto_reconnect``), and on top of that, ``start()`` never returns even after the underlying WS is closed, so the worker thread leaks.

Even worse, ``auto_reconnect=True`` means the SDK reconnects on its own after any external close attempt, so callers can't "stop" by just closing the connection.

Symptoms we hit downstream:
- ``Task was destroyed but it is pending!`` for the leaked ``_ping_loop`` task
- ``Task exception was never retrieved`` for the receive loop

Solution

Add a public ``async def stop()`` method that:
- Sets ``_auto_reconnect = False``: prevents the receive loop's exception handler from re-establishing the connection after we close it
- Cancels the background tasks (``_ping_task``, ``_receive_message_task``, the new ``_main_task``): prevents pending-task warnings on loop close
- Calls ``_disconnect()``: closes the underlying WebSocket
- Cancels ``_main_task``: releases the blocking ``start()`` so its worker thread can exit

Designed for the typical cross-thread pattern via ``asyncio.run_coroutine_threadsafe``.

Changes

- ``Client.__init__``: 3 new task handle fields (``_ping_task``, ``_receive_message_task``, ``_main_task``)
- ``Client.start()``: stores the ping/main task handles so ``stop()`` can cancel them; still blocks on the main task; wraps the block in ``try/except CancelledError`` for graceful exit
- ``Client._connect()``: stores the receive_message_task handle
- ``Client.stop()``: new public method (see docstring for full semantics)
- ``_select()`` is kept unchanged for backward compat (some downstream callers monkeypatch it in tests)

Tests

``lark_oapi/ws/tests/test_stop.py`` (new file, 6 tests):
- ``test_stop_disables_auto_reconnect``
- ``test_stop_closes_websocket``
- ``test_stop_cancels_background_tasks``
- ``test_stop_is_idempotent``
- ``test_stop_handles_already_done_tasks``
- ``test_stop_from_another_thread``: real-world ``run_coroutine_threadsafe`` pattern

All existing ``test_websockets_compat.py`` tests still pass.

Backward compatibility

✅ All existing behavior preserved when ``stop()`` is not called.
✅ ``start()`` still blocks on ``run_until_complete`` of the main task (same observable behavior as before).
✅ Module-level ``_select()`` retained.
✅ No changes to public method signatures, no breaking changes to private attributes used in existing patches.