Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions application/runtime_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from application.runtime_reporting_adapters import build_runtime_reporting_adapters
from quant_platform_kit.common.runtime_assembly import build_runtime_assembly
from quant_platform_kit.common.runtime_target import build_runtime_context_fields
from quant_platform_kit.common.port_adapters import CallablePortfolioPort
from quant_platform_kit.common.port_adapters import CallableNotificationPort, CallablePortfolioPort
from quant_platform_kit.common.runtime_target import RuntimeTarget


Expand Down Expand Up @@ -124,16 +124,21 @@ def build_reporting_adapters(self):
printer=lambda line: self.printer(line, flush=True),
)

def build_rebalance_runtime(self):
def build_rebalance_runtime(self, *, silent_cycle_notifications: bool = False):
notification_adapters = self.build_notification_adapters()
notifications = (
CallableNotificationPort(lambda _message: None)
if silent_cycle_notifications
else notification_adapters.notification_port
)
return IBKRRebalanceRuntime(
connect_ib=self.connect_ib_fn,
portfolio_port_factory=lambda ib: CallablePortfolioPort(
lambda: self.build_portfolio_snapshot_fn(ib)
),
compute_signals=self.compute_signals_fn,
execute_rebalance=self.execute_rebalance_fn,
notifications=notification_adapters.notification_port,
notifications=notifications,
)

def build_rebalance_config(self, *, extra_notification_lines=()):
Expand Down
89 changes: 88 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,9 @@ def run_strategy_core(*, strategy_plugin_signals=(), dry_run_only_override: bool
return run_paper_liquidation_cycle()
composer = build_composer(dry_run_only_override=dry_run_only_override)
return run_rebalance_cycle(
runtime=composer.build_rebalance_runtime(),
runtime=composer.build_rebalance_runtime(
silent_cycle_notifications=bool(dry_run_only_override),
),
config=composer.build_rebalance_config(extra_notification_lines=build_extra_notification_lines(strategy_plugin_signals)),
)

Expand Down Expand Up @@ -701,6 +703,86 @@ def _handle_request(*, dry_run_only_override: bool | None = None, response_body:
print(f"failed to persist execution report: {persist_exc}", flush=True)


def _handle_probe(*, response_body: str = "Probe OK"):
ib = None
log_context = None
report = None
try:
log_context = build_request_log_context()
report = build_execution_report(log_context, dry_run_only_override=True)
strategy_plugin_signals, strategy_plugin_error = load_strategy_plugin_signals()
attach_strategy_plugin_report(
report,
signals=strategy_plugin_signals,
error=strategy_plugin_error,
)
log_runtime_event(
log_context,
"health_probe_received",
message="Received health probe request",
http_method=request.method,
execution_window="probe",
)
ib = connect_ib()
snapshot = build_portfolio_snapshot(ib)
positions = tuple(getattr(snapshot, "positions", ()) or ())
buying_power = float(getattr(snapshot, "buying_power", 0.0) or 0.0)
total_equity = float(getattr(snapshot, "total_equity", 0.0) or 0.0)
finalize_runtime_report(
report,
status="ok",
summary={
"buying_power": buying_power,
"total_equity": total_equity,
"positions_count": len(positions),
},
)
log_runtime_event(
log_context,
"health_probe_completed",
message="Health probe completed",
execution_window="probe",
buying_power=buying_power,
total_equity=total_equity,
positions_count=len(positions),
)
return response_body, 200
except Exception as exc:
if report is not None:
append_runtime_report_error(
report,
stage="health_probe",
message=str(exc),
error_type=type(exc).__name__,
)
finalize_runtime_report(report, status="error")
if log_context is not None:
log_runtime_event(
log_context,
"health_probe_failed",
message="Health probe failed",
severity="ERROR",
execution_window="probe",
error_type=type(exc).__name__,
error_message=str(exc),
)
error_msg = f"{t('health_probe_title')}\n{t('health_probe_error_prefix')}{traceback.format_exc()}"
publish_notification(detailed_text=error_msg, compact_text=error_msg)
return "Error", 500
finally:
if ib is not None and hasattr(ib, "disconnect"):
try:
ib.disconnect()
except Exception as disconnect_exc:
print(f"failed to disconnect IBKR probe client: {disconnect_exc}", flush=True)
try:
if report is not None:
report_path = persist_execution_report(report, dry_run_only_override=True)
print(f"execution_report {report_path}", flush=True)
except Exception as persist_exc:
print(f"failed to persist execution report: {persist_exc}", flush=True)


@app.route("/", methods=["POST", "GET"])
def handle_request():
return _handle_request()
Expand All @@ -711,6 +793,11 @@ def handle_precheck():
return _handle_request(dry_run_only_override=True, response_body="Precheck OK")


@app.route("/probe", methods=["POST", "GET"])
def handle_probe():
return _handle_probe()
Comment on lines +796 to +798
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Restrict /probe execution to POST requests

The new route explicitly allows GET, but handle_probe() immediately runs _handle_probe() without a method guard, so any GET hit to /probe will open an IBKR connection, persist a dry-run report, and potentially send failure notifications. This is a production-risk side effect for routine GET traffic (health check tooling, accidental browser hits, or crawlers), and it is inconsistent with /precheck, which returns a safe message on GET instead of executing strategy logic.

Useful? React with 👍 / 👎.



@app.route("/health", methods=["GET"])
def health():
return "OK", 200
Expand Down
4 changes: 4 additions & 0 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"rebalance_title": "🔔 【调仓指令】",
"heartbeat_title": "💓 【心跳检测】",
"error_title": "🚨 【策略异常】",
"health_probe_title": "🔎 【连接探针】",
"health_probe_error_prefix": "健康探针异常:\n",
"canary_title": "🐤 【金丝雀检查】",
"strategy_label": "🧭 策略: {name}",
"account_ids_detail": "🆔 账户: {account_ids}",
Expand Down Expand Up @@ -114,6 +116,8 @@
"rebalance_title": "🔔 【Trade Execution Report】",
"heartbeat_title": "💓 【Heartbeat】",
"error_title": "🚨 【Strategy Error】",
"health_probe_title": "🔎 【Health Probe】",
"health_probe_error_prefix": "Health probe error:\n",
"canary_title": "🐤 【Canary Check】",
"strategy_label": "🧭 Strategy: {name}",
"account_ids_detail": "🆔 Account: {account_ids}",
Expand Down
91 changes: 91 additions & 0 deletions tests/test_request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,97 @@ def test_handle_precheck_get_does_not_execute(strategy_module, monkeypatch):
assert observed["called"] is False


def test_handle_probe_checks_account_snapshot_without_success_notification(strategy_module, monkeypatch):
observed = {"events": [], "disconnects": 0, "notifications": []}

class FakeIB:
def disconnect(self):
observed["disconnects"] += 1

snapshot = types.SimpleNamespace(
buying_power=123.0,
total_equity=456.0,
positions=(types.SimpleNamespace(symbol="SOXL"),),
)

monkeypatch.setattr(strategy_module, "build_request_log_context", lambda: types.SimpleNamespace(run_id="run-001"))
monkeypatch.setattr(strategy_module, "build_execution_report", lambda log_context, **_kwargs: {"status": "pending"})
monkeypatch.setattr(
strategy_module,
"persist_execution_report",
lambda report, **_kwargs: observed.setdefault("report", dict(report)) or "/tmp/runtime-report.json",
)
monkeypatch.setattr(
strategy_module,
"log_runtime_event",
lambda context, event, **fields: observed["events"].append((event, fields)),
)
monkeypatch.setattr(strategy_module, "load_strategy_plugin_signals", lambda: ((), None))
monkeypatch.setattr(strategy_module, "attach_strategy_plugin_report", lambda *args, **kwargs: None)
monkeypatch.setattr(strategy_module, "connect_ib", lambda: FakeIB())
monkeypatch.setattr(strategy_module, "build_portfolio_snapshot", lambda ib: snapshot)
monkeypatch.setattr(
strategy_module,
"publish_notification",
lambda **_kwargs: observed["notifications"].append(_kwargs),
)

with strategy_module.app.test_request_context("/probe", method="POST"):
body, status = strategy_module.handle_probe()

assert status == 200
assert body == "Probe OK"
assert [event for event, _fields in observed["events"]] == [
"health_probe_received",
"health_probe_completed",
]
assert observed["report"]["status"] == "ok"
assert observed["report"]["summary"]["buying_power"] == 123.0
assert observed["report"]["summary"]["total_equity"] == 456.0
assert observed["report"]["summary"]["positions_count"] == 1
assert observed["disconnects"] == 1
assert observed["notifications"] == []


def test_handle_probe_failure_sends_notification(strategy_module, monkeypatch):
observed = {"events": [], "notifications": []}

monkeypatch.setattr(strategy_module, "build_request_log_context", lambda: types.SimpleNamespace(run_id="run-001"))
monkeypatch.setattr(strategy_module, "build_execution_report", lambda log_context, **_kwargs: {"status": "pending"})
monkeypatch.setattr(strategy_module, "persist_execution_report", lambda report, **_kwargs: observed.setdefault("report", dict(report)) or "/tmp/runtime-report.json")
monkeypatch.setattr(
strategy_module,
"log_runtime_event",
lambda context, event, **fields: observed["events"].append((event, fields)),
)
monkeypatch.setattr(strategy_module, "load_strategy_plugin_signals", lambda: ((), None))
monkeypatch.setattr(strategy_module, "attach_strategy_plugin_report", lambda *args, **kwargs: None)
monkeypatch.setattr(
strategy_module,
"connect_ib",
lambda: (_ for _ in ()).throw(RuntimeError("probe failed")),
)
monkeypatch.setattr(
strategy_module,
"publish_notification",
lambda **kwargs: observed["notifications"].append(kwargs),
)

with strategy_module.app.test_request_context("/probe", method="POST"):
body, status = strategy_module.handle_probe()

assert status == 500
assert body == "Error"
assert observed["report"]["status"] == "error"
assert observed["report"]["errors"][0]["stage"] == "health_probe"
assert [event for event, _fields in observed["events"]] == [
"health_probe_received",
"health_probe_failed",
]
assert len(observed["notifications"]) == 1
assert "probe failed" in observed["notifications"][0]["detailed_text"]


def test_build_extra_notification_lines_includes_account_id(strategy_module):
lines = strategy_module.build_extra_notification_lines(())
assert any("U18308207" in line for line in lines)
Expand Down
3 changes: 3 additions & 0 deletions tests/test_runtime_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def fake_reporting_builder(**kwargs):
notification_adapters = composer.build_notification_adapters()
reporting_adapters = composer.build_reporting_adapters()
runtime = composer.build_rebalance_runtime()
silent_runtime = composer.build_rebalance_runtime(silent_cycle_notifications=True)
config = composer.build_rebalance_config(extra_notification_lines=("plugin-line",))

assert notification_adapters.notification_port == "notification-port"
Expand All @@ -97,6 +98,8 @@ def fake_reporting_builder(**kwargs):
assert runtime.compute_signals == "compute-signals"
assert runtime.execute_rebalance == "execute-rebalance"
assert runtime.notifications == "notification-port"
silent_runtime.notifications.send_text("precheck heartbeat")
assert "sent_message" not in observed
assert config.separator == "━━━━━━━━━━━━━━━━━━"
assert config.strategy_display_name == "全球 ETF 轮动"
assert config.reconciliation_output_path == "/tmp/reconciliation.json"
Expand Down