Feature: add support nng as broker#612
Feature: add support nng as broker#612alexted wants to merge 4 commits intotaskiq-python:feature/nngfrom
Conversation
78a2868 to
b8c7212
Compare
… affinity policy, and scheduler abstraction.
s3rius
left a comment
There was a problem hiding this comment.
That's a whole broker-server implementation, which I'm not quite sure if we want to have it in the main repo.
I thought that it would be like ZeroMQ, but it's much more impressive. Maybe we might consider moving it to another package to keep the main lib thin.
|
|
||
| # ── standalone CLI entry point ──────────────────────────────────────────────── | ||
|
|
||
| def _build_config() -> HubConfig: |
There was a problem hiding this comment.
Instead of defining a stand-alone file to run, let's make it a sub-command of the taskiq itself.
You can do it by following our CLI extending guide.
| the least-loaded worker instead of relying on NNG round-robin. | ||
|
|
||
| **State** — :class:`~taskiq.brokers.nng.storage.InMemoryStore`. All | ||
| store operations are synchronous and execute directly on the asyncio event |
There was a problem hiding this comment.
| store operations are synchronous and execute directly on the asyncio event | |
| store operations are asynchronous and execute directly on the asyncio event |
| "labels": message.labels, | ||
| "lease_id": "", # hub assigns the real lease_id at dispatch time | ||
| "attempts": int(message.labels.get("attempts", 0)), | ||
| "max_retries": int( |
There was a problem hiding this comment.
Since we have retry-middlewares that do exactly that, I guess we can remove retry functionality from the broker and the hub.
| lease_id = envelope.lease_id # hub-assigned; correct by construction | ||
|
|
||
| async def _ack( | ||
| _task_id: str = task_id, |
There was a problem hiding this comment.
Instead of creating those parameters as defaults, just use them inside the function. In that case your closure will borrow variables from the outer environment. It will make it more rock solid to use.
| payload_b64: str | ||
| labels: dict[str, Any] = field(default_factory=dict) | ||
| lease_id: str = "" | ||
| attempts: int = 0 |
| status: str = "starting" | ||
| version: str = "unknown" | ||
|
|
||
| def as_dict(self) -> dict[str, Any]: |
There was a problem hiding this comment.
You don't need to define a function for that. There's a built-in asdict function for dataclasses.
| separators=(",", ":"), | ||
| ensure_ascii=False, |
There was a problem hiding this comment.
Separators will save you like few bytes. But it might make messages more readable.
| separators=(",", ":"), | |
| ensure_ascii=False, | |
| ensure_ascii=False, |
| payload: dict[str, Any] = { | ||
| "task_id": message.task_id, | ||
| "task_name": message.task_name, | ||
| "payload_b64": base64.b64encode(message.message).decode("ascii"), |
There was a problem hiding this comment.
| "payload_b64": base64.b64encode(message.message).decode("ascii"), | |
| "payload_b64": base64.b64encode(message.message).decode("utf-8"), |
UTF-8 is used in all other places. Let's keep it everywhere to not have any encoding\decoding issues.
| # ── control plane ───────────────────────────────────────────────────────── | ||
|
|
||
| async def _control_handler(self, ctx: Any) -> None: | ||
| """Run one Rep0 context: receive → dispatch → reply, in a loop.""" |
There was a problem hiding this comment.
| """Run one Rep0 context: receive → dispatch → reply, in a loop.""" | |
| """Run one Rep0 context: receive -> dispatch -> reply, in a loop.""" |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## feature/nng #612 +/- ##
===============================================
- Coverage 78.30% 77.08% -1.23%
===============================================
Files 70 74 +4
Lines 2485 3186 +701
===============================================
+ Hits 1946 2456 +510
- Misses 539 730 +191 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Refactoring the solution