From 3bfa57c38d34f301c3a467261c8b6ec2bc5ec156 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Mon, 18 May 2026 22:03:56 +0800 Subject: [PATCH] Silence LongBridge checks and clarify buy note --- application/execution_service.py | 22 ++++-- application/runtime_composer.py | 10 ++- main.py | 99 ++++++++++++++++++++++++++- notifications/telegram.py | 6 ++ tests/test_rebalance_service.py | 37 ++++++++++- tests/test_request_handling.py | 111 ++++++++++++++++++++++++++++++- tests/test_runtime_composer.py | 3 + 7 files changed, 278 insertions(+), 10 deletions(-) diff --git a/application/execution_service.py b/application/execution_service.py index 8d99a95..febfe85 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -658,15 +658,27 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): investable_cash = max(0, investable_cash - cost_estimate) action_done = True else: + if diff <= investable_cash: + note_kind = "buy_deferred_small_target_gap" + note_kwargs = { + "symbol": f"{symbol}.US", + "diff": f"{diff:.2f}", + "price": f"{price:.2f}", + } + else: + note_kind = "buy_deferred_small_cash" + note_kwargs = { + "symbol": f"{symbol}.US", + "diff": f"{diff:.2f}", + "investable": f"{investable_cash:.2f}", + "price": f"{price:.2f}", + } record_note_log( note_logs, translator=translator, with_prefix=with_prefix, - kind="buy_deferred_small_cash", - symbol=f"{symbol}.US", - diff=f"{diff:.2f}", - investable=f"{investable_cash:.2f}", - price=f"{price:.2f}", + kind=note_kind, + **note_kwargs, ) if ( diff --git a/application/runtime_composer.py b/application/runtime_composer.py index 706e624..384ea68 100644 --- a/application/runtime_composer.py +++ b/application/runtime_composer.py @@ -10,6 +10,7 @@ from application.runtime_dependencies import LongBridgeRebalanceConfig, LongBridgeRebalanceRuntime from application.runtime_notification_adapters import build_runtime_notification_adapters from application.runtime_reporting_adapters import build_runtime_reporting_adapters +from quant_platform_kit.common.port_adapters import CallableNotificationPort from quant_platform_kit.common.runtime_assembly import build_runtime_assembly from quant_platform_kit.common.runtime_target import build_runtime_context_fields from quant_platform_kit.common.runtime_target import RuntimeTarget @@ -144,8 +145,13 @@ def build_reporting_adapters(self): printer=lambda line: self.printer(line, flush=True), ) - def build_rebalance_runtime(self) -> LongBridgeRebalanceRuntime: + def build_rebalance_runtime(self, *, silent_cycle_notifications: bool = False) -> LongBridgeRebalanceRuntime: notification_adapters = self.build_notification_adapters() + notifications = ( + CallableNotificationPort(lambda _message: None) + if silent_cycle_notifications + else notification_adapters.notification_port + ) return LongBridgeRebalanceRuntime( bootstrap=self.bootstrap_builder( project_id=self.project_id, @@ -160,7 +166,7 @@ def build_rebalance_runtime(self) -> LongBridgeRebalanceRuntime: resolve_rebalance_plan=self.strategy_adapters.resolve_rebalance_plan, market_data_port_factory=self.broker_adapters.build_market_data_port, estimate_max_purchase_quantity=self.estimate_max_purchase_quantity_fn, - notifications=notification_adapters.notification_port, + notifications=notifications, notify_issue=notification_adapters.notify_issue, portfolio_port_factory=self.broker_adapters.build_portfolio_port, execution_port_factory=self.broker_adapters.build_execution_port, diff --git a/main.py b/main.py index 10cbeaf..d75fa0d 100644 --- a/main.py +++ b/main.py @@ -252,7 +252,9 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali flush=True, ) run_rebalance_cycle( - runtime=composer.build_rebalance_runtime(), + runtime=composer.build_rebalance_runtime( + silent_cycle_notifications=validation_only, + ), config=composer.build_rebalance_config(strategy_plugin_signals=strategy_plugin_signals), ) finalize_runtime_report(report, status="ok") @@ -290,6 +292,95 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali except Exception as persist_exc: print(f"failed to persist execution report: {persist_exc}", flush=True) + +def run_probe(*, response_body: str = "Probe OK"): + composer = None + reporting_adapters = None + log_context = None + report = None + try: + composer = build_composer(dry_run_only_override=True) + reporting_adapters = composer.build_reporting_adapters() + log_context, report = reporting_adapters.start_run() + strategy_plugin_signals, strategy_plugin_error = composer.load_strategy_plugin_signals( + getattr(RUNTIME_SETTINGS, "strategy_plugin_mounts_json", None) + ) + composer.attach_strategy_plugin_report( + report, + signals=strategy_plugin_signals, + error=strategy_plugin_error, + ) + reporting_adapters.log_event( + log_context, + "health_probe_received", + message="Received health probe request", + execution_window="probe", + ) + runtime = composer.build_rebalance_runtime(silent_cycle_notifications=True) + quote_context, trade_context, _indicators = runtime.bootstrap() + snapshot = runtime.portfolio_port_factory( + quote_context, + trade_context, + ).get_portfolio_snapshot() + positions = tuple(getattr(snapshot, "positions", ()) or ()) + buying_power = float(getattr(snapshot, "buying_power", 0.0) or 0.0) + total_equity = float(getattr(snapshot, "total_equity", 0.0) or 0.0) + finalize_runtime_report( + report, + status="ok", + summary={ + "buying_power": buying_power, + "total_equity": total_equity, + "positions_count": len(positions), + }, + ) + reporting_adapters.log_event( + log_context, + "health_probe_completed", + message="Health probe completed", + execution_window="probe", + buying_power=buying_power, + total_equity=total_equity, + positions_count=len(positions), + ) + return response_body, 200 + except Exception as exc: + if report is not None: + append_runtime_report_error( + report, + stage="health_probe", + message=str(exc), + error_type=type(exc).__name__, + ) + finalize_runtime_report(report, status="error") + if reporting_adapters is not None and log_context is not None: + reporting_adapters.log_event( + log_context, + "health_probe_failed", + message="Health probe failed", + severity="ERROR", + execution_window="probe", + error_type=type(exc).__name__, + error_message=str(exc), + ) + err = f"{t('health_probe_title')}\n{t('health_probe_error_prefix')}{traceback.format_exc()}" + if composer is not None: + composer.build_notification_adapters().publish_cycle_notification( + detailed_text=err, + compact_text=err, + ) + else: + print(err, flush=True) + return "Error", 500 + finally: + try: + if reporting_adapters is not None and report is not None: + report_path = reporting_adapters.persist_execution_report(report) + print(f"execution_report {report_path}", flush=True) + except Exception as persist_exc: + print(f"failed to persist execution report: {persist_exc}", flush=True) + + @app.route("/", methods=["POST", "GET"]) def handle_trigger(): """Entrypoint for Cloud Run / scheduler: run strategy and return 200.""" @@ -311,5 +402,11 @@ def handle_precheck(): return "Precheck OK", 200 +@app.route("/probe", methods=["POST", "GET"]) +def handle_probe(): + """Post-open broker/account health probe; notify only on failure.""" + return run_probe() + + if __name__ == "__main__": app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080))) diff --git a/notifications/telegram.py b/notifications/telegram.py index ca94a47..c969077 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -31,6 +31,8 @@ "income_locked": "🏦 收入层锁定占比: {ratio}", "signal": "🎯 触发信号: {msg}", "heartbeat_title": "💓 【心跳检测】", + "health_probe_title": "🔎 【连接探针】", + "health_probe_error_prefix": "健康探针异常:\n", "equity": "💰 净值: ${value}", "buying_power": "购买力", "reserved_cash": "预留现金", @@ -59,6 +61,7 @@ "buy_deferred": "ℹ️ [买入说明] {detail}", "buy_deferred_no_investable_cash": "账户现金 ${available} 低于策略保留阈值,可投资现金为 ${investable},本轮不发起买单", "buy_deferred_non_usd_cash": "检测到非 USD 现金({currencies}),但美股策略可用 USD 现金为 ${available}、可投资现金为 ${investable};请先换汇或入金 USD 后再买入", + "buy_deferred_small_target_gap": "{symbol} 目标差额 ${diff} 未超过 1 股价格 ${price};为避免超过目标仓位,本轮不买入", "buy_deferred_small_cash": "{symbol} 目标差额 ${diff},但可投资现金 ${investable} 不足买入 1 股(价格 ${price})", "buy_deferred_cash_limit": "{symbol} 目标差额 ${diff},预算可买 {budget_qty} 股,但券商估算可买数量为 0;可能有未完成挂单、结算或购买力占用", "cash_sweep_rebuy": "🏦 [尾部回补] 剩余可投资现金回补 {symbol}: {qty}股 @ ${price}", @@ -125,6 +128,8 @@ "income_locked": "🏦 Income Locked: {ratio}", "signal": "🎯 Signal: {msg}", "heartbeat_title": "💓 【Heartbeat】", + "health_probe_title": "🔎 【Health Probe】", + "health_probe_error_prefix": "Health probe error:\n", "equity": "💰 Equity: ${value}", "buying_power": "Buying Power", "reserved_cash": "Reserved Cash", @@ -153,6 +158,7 @@ "buy_deferred": "ℹ️ [Buy note] {detail}", "buy_deferred_no_investable_cash": "Account cash ${available} is below the strategy reserve threshold, investable cash is ${investable}; no buy order this cycle", "buy_deferred_non_usd_cash": "Non-USD cash is present ({currencies}), but this US-equity strategy has USD cash ${available} and investable cash ${investable}; convert or deposit USD before buying", + "buy_deferred_small_target_gap": "{symbol} target gap ${diff} does not exceed the 1-share price ${price}; skipped to avoid exceeding the target allocation", "buy_deferred_small_cash": "{symbol} target gap ${diff}, but investable cash ${investable} is not enough for 1 share at ${price}", "buy_deferred_cash_limit": "{symbol} target gap ${diff}, budget supports {budget_qty} shares, but broker estimate returned 0; an open order, settlement, or buying-power hold may still be blocking funds", "cash_sweep_rebuy": "🏦 [tail rebuy] residual investable cash rebought {symbol}: {qty} shares @ ${price}", diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index d7fe0a3..1a69f6e 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -574,13 +574,48 @@ def test_strategy_target_rebuys_cash_sweep_symbol_after_buy_skip(self): self.assertEqual(len(sent_messages), 1) self.assertIn("🔔 【调仓指令】", sent_messages[0]) self.assertIn("SOXX.US 目标差额 $163.14", sent_messages[0]) - self.assertIn("不足买入 1 股", sent_messages[0]) + self.assertIn("SOXX.US 目标差额 $163.14 未超过 1 股价格 $504.60", sent_messages[0]) + self.assertNotIn("可投资现金 $891.03 不足买入 1 股", sent_messages[0]) self.assertNotIn("市价卖出] BOXX", sent_messages[0]) self.assertNotIn("市价买入] SOXX", sent_messages[0]) self.assertIn("市价买入] BOXX: 7股", sent_messages[0]) self.assertIn("买入说明", sent_messages[0]) self.assertNotIn("限价买入] SOXX", sent_messages[0]) + def test_target_gap_below_one_share_does_not_report_cash_shortage(self): + plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX", "QQQI", "SPYI"), + risk_symbols=("SOXL", "SOXX"), + income_symbols=("QQQI", "SPYI"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 636.28, "SOXX": 218.01, "BOXX": 105.08, "QQQI": 0.0, "SPYI": 0.0}, + market_values={"SOXL": 636.28, "SOXX": 218.01, "BOXX": 0.0, "QQQI": 0.0, "SPYI": 0.0}, + sellable_quantities={"SOXL": 4, "SOXX": 0.4326, "BOXX": 0, "QQQI": 0, "SPYI": 0}, + quantities={"SOXL": 4, "SOXX": 0.4326, "BOXX": 0, "QQQI": 0, "SPYI": 0}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=164.98, + market_status="🚀 风险开启(SOXX+SOXL)", + deploy_ratio_text="90.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="SOXX 站上 140 日门槛线,持有 SOXL 70.0% + SOXX 20.0%", + available_cash=196.50, + total_strategy_equity=1050.79, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX", "QQQI", "SPYI")), + ) + + sent_messages, _, _ = self._run_strategy( + plan, + prices={"BOXX.US": 116.74}, + dry_run_only=True, + ) + + self.assertEqual(len(sent_messages), 1) + self.assertIn("💓 【心跳检测】", sent_messages[0]) + self.assertIn("BOXX.US 目标差额 $105.08 未超过 1 股价格 $116.74", sent_messages[0]) + self.assertNotIn("可投资现金 $164.98 不足买入 1 股", sent_messages[0]) + def test_strategy_target_buy_floors_to_cash_backed_whole_shares(self): plan = _build_plan( strategy_symbols=("SOXL",), diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index 4a80067..c58e975 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -251,6 +251,113 @@ def fake_run_strategy(*, force_run=False, validation_only=False, validation_labe self.assertTrue(observed["validation_only"]) self.assertEqual(observed["validation_label"], "precheck") + def test_handle_probe_checks_account_snapshot_without_success_notification(self): + module = load_module() + observed = {"override": None, "events": [], "notifications": []} + snapshot = types.SimpleNamespace( + buying_power=123.0, + total_equity=456.0, + positions=(types.SimpleNamespace(symbol="SOXL"),), + ) + + class FakePortfolioPort: + def get_portfolio_snapshot(self): + observed["snapshot_called"] = True + return snapshot + + class FakeRuntime: + def __init__(self): + self.bootstrap = lambda: ("quote-context", "trade-context", {"trend": "ok"}) + self.portfolio_port_factory = lambda quote_context, trade_context: FakePortfolioPort() + + class FakeComposer: + def build_reporting_adapters(self): + return types.SimpleNamespace( + start_run=lambda: (types.SimpleNamespace(run_id="run-001"), {"status": "pending"}), + log_event=lambda context, event, **fields: observed["events"].append((event, fields)), + persist_execution_report=lambda report: observed.setdefault("report", dict(report)) or "/tmp/report.json", + ) + + def build_rebalance_runtime(self, *, silent_cycle_notifications=False): + observed["silent_cycle_notifications"] = silent_cycle_notifications + return FakeRuntime() + + def build_notification_adapters(self): + raise AssertionError("probe success should stay silent") + + def load_strategy_plugin_signals(self, *_args, **_kwargs): + return (), None + + def attach_strategy_plugin_report(self, *_args, **_kwargs): + return None + + module.build_composer = lambda *, dry_run_only_override=None: observed.__setitem__("override", dry_run_only_override) or FakeComposer() + + with module.app.test_request_context("/probe", method="POST"): + body, status = module.handle_probe() + + self.assertEqual(status, 200) + self.assertEqual(body, "Probe OK") + self.assertTrue(observed["override"]) + self.assertTrue(observed["silent_cycle_notifications"]) + self.assertTrue(observed["snapshot_called"]) + self.assertEqual( + [event for event, _fields in observed["events"]], + ["health_probe_received", "health_probe_completed"], + ) + self.assertEqual(observed["report"]["status"], "ok") + self.assertEqual(observed["report"]["summary"]["buying_power"], 123.0) + self.assertEqual(observed["report"]["summary"]["total_equity"], 456.0) + self.assertEqual(observed["report"]["summary"]["positions_count"], 1) + + def test_handle_probe_failure_sends_notification(self): + module = load_module() + observed = {"events": [], "notifications": []} + + class FakeRuntime: + def bootstrap(self): + raise RuntimeError("probe failed") + + class FakeNotifications: + def publish_cycle_notification(self, **kwargs): + observed["notifications"].append(kwargs) + + class FakeComposer: + def build_reporting_adapters(self): + return types.SimpleNamespace( + start_run=lambda: (types.SimpleNamespace(run_id="run-001"), {"status": "pending"}), + log_event=lambda context, event, **fields: observed["events"].append((event, fields)), + persist_execution_report=lambda report: observed.setdefault("report", dict(report)) or "/tmp/report.json", + ) + + def build_rebalance_runtime(self, *, silent_cycle_notifications=False): + return FakeRuntime() + + def build_notification_adapters(self): + return FakeNotifications() + + def load_strategy_plugin_signals(self, *_args, **_kwargs): + return (), None + + def attach_strategy_plugin_report(self, *_args, **_kwargs): + return None + + module.build_composer = lambda *, dry_run_only_override=None: FakeComposer() + + with module.app.test_request_context("/probe", method="POST"): + body, status = module.handle_probe() + + self.assertEqual(status, 500) + self.assertEqual(body, "Error") + self.assertEqual(observed["report"]["status"], "error") + self.assertEqual(observed["report"]["errors"][0]["stage"], "health_probe") + self.assertEqual( + [event for event, _fields in observed["events"]], + ["health_probe_received", "health_probe_failed"], + ) + self.assertEqual(len(observed["notifications"]), 1) + self.assertIn("probe failed", observed["notifications"][0]["detailed_text"]) + def test_run_strategy_emits_structured_runtime_events(self): module = load_module() observed = [] @@ -308,7 +415,8 @@ def attach_strategy_plugin_report(self, *_args, **_kwargs): def with_prefix(self, message): return message - def build_rebalance_runtime(self): + def build_rebalance_runtime(self, *, silent_cycle_notifications=False): + observed["silent_cycle_notifications"] = silent_cycle_notifications return types.SimpleNamespace() def build_rebalance_config(self, *, strategy_plugin_signals=()): @@ -323,6 +431,7 @@ def build_rebalance_config(self, *, strategy_plugin_signals=()): module.run_strategy(force_run=True, validation_only=True) self.assertTrue(observed["override"]) + self.assertTrue(observed["silent_cycle_notifications"]) def test_run_strategy_persists_machine_readable_report(self): module = load_module() diff --git a/tests/test_runtime_composer.py b/tests/test_runtime_composer.py index 46ed20f..f3552eb 100644 --- a/tests/test_runtime_composer.py +++ b/tests/test_runtime_composer.py @@ -103,6 +103,7 @@ def fake_bootstrap_builder(**kwargs): notification_adapters = composer.build_notification_adapters() reporting_adapters = composer.build_reporting_adapters() runtime = composer.build_rebalance_runtime() + silent_runtime = composer.build_rebalance_runtime(silent_cycle_notifications=True) config = composer.build_rebalance_config() assert notification_adapters.notification_port == "notification-port" @@ -121,6 +122,8 @@ def fake_bootstrap_builder(**kwargs): assert runtime.resolve_rebalance_plan == "resolve-plan" assert runtime.market_data_port_factory == "market-data-port-factory" assert runtime.notifications == "notification-port" + silent_runtime.notifications.send_text("precheck heartbeat") + assert observed["sent_message"] == ("tg-token", "chat-id", "[HK] hello") assert runtime.post_submit_order == "post-submit-order" assert config.limit_sell_discount == 0.995 assert config.limit_buy_premium == 1.005