Skip to content

feat(ws): add public Client.stop() for graceful shutdown#128

Open
ChangjunZhao wants to merge 1 commit into
larksuite:v2_mainfrom
ChangjunZhao:feat/public-stop-method
Open

feat(ws): add public Client.stop() for graceful shutdown#128
ChangjunZhao wants to merge 1 commit into
larksuite:v2_mainfrom
ChangjunZhao:feat/public-stop-method

Conversation

@ChangjunZhao
Copy link
Copy Markdown

Problem

The WebSocket Client exposes no public way to gracefully shut down. The only lifecycle method is start(), which blocks forever on loop.run_until_complete(_select()) (where _select is while True: sleep(3600)).

Downstream apps that need to stop a running client (e.g. on app shutdown, on config change, on plugin lifecycle hook) currently have to reach for private attributes — _disconnect(), _conn, _auto_reconnect — and on top of that, start() never returns even after the underlying WS is closed, so the worker thread leaks.

Even worse, auto_reconnect=True means the SDK reconnects on its own after any external close attempt, so callers can't "stop" by just closing the connection.

Symptoms we hit downstream:

  • Task was destroyed but it is pending! for the leaked _ping_loop task
  • Task exception was never retrieved for the receive loop
  • WS thread accumulating across reconnect cycles
  • Brand new WebSocket connections appearing after we tried to stop

Solution

Add a public async def stop() method that:

  1. Sets _auto_reconnect = False — prevents the receive loop's exception handler from re-establishing the connection after we close it
  2. Cancels three internal background tasks (_ping_task, _receive_message_task, the new _main_task) — prevents pending-task warnings on loop close
  3. Calls _disconnect() — closes the underlying WebSocket
  4. Cancels _main_task — releases the blocking start() so its worker thread can exit
  5. Is idempotent — safe to call multiple times (e.g. from signal handlers, finally blocks)

Designed for the typical cross-thread pattern via asyncio.run_coroutine_threadsafe:

# In worker thread:
client = Client(app_id, app_secret)
client.start()   # blocks until stop() is called from another thread

# In main thread (or signal handler):
future = asyncio.run_coroutine_threadsafe(client.stop(), client_loop)
future.result(timeout=5)
# Now the worker thread's start() returns, thread can be joined.

Changes

  • Client.__init__: 3 new task handle fields (_ping_task, _receive_message_task, _main_task)
  • Client.start(): stores the ping/main task handles so stop() can cancel them; still blocks on the main task; wraps the block in try/except CancelledError for graceful exit
  • Client._connect(): stores the receive_message_task handle
  • Client.stop(): new public method — see docstring for full semantics
  • Module-level _select() is kept unchanged for backward compat (some downstream callers monkeypatch it in tests)

Tests

lark_oapi/ws/tests/test_stop.py (new file, 6 tests):

  • test_stop_disables_auto_reconnect
  • test_stop_closes_websocket
  • test_stop_cancels_background_tasks
  • test_stop_is_idempotent
  • test_stop_handles_already_done_tasks
  • test_stop_from_another_thread — real-world run_coroutine_threadsafe pattern

All existing test_websockets_compat.py tests still pass.

$ python -m pytest lark_oapi/ws/tests/ -v
========================= 10 passed in 5.22s =========================

Backward compatibility

✅ All existing behavior preserved when stop() is not called.
start() still blocks on run_until_complete of the main task (same observable behavior as before).
✅ Module-level _select() retained.
✅ No changes to public method signatures, no breaking changes to private attributes used in existing patches.

Currently the WebSocket Client exposes no way to gracefully shut down — the
only public lifecycle method is ``start()`` which blocks forever on
``loop.run_until_complete(_select())`` (``_select`` is ``while True: sleep
3600``). Callers have to:

  1. Run ``start()`` on a worker thread because it blocks
  2. Have no clean way to stop it — terminating the thread mid-flight leaves
     pending ``ping_loop`` / ``receive_message_loop`` tasks, leaks the
     WebSocket connection, and triggers asyncio warnings like:
       * Task was destroyed but it is pending!
       * Task exception was never retrieved
  3. Worse, ``auto_reconnect=True`` means even if you do close the underlying
     conn externally, the SDK reconnects on its own.

Downstream apps end up reaching for private attributes — ``_disconnect`` /
``_conn`` / ``_auto_reconnect`` — to work around this. That's fragile across
SDK upgrades and an unfortunate API gap.

This PR adds a public ``async def stop()`` that:

  - Sets ``_auto_reconnect = False`` so the receive loop's exception handler
    doesn't re-establish the connection after we close it
  - Cancels three background tasks (ping_loop, receive_message_loop, the new
    ``_main_task`` that replaces the bare ``_select()`` call) so loop close
    doesn't warn about pending tasks
  - Calls ``_disconnect()`` to close the underlying WebSocket
  - Releases the blocking ``start()`` by cancelling ``_main_task``
  - Is idempotent — safe to call multiple times

Designed for cross-thread use via ``asyncio.run_coroutine_threadsafe`` from
the application's main thread to the worker thread's loop. This is the
typical deployment pattern since ``start()`` blocks.

Minimal internal changes to ``start()`` / ``_connect()`` to save the task
handles on ``self``. ``start()``'s blocking semantics are preserved (still
blocks on ``run_until_complete(self._main_task)``), only the path that lets
it exit cleanly is new.

Backward compatibility:
  - Module-level ``_select()`` is kept (some downstream callers may have
    monkeypatched it in tests).
  - All existing behavior unchanged for callers that don't invoke ``stop()``.
  - All existing ws tests still pass.

Tests added (lark_oapi/ws/tests/test_stop.py):
  - stop disables auto_reconnect
  - stop closes the websocket
  - stop cancels ping / receive / main background tasks
  - stop is idempotent
  - stop handles already-completed tasks
  - stop works cross-thread via run_coroutine_threadsafe (real-world pattern)
@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

Somainer pushed a commit to Somainer/lyre that referenced this pull request May 22, 2026
The previous commit fixed the symptom; this fleshes out the comment
to make clear it's not us being clever — lark_oapi has a known bug
where its WS event loop is a module-level global captured at first
import, making it incompatible with apps that already run their own
asyncio loop. The workaround in our worker thread (rebinding
``lark_oapi.ws.client.loop``) is the same one the upstream issue
reporter uses.

Comment now references:
- larksuite/oapi-sdk-python#119 — the bug, with the same workaround
- larksuite/oapi-sdk-python#96 — open request for an async start API
- larksuite/oapi-sdk-python#128 — open PR adding graceful stop

When any of those land we can drop this block.
Somainer added a commit to Somainer/lyre that referenced this pull request May 22, 2026
* Mypy strict cleanup: 124 errors → 0, ruff stays clean

No behavior change — purely tightening types across the codebase so
``mypy src`` passes under strict mode (and ``ruff check`` already does).

Structural changes:
- ``Repositories`` Protocol gains ``conn: aiosqlite.Connection`` as a
  documented escape hatch for cross-table SQL (snapshot aggregates,
  JSON1 path filters); ``SqliteRepositories`` now declares each repo
  attribute with the Protocol type, fixing the invariance mismatch
  callers saw against concrete ``SqliteFooRepository`` types.
- ``MailboxRepository`` adds three public methods that previously
  required reaching into ``_row_to_msg`` from outside: ``find_by_
  channel_external_id`` (Lark thread resolution), ``find_id_by_external_id``
  (scheduler UNIQUE-collision recovery), ``list_pending_channel_publish``
  (owner-mail catch-up). Lark / scheduler / enqueuer now go through them.
- Routes share a small typed-accessor module (``routes/__init__.py``)
  for ``request.app.state.*`` — Starlette's ``State`` is untyped, so
  every read used to leak ``Any`` into otherwise-strict route bodies.
- ``StopReason = Literal[...]`` exported from ``llm_adapter`` and used
  in ``_FINISH_REASON_MAP`` so adapters yield ``TurnComplete`` with a
  narrowed type instead of a plain ``str``.

Mechanical:
- Generic-arg fills (``dict``, ``list``, ``Queue``, ``Task``, ``tuple``)
- Function annotations on a handful of internal helpers
- Removed stale ``# type: ignore`` markers warn_unused_ignores caught
- Replaced ``hasattr(x, 'isoformat')`` with ``isinstance(x, datetime)``
  so mypy can narrow ``datetime | None``
- ``HTMLResponse`` → ``_TemplateResponse`` returns on routes (the
  actual Jinja2Templates return type)
- ``types-croniter`` dev dep added; ``croniter`` calls now type-check
- Wrapped a couple of "row from RETURNING / set elsewhere" assert-non-
  None narrows with explanatory comments

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Send-mail dropdown: show display_name, not persona type

The /send persona dropdown was rendering ``analyst`` / ``dispatcher`` /
``reviewer`` — the abstract role names, when the owner addresses agents
by their display_name (``analyst-1``, ``luna``, ``reviewer-1``).

_personas_for_form now returns (value, label) pairs: value stays the
persona name (backend still composes <persona>/<name>), label uses
``display_name`` from identity.md (falls back to name). Custom
display_names propagate automatically — re-onboarding to set
display_name="luna" makes the dropdown show "luna".

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Fix Lark integration: thread event-loop + use open_id, not user_id

Two unrelated boot failures the first real Lark hookup turned up.

1. ``RuntimeError: This event loop is already running``

   ``lark_oapi.ws.Client.start()`` calls
   ``loop.run_until_complete(self._connect())``. The worker thread we
   spawn never called ``asyncio.set_event_loop`` — under Python 3.12
   that falls through to the *main* thread's loop, which is already
   running uvicorn / scheduler. Now the thread sets a fresh loop
   before ``ws.start()``.

2. ``Access denied. One of the following scopes is required:
       [contact:user.employee_id:readonly]``

   Outbound sends used ``receive_id_type("user_id")``. Lark equates
   user_id with employee_id and gates it behind a contact scope that
   the bot doesn't actually need to message the owner.

   Switched to ``open_id`` for both outbound ``receive_id_type`` and
   inbound sender matching (read ``sender.sender_id.open_id`` instead
   of ``.user_id``). The ``authorized_user_id`` config field name
   stays for compatibility but now expects the ``ou_…`` form;
   docstring updated to call this out. The mailbox metadata key under
   ``channels.lark`` flipped from ``user_id`` → ``open_id`` to match.

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Fix Lark WS event-loop capture: construct Client inside the thread

Last round's set_event_loop fix wasn't enough: lark-oapi's ws.Client
captures ``asyncio.get_event_loop()`` into ``self`` at __init__ time.
Building the Client on the main thread therefore froze the main
asyncio loop into the SDK; every later ``run_until_complete`` inside
``start()`` re-hit "This event loop is already running" regardless of
what the worker thread did.

Now both the new loop AND the Client live inside the worker thread,
so the SDK captures *this* thread's fresh loop.

Also: bump the unauthorized-sender log from debug to info. During
first-time setup the owner needs to find their app-scoped open_id
(Lark open_ids are per-app — an id from one bot won't match another),
and the simplest way is "send the bot a message, copy the open_id
from the log". Hiding that at debug forced everyone to add print
statements; info+hint surfaces it directly.

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Lark WS: monkey-patch module-level event loop in worker thread

The previous fixes all assumed lark_oapi captured the loop on the
Client instance. It doesn't — ``lark_oapi/ws/client.py`` binds a
**module-level** ``loop`` global at first import via
``asyncio.get_event_loop()``. Because LarkChannel construction
imports lark_oapi on the main thread (where uvicorn's loop is
running), that global is forever pinned to the main loop. Every
``Client.start()`` / ``_connect()`` / ``_disconnect()`` then calls
``run_until_complete`` on it — and the main loop IS running.
RuntimeError.

Fix: in the worker thread, rebind ``lark_oapi.ws.client.loop`` to a
fresh loop BEFORE Client construction. This also makes the
per-instance ``ExpiringCache`` pick up the worker's loop (it reads
``get_event_loop()`` at __init__), which resolves the "Task was
destroyed but it is pending" warning around ``ExpiringCache._start_
clear_cron`` — that sweeper task had been scheduled on the main
loop too.

https://claude.ai/code/session_018boPnoruT4LkKs7frK2V7w

* Document Lark loop workaround: cite upstream SDK bug + tracking issues

The previous commit fixed the symptom; this fleshes out the comment
to make clear it's not us being clever — lark_oapi has a known bug
where its WS event loop is a module-level global captured at first
import, making it incompatible with apps that already run their own
asyncio loop. The workaround in our worker thread (rebinding
``lark_oapi.ws.client.loop``) is the same one the upstream issue
reporter uses.

Comment now references:
- larksuite/oapi-sdk-python#119 — the bug, with the same workaround
- larksuite/oapi-sdk-python#96 — open request for an async start API
- larksuite/oapi-sdk-python#128 — open PR adding graceful stop

When any of those land we can drop this block.

---------

Co-authored-by: Claude <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants