Skip to content

Feature: add support nng as broker#612

Open
alexted wants to merge 4 commits intotaskiq-python:feature/nngfrom
alexted:feature/add-support-NNG-as-broker
Open

Feature: add support nng as broker#612
alexted wants to merge 4 commits intotaskiq-python:feature/nngfrom
alexted:feature/add-support-NNG-as-broker

Conversation

@alexted
Copy link
Copy Markdown

@alexted alexted commented Apr 26, 2026

Refactoring the solution

@alexted alexted force-pushed the feature/add-support-NNG-as-broker branch from 78a2868 to b8c7212 Compare April 26, 2026 15:37
… affinity policy, and scheduler abstraction.
@alexted alexted mentioned this pull request Apr 26, 2026
Copy link
Copy Markdown
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a whole broker-server implementation, which I'm not quite sure if we want to have it in the main repo.

I thought that it would be like ZeroMQ, but it's much more impressive. Maybe we might consider moving it to another package to keep the main lib thin.

Comment thread taskiq/brokers/nng/hub.py

# ── standalone CLI entry point ────────────────────────────────────────────────

def _build_config() -> HubConfig:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of defining a stand-alone file to run, let's make it a sub-command of the taskiq itself.

You can do it by following our CLI extending guide.

https://taskiq-python.github.io/extending-taskiq/cli.html

Comment thread taskiq/brokers/nng/hub.py
the least-loaded worker instead of relying on NNG round-robin.

**State** — :class:`~taskiq.brokers.nng.storage.InMemoryStore`. All
store operations are synchronous and execute directly on the asyncio event
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
store operations are synchronous and execute directly on the asyncio event
store operations are asynchronous and execute directly on the asyncio event

"labels": message.labels,
"lease_id": "", # hub assigns the real lease_id at dispatch time
"attempts": int(message.labels.get("attempts", 0)),
"max_retries": int(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have retry-middlewares that do exactly that, I guess we can remove retry functionality from the broker and the hub.

lease_id = envelope.lease_id # hub-assigned; correct by construction

async def _ack(
_task_id: str = task_id,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating those parameters as defaults, just use them inside the function. In that case your closure will borrow variables from the outer environment. It will make it more rock solid to use.

payload_b64: str
labels: dict[str, Any] = field(default_factory=dict)
lease_id: str = ""
attempts: int = 0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry-related again.

status: str = "starting"
version: str = "unknown"

def as_dict(self) -> dict[str, Any]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to define a function for that. There's a built-in asdict function for dataclasses.

Comment on lines +135 to +136
separators=(",", ":"),
ensure_ascii=False,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separators will save you like few bytes. But it might make messages more readable.

Suggested change
separators=(",", ":"),
ensure_ascii=False,
ensure_ascii=False,

payload: dict[str, Any] = {
"task_id": message.task_id,
"task_name": message.task_name,
"payload_b64": base64.b64encode(message.message).decode("ascii"),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"payload_b64": base64.b64encode(message.message).decode("ascii"),
"payload_b64": base64.b64encode(message.message).decode("utf-8"),

UTF-8 is used in all other places. Let's keep it everywhere to not have any encoding\decoding issues.

Comment thread taskiq/brokers/nng/hub.py
# ── control plane ─────────────────────────────────────────────────────────

async def _control_handler(self, ctx: Any) -> None:
"""Run one Rep0 context: receive → dispatch → reply, in a loop."""
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Run one Rep0 context: receive dispatch reply, in a loop."""
"""Run one Rep0 context: receive -> dispatch -> reply, in a loop."""

@codecov
Copy link
Copy Markdown

codecov Bot commented May 6, 2026

Codecov Report

❌ Patch coverage is 70.53942% with 213 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.08%. Comparing base (2fd7a3c) to head (fe3b0a5).

Files with missing lines Patch % Lines
taskiq/brokers/nng/broker.py 0.00% 129 Missing ⚠️
taskiq/brokers/nng/hub.py 71.61% 65 Missing ⚠️
taskiq/brokers/nng/storage.py 94.11% 16 Missing ⚠️
taskiq/brokers/nng/protocol.py 96.62% 3 Missing ⚠️
Additional details and impacted files
@@               Coverage Diff               @@
##           feature/nng     #612      +/-   ##
===============================================
- Coverage        78.30%   77.08%   -1.23%     
===============================================
  Files               70       74       +4     
  Lines             2485     3186     +701     
===============================================
+ Hits              1946     2456     +510     
- Misses             539      730     +191     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants