diff --git a/application/runtime_composer.py b/application/runtime_composer.py index 43e4086..e36fb5e 100644 --- a/application/runtime_composer.py +++ b/application/runtime_composer.py @@ -11,7 +11,7 @@ from application.runtime_reporting_adapters import build_runtime_reporting_adapters from quant_platform_kit.common.runtime_assembly import build_runtime_assembly from quant_platform_kit.common.runtime_target import build_runtime_context_fields -from quant_platform_kit.common.port_adapters import CallablePortfolioPort +from quant_platform_kit.common.port_adapters import CallableNotificationPort, CallablePortfolioPort from quant_platform_kit.common.runtime_target import RuntimeTarget @@ -124,8 +124,13 @@ def build_reporting_adapters(self): printer=lambda line: self.printer(line, flush=True), ) - def build_rebalance_runtime(self): + def build_rebalance_runtime(self, *, silent_cycle_notifications: bool = False): notification_adapters = self.build_notification_adapters() + notifications = ( + CallableNotificationPort(lambda _message: None) + if silent_cycle_notifications + else notification_adapters.notification_port + ) return IBKRRebalanceRuntime( connect_ib=self.connect_ib_fn, portfolio_port_factory=lambda ib: CallablePortfolioPort( @@ -133,7 +138,7 @@ def build_rebalance_runtime(self): ), compute_signals=self.compute_signals_fn, execute_rebalance=self.execute_rebalance_fn, - notifications=notification_adapters.notification_port, + notifications=notifications, ) def build_rebalance_config(self, *, extra_notification_lines=()): diff --git a/main.py b/main.py index 29638a1..da5ab25 100644 --- a/main.py +++ b/main.py @@ -536,7 +536,9 @@ def run_strategy_core(*, strategy_plugin_signals=(), dry_run_only_override: bool return run_paper_liquidation_cycle() composer = build_composer(dry_run_only_override=dry_run_only_override) return run_rebalance_cycle( - runtime=composer.build_rebalance_runtime(), + runtime=composer.build_rebalance_runtime( + silent_cycle_notifications=bool(dry_run_only_override), + ), config=composer.build_rebalance_config(extra_notification_lines=build_extra_notification_lines(strategy_plugin_signals)), ) @@ -701,6 +703,86 @@ def _handle_request(*, dry_run_only_override: bool | None = None, response_body: print(f"failed to persist execution report: {persist_exc}", flush=True) +def _handle_probe(*, response_body: str = "Probe OK"): + ib = None + log_context = None + report = None + try: + log_context = build_request_log_context() + report = build_execution_report(log_context, dry_run_only_override=True) + strategy_plugin_signals, strategy_plugin_error = load_strategy_plugin_signals() + attach_strategy_plugin_report( + report, + signals=strategy_plugin_signals, + error=strategy_plugin_error, + ) + log_runtime_event( + log_context, + "health_probe_received", + message="Received health probe request", + http_method=request.method, + execution_window="probe", + ) + ib = connect_ib() + snapshot = build_portfolio_snapshot(ib) + positions = tuple(getattr(snapshot, "positions", ()) or ()) + buying_power = float(getattr(snapshot, "buying_power", 0.0) or 0.0) + total_equity = float(getattr(snapshot, "total_equity", 0.0) or 0.0) + finalize_runtime_report( + report, + status="ok", + summary={ + "buying_power": buying_power, + "total_equity": total_equity, + "positions_count": len(positions), + }, + ) + log_runtime_event( + log_context, + "health_probe_completed", + message="Health probe completed", + execution_window="probe", + buying_power=buying_power, + total_equity=total_equity, + positions_count=len(positions), + ) + return response_body, 200 + except Exception as exc: + if report is not None: + append_runtime_report_error( + report, + stage="health_probe", + message=str(exc), + error_type=type(exc).__name__, + ) + finalize_runtime_report(report, status="error") + if log_context is not None: + log_runtime_event( + log_context, + "health_probe_failed", + message="Health probe failed", + severity="ERROR", + execution_window="probe", + error_type=type(exc).__name__, + error_message=str(exc), + ) + error_msg = f"{t('health_probe_title')}\n{t('health_probe_error_prefix')}{traceback.format_exc()}" + publish_notification(detailed_text=error_msg, compact_text=error_msg) + return "Error", 500 + finally: + if ib is not None and hasattr(ib, "disconnect"): + try: + ib.disconnect() + except Exception as disconnect_exc: + print(f"failed to disconnect IBKR probe client: {disconnect_exc}", flush=True) + try: + if report is not None: + report_path = persist_execution_report(report, dry_run_only_override=True) + print(f"execution_report {report_path}", flush=True) + except Exception as persist_exc: + print(f"failed to persist execution report: {persist_exc}", flush=True) + + @app.route("/", methods=["POST", "GET"]) def handle_request(): return _handle_request() @@ -711,6 +793,11 @@ def handle_precheck(): return _handle_request(dry_run_only_override=True, response_body="Precheck OK") +@app.route("/probe", methods=["POST", "GET"]) +def handle_probe(): + return _handle_probe() + + @app.route("/health", methods=["GET"]) def health(): return "OK", 200 diff --git a/notifications/telegram.py b/notifications/telegram.py index 022fe38..ad6c2d8 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -8,6 +8,8 @@ "rebalance_title": "🔔 【调仓指令】", "heartbeat_title": "💓 【心跳检测】", "error_title": "🚨 【策略异常】", + "health_probe_title": "🔎 【连接探针】", + "health_probe_error_prefix": "健康探针异常:\n", "canary_title": "🐤 【金丝雀检查】", "strategy_label": "🧭 策略: {name}", "account_ids_detail": "🆔 账户: {account_ids}", @@ -114,6 +116,8 @@ "rebalance_title": "🔔 【Trade Execution Report】", "heartbeat_title": "💓 【Heartbeat】", "error_title": "🚨 【Strategy Error】", + "health_probe_title": "🔎 【Health Probe】", + "health_probe_error_prefix": "Health probe error:\n", "canary_title": "🐤 【Canary Check】", "strategy_label": "🧭 Strategy: {name}", "account_ids_detail": "🆔 Account: {account_ids}", diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index 11a935c..0c86022 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -103,6 +103,97 @@ def test_handle_precheck_get_does_not_execute(strategy_module, monkeypatch): assert observed["called"] is False +def test_handle_probe_checks_account_snapshot_without_success_notification(strategy_module, monkeypatch): + observed = {"events": [], "disconnects": 0, "notifications": []} + + class FakeIB: + def disconnect(self): + observed["disconnects"] += 1 + + snapshot = types.SimpleNamespace( + buying_power=123.0, + total_equity=456.0, + positions=(types.SimpleNamespace(symbol="SOXL"),), + ) + + monkeypatch.setattr(strategy_module, "build_request_log_context", lambda: types.SimpleNamespace(run_id="run-001")) + monkeypatch.setattr(strategy_module, "build_execution_report", lambda log_context, **_kwargs: {"status": "pending"}) + monkeypatch.setattr( + strategy_module, + "persist_execution_report", + lambda report, **_kwargs: observed.setdefault("report", dict(report)) or "/tmp/runtime-report.json", + ) + monkeypatch.setattr( + strategy_module, + "log_runtime_event", + lambda context, event, **fields: observed["events"].append((event, fields)), + ) + monkeypatch.setattr(strategy_module, "load_strategy_plugin_signals", lambda: ((), None)) + monkeypatch.setattr(strategy_module, "attach_strategy_plugin_report", lambda *args, **kwargs: None) + monkeypatch.setattr(strategy_module, "connect_ib", lambda: FakeIB()) + monkeypatch.setattr(strategy_module, "build_portfolio_snapshot", lambda ib: snapshot) + monkeypatch.setattr( + strategy_module, + "publish_notification", + lambda **_kwargs: observed["notifications"].append(_kwargs), + ) + + with strategy_module.app.test_request_context("/probe", method="POST"): + body, status = strategy_module.handle_probe() + + assert status == 200 + assert body == "Probe OK" + assert [event for event, _fields in observed["events"]] == [ + "health_probe_received", + "health_probe_completed", + ] + assert observed["report"]["status"] == "ok" + assert observed["report"]["summary"]["buying_power"] == 123.0 + assert observed["report"]["summary"]["total_equity"] == 456.0 + assert observed["report"]["summary"]["positions_count"] == 1 + assert observed["disconnects"] == 1 + assert observed["notifications"] == [] + + +def test_handle_probe_failure_sends_notification(strategy_module, monkeypatch): + observed = {"events": [], "notifications": []} + + monkeypatch.setattr(strategy_module, "build_request_log_context", lambda: types.SimpleNamespace(run_id="run-001")) + monkeypatch.setattr(strategy_module, "build_execution_report", lambda log_context, **_kwargs: {"status": "pending"}) + monkeypatch.setattr(strategy_module, "persist_execution_report", lambda report, **_kwargs: observed.setdefault("report", dict(report)) or "/tmp/runtime-report.json") + monkeypatch.setattr( + strategy_module, + "log_runtime_event", + lambda context, event, **fields: observed["events"].append((event, fields)), + ) + monkeypatch.setattr(strategy_module, "load_strategy_plugin_signals", lambda: ((), None)) + monkeypatch.setattr(strategy_module, "attach_strategy_plugin_report", lambda *args, **kwargs: None) + monkeypatch.setattr( + strategy_module, + "connect_ib", + lambda: (_ for _ in ()).throw(RuntimeError("probe failed")), + ) + monkeypatch.setattr( + strategy_module, + "publish_notification", + lambda **kwargs: observed["notifications"].append(kwargs), + ) + + with strategy_module.app.test_request_context("/probe", method="POST"): + body, status = strategy_module.handle_probe() + + assert status == 500 + assert body == "Error" + assert observed["report"]["status"] == "error" + assert observed["report"]["errors"][0]["stage"] == "health_probe" + assert [event for event, _fields in observed["events"]] == [ + "health_probe_received", + "health_probe_failed", + ] + assert len(observed["notifications"]) == 1 + assert "probe failed" in observed["notifications"][0]["detailed_text"] + + def test_build_extra_notification_lines_includes_account_id(strategy_module): lines = strategy_module.build_extra_notification_lines(()) assert any("U18308207" in line for line in lines) diff --git a/tests/test_runtime_composer.py b/tests/test_runtime_composer.py index 43b6b6d..b0e1567 100644 --- a/tests/test_runtime_composer.py +++ b/tests/test_runtime_composer.py @@ -82,6 +82,7 @@ def fake_reporting_builder(**kwargs): notification_adapters = composer.build_notification_adapters() reporting_adapters = composer.build_reporting_adapters() runtime = composer.build_rebalance_runtime() + silent_runtime = composer.build_rebalance_runtime(silent_cycle_notifications=True) config = composer.build_rebalance_config(extra_notification_lines=("plugin-line",)) assert notification_adapters.notification_port == "notification-port" @@ -97,6 +98,8 @@ def fake_reporting_builder(**kwargs): assert runtime.compute_signals == "compute-signals" assert runtime.execute_rebalance == "execute-rebalance" assert runtime.notifications == "notification-port" + silent_runtime.notifications.send_text("precheck heartbeat") + assert "sent_message" not in observed assert config.separator == "━━━━━━━━━━━━━━━━━━" assert config.strategy_display_name == "全球 ETF 轮动" assert config.reconciliation_output_path == "/tmp/reconciliation.json"