Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions application/execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,15 +658,27 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
investable_cash = max(0, investable_cash - cost_estimate)
action_done = True
else:
if diff <= investable_cash:
note_kind = "buy_deferred_small_target_gap"
note_kwargs = {
"symbol": f"{symbol}.US",
"diff": f"{diff:.2f}",
"price": f"{price:.2f}",
}
else:
note_kind = "buy_deferred_small_cash"
note_kwargs = {
"symbol": f"{symbol}.US",
"diff": f"{diff:.2f}",
"investable": f"{investable_cash:.2f}",
"price": f"{price:.2f}",
}
record_note_log(
note_logs,
translator=translator,
with_prefix=with_prefix,
kind="buy_deferred_small_cash",
symbol=f"{symbol}.US",
diff=f"{diff:.2f}",
investable=f"{investable_cash:.2f}",
price=f"{price:.2f}",
kind=note_kind,
**note_kwargs,
)

if (
Expand Down
10 changes: 8 additions & 2 deletions application/runtime_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from application.runtime_dependencies import LongBridgeRebalanceConfig, LongBridgeRebalanceRuntime
from application.runtime_notification_adapters import build_runtime_notification_adapters
from application.runtime_reporting_adapters import build_runtime_reporting_adapters
from quant_platform_kit.common.port_adapters import CallableNotificationPort
from quant_platform_kit.common.runtime_assembly import build_runtime_assembly
from quant_platform_kit.common.runtime_target import build_runtime_context_fields
from quant_platform_kit.common.runtime_target import RuntimeTarget
Expand Down Expand Up @@ -144,8 +145,13 @@ def build_reporting_adapters(self):
printer=lambda line: self.printer(line, flush=True),
)

def build_rebalance_runtime(self) -> LongBridgeRebalanceRuntime:
def build_rebalance_runtime(self, *, silent_cycle_notifications: bool = False) -> LongBridgeRebalanceRuntime:
notification_adapters = self.build_notification_adapters()
notifications = (
CallableNotificationPort(lambda _message: None)
if silent_cycle_notifications
else notification_adapters.notification_port
)
return LongBridgeRebalanceRuntime(
bootstrap=self.bootstrap_builder(
project_id=self.project_id,
Expand All @@ -160,7 +166,7 @@ def build_rebalance_runtime(self) -> LongBridgeRebalanceRuntime:
resolve_rebalance_plan=self.strategy_adapters.resolve_rebalance_plan,
market_data_port_factory=self.broker_adapters.build_market_data_port,
estimate_max_purchase_quantity=self.estimate_max_purchase_quantity_fn,
notifications=notification_adapters.notification_port,
notifications=notifications,
notify_issue=notification_adapters.notify_issue,
portfolio_port_factory=self.broker_adapters.build_portfolio_port,
execution_port_factory=self.broker_adapters.build_execution_port,
Expand Down
99 changes: 98 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
flush=True,
)
run_rebalance_cycle(
runtime=composer.build_rebalance_runtime(),
runtime=composer.build_rebalance_runtime(
silent_cycle_notifications=validation_only,
),
config=composer.build_rebalance_config(strategy_plugin_signals=strategy_plugin_signals),
)
finalize_runtime_report(report, status="ok")
Expand Down Expand Up @@ -290,6 +292,95 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False, vali
except Exception as persist_exc:
print(f"failed to persist execution report: {persist_exc}", flush=True)


def run_probe(*, response_body: str = "Probe OK"):
composer = None
reporting_adapters = None
log_context = None
report = None
try:
composer = build_composer(dry_run_only_override=True)
reporting_adapters = composer.build_reporting_adapters()
log_context, report = reporting_adapters.start_run()
strategy_plugin_signals, strategy_plugin_error = composer.load_strategy_plugin_signals(
getattr(RUNTIME_SETTINGS, "strategy_plugin_mounts_json", None)
)
composer.attach_strategy_plugin_report(
report,
signals=strategy_plugin_signals,
error=strategy_plugin_error,
)
reporting_adapters.log_event(
log_context,
"health_probe_received",
message="Received health probe request",
execution_window="probe",
)
runtime = composer.build_rebalance_runtime(silent_cycle_notifications=True)
quote_context, trade_context, _indicators = runtime.bootstrap()
snapshot = runtime.portfolio_port_factory(
quote_context,
trade_context,
).get_portfolio_snapshot()
positions = tuple(getattr(snapshot, "positions", ()) or ())
buying_power = float(getattr(snapshot, "buying_power", 0.0) or 0.0)
total_equity = float(getattr(snapshot, "total_equity", 0.0) or 0.0)
finalize_runtime_report(
report,
status="ok",
summary={
"buying_power": buying_power,
"total_equity": total_equity,
"positions_count": len(positions),
},
)
reporting_adapters.log_event(
log_context,
"health_probe_completed",
message="Health probe completed",
execution_window="probe",
buying_power=buying_power,
total_equity=total_equity,
positions_count=len(positions),
)
return response_body, 200
except Exception as exc:
if report is not None:
append_runtime_report_error(
report,
stage="health_probe",
message=str(exc),
error_type=type(exc).__name__,
)
finalize_runtime_report(report, status="error")
if reporting_adapters is not None and log_context is not None:
reporting_adapters.log_event(
log_context,
"health_probe_failed",
message="Health probe failed",
severity="ERROR",
execution_window="probe",
error_type=type(exc).__name__,
error_message=str(exc),
)
err = f"{t('health_probe_title')}\n{t('health_probe_error_prefix')}{traceback.format_exc()}"
if composer is not None:
composer.build_notification_adapters().publish_cycle_notification(
detailed_text=err,
compact_text=err,
)
else:
print(err, flush=True)
return "Error", 500
finally:
try:
if reporting_adapters is not None and report is not None:
report_path = reporting_adapters.persist_execution_report(report)
print(f"execution_report {report_path}", flush=True)
except Exception as persist_exc:
print(f"failed to persist execution report: {persist_exc}", flush=True)


@app.route("/", methods=["POST", "GET"])
def handle_trigger():
"""Entrypoint for Cloud Run / scheduler: run strategy and return 200."""
Expand All @@ -311,5 +402,11 @@ def handle_precheck():
return "Precheck OK", 200


@app.route("/probe", methods=["POST", "GET"])
def handle_probe():
"""Post-open broker/account health probe; notify only on failure."""
return run_probe()


if __name__ == "__main__":
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
6 changes: 6 additions & 0 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
"income_locked": "🏦 收入层锁定占比: {ratio}",
"signal": "🎯 触发信号: {msg}",
"heartbeat_title": "💓 【心跳检测】",
"health_probe_title": "🔎 【连接探针】",
"health_probe_error_prefix": "健康探针异常:\n",
"equity": "💰 净值: ${value}",
"buying_power": "购买力",
"reserved_cash": "预留现金",
Expand Down Expand Up @@ -59,6 +61,7 @@
"buy_deferred": "ℹ️ [买入说明] {detail}",
"buy_deferred_no_investable_cash": "账户现金 ${available} 低于策略保留阈值,可投资现金为 ${investable},本轮不发起买单",
"buy_deferred_non_usd_cash": "检测到非 USD 现金({currencies}),但美股策略可用 USD 现金为 ${available}、可投资现金为 ${investable};请先换汇或入金 USD 后再买入",
"buy_deferred_small_target_gap": "{symbol} 目标差额 ${diff} 未超过 1 股价格 ${price};为避免超过目标仓位,本轮不买入",
"buy_deferred_small_cash": "{symbol} 目标差额 ${diff},但可投资现金 ${investable} 不足买入 1 股(价格 ${price})",
"buy_deferred_cash_limit": "{symbol} 目标差额 ${diff},预算可买 {budget_qty} 股,但券商估算可买数量为 0;可能有未完成挂单、结算或购买力占用",
"cash_sweep_rebuy": "🏦 [尾部回补] 剩余可投资现金回补 {symbol}: {qty}股 @ ${price}",
Expand Down Expand Up @@ -125,6 +128,8 @@
"income_locked": "🏦 Income Locked: {ratio}",
"signal": "🎯 Signal: {msg}",
"heartbeat_title": "💓 【Heartbeat】",
"health_probe_title": "🔎 【Health Probe】",
"health_probe_error_prefix": "Health probe error:\n",
"equity": "💰 Equity: ${value}",
"buying_power": "Buying Power",
"reserved_cash": "Reserved Cash",
Expand Down Expand Up @@ -153,6 +158,7 @@
"buy_deferred": "ℹ️ [Buy note] {detail}",
"buy_deferred_no_investable_cash": "Account cash ${available} is below the strategy reserve threshold, investable cash is ${investable}; no buy order this cycle",
"buy_deferred_non_usd_cash": "Non-USD cash is present ({currencies}), but this US-equity strategy has USD cash ${available} and investable cash ${investable}; convert or deposit USD before buying",
"buy_deferred_small_target_gap": "{symbol} target gap ${diff} does not exceed the 1-share price ${price}; skipped to avoid exceeding the target allocation",
"buy_deferred_small_cash": "{symbol} target gap ${diff}, but investable cash ${investable} is not enough for 1 share at ${price}",
"buy_deferred_cash_limit": "{symbol} target gap ${diff}, budget supports {budget_qty} shares, but broker estimate returned 0; an open order, settlement, or buying-power hold may still be blocking funds",
"cash_sweep_rebuy": "🏦 [tail rebuy] residual investable cash rebought {symbol}: {qty} shares @ ${price}",
Expand Down
37 changes: 36 additions & 1 deletion tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,48 @@ def test_strategy_target_rebuys_cash_sweep_symbol_after_buy_skip(self):
self.assertEqual(len(sent_messages), 1)
self.assertIn("🔔 【调仓指令】", sent_messages[0])
self.assertIn("SOXX.US 目标差额 $163.14", sent_messages[0])
self.assertIn("不足买入 1 股", sent_messages[0])
self.assertIn("SOXX.US 目标差额 $163.14 未超过 1 股价格 $504.60", sent_messages[0])
self.assertNotIn("可投资现金 $891.03 不足买入 1 股", sent_messages[0])
self.assertNotIn("市价卖出] BOXX", sent_messages[0])
self.assertNotIn("市价买入] SOXX", sent_messages[0])
self.assertIn("市价买入] BOXX: 7股", sent_messages[0])
self.assertIn("买入说明", sent_messages[0])
self.assertNotIn("限价买入] SOXX", sent_messages[0])

def test_target_gap_below_one_share_does_not_report_cash_shortage(self):
plan = _build_plan(
strategy_symbols=("SOXL", "SOXX", "BOXX", "QQQI", "SPYI"),
risk_symbols=("SOXL", "SOXX"),
income_symbols=("QQQI", "SPYI"),
safe_haven_symbols=("BOXX",),
targets={"SOXL": 636.28, "SOXX": 218.01, "BOXX": 105.08, "QQQI": 0.0, "SPYI": 0.0},
market_values={"SOXL": 636.28, "SOXX": 218.01, "BOXX": 0.0, "QQQI": 0.0, "SPYI": 0.0},
sellable_quantities={"SOXL": 4, "SOXX": 0.4326, "BOXX": 0, "QQQI": 0, "SPYI": 0},
quantities={"SOXL": 4, "SOXX": 0.4326, "BOXX": 0, "QQQI": 0, "SPYI": 0},
current_min_trade=100.0,
trade_threshold_value=100.0,
investable_cash=164.98,
market_status="🚀 风险开启(SOXX+SOXL)",
deploy_ratio_text="90.0%",
income_ratio_text="0.0%",
income_locked_ratio_text="0.0%",
signal_message="SOXX 站上 140 日门槛线,持有 SOXL 70.0% + SOXX 20.0%",
available_cash=196.50,
total_strategy_equity=1050.79,
portfolio_rows=(("SOXL", "SOXX"), ("BOXX", "QQQI", "SPYI")),
)

sent_messages, _, _ = self._run_strategy(
plan,
prices={"BOXX.US": 116.74},
dry_run_only=True,
)

self.assertEqual(len(sent_messages), 1)
self.assertIn("💓 【心跳检测】", sent_messages[0])
self.assertIn("BOXX.US 目标差额 $105.08 未超过 1 股价格 $116.74", sent_messages[0])
self.assertNotIn("可投资现金 $164.98 不足买入 1 股", sent_messages[0])

def test_strategy_target_buy_floors_to_cash_backed_whole_shares(self):
plan = _build_plan(
strategy_symbols=("SOXL",),
Expand Down
111 changes: 110 additions & 1 deletion tests/test_request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,113 @@ def fake_run_strategy(*, force_run=False, validation_only=False, validation_labe
self.assertTrue(observed["validation_only"])
self.assertEqual(observed["validation_label"], "precheck")

def test_handle_probe_checks_account_snapshot_without_success_notification(self):
module = load_module()
observed = {"override": None, "events": [], "notifications": []}
snapshot = types.SimpleNamespace(
buying_power=123.0,
total_equity=456.0,
positions=(types.SimpleNamespace(symbol="SOXL"),),
)

class FakePortfolioPort:
def get_portfolio_snapshot(self):
observed["snapshot_called"] = True
return snapshot

class FakeRuntime:
def __init__(self):
self.bootstrap = lambda: ("quote-context", "trade-context", {"trend": "ok"})
self.portfolio_port_factory = lambda quote_context, trade_context: FakePortfolioPort()

class FakeComposer:
def build_reporting_adapters(self):
return types.SimpleNamespace(
start_run=lambda: (types.SimpleNamespace(run_id="run-001"), {"status": "pending"}),
log_event=lambda context, event, **fields: observed["events"].append((event, fields)),
persist_execution_report=lambda report: observed.setdefault("report", dict(report)) or "/tmp/report.json",
)

def build_rebalance_runtime(self, *, silent_cycle_notifications=False):
observed["silent_cycle_notifications"] = silent_cycle_notifications
return FakeRuntime()

def build_notification_adapters(self):
raise AssertionError("probe success should stay silent")

def load_strategy_plugin_signals(self, *_args, **_kwargs):
return (), None

def attach_strategy_plugin_report(self, *_args, **_kwargs):
return None

module.build_composer = lambda *, dry_run_only_override=None: observed.__setitem__("override", dry_run_only_override) or FakeComposer()

with module.app.test_request_context("/probe", method="POST"):
body, status = module.handle_probe()

self.assertEqual(status, 200)
self.assertEqual(body, "Probe OK")
self.assertTrue(observed["override"])
self.assertTrue(observed["silent_cycle_notifications"])
self.assertTrue(observed["snapshot_called"])
self.assertEqual(
[event for event, _fields in observed["events"]],
["health_probe_received", "health_probe_completed"],
)
self.assertEqual(observed["report"]["status"], "ok")
self.assertEqual(observed["report"]["summary"]["buying_power"], 123.0)
self.assertEqual(observed["report"]["summary"]["total_equity"], 456.0)
self.assertEqual(observed["report"]["summary"]["positions_count"], 1)

def test_handle_probe_failure_sends_notification(self):
module = load_module()
observed = {"events": [], "notifications": []}

class FakeRuntime:
def bootstrap(self):
raise RuntimeError("probe failed")

class FakeNotifications:
def publish_cycle_notification(self, **kwargs):
observed["notifications"].append(kwargs)

class FakeComposer:
def build_reporting_adapters(self):
return types.SimpleNamespace(
start_run=lambda: (types.SimpleNamespace(run_id="run-001"), {"status": "pending"}),
log_event=lambda context, event, **fields: observed["events"].append((event, fields)),
persist_execution_report=lambda report: observed.setdefault("report", dict(report)) or "/tmp/report.json",
)

def build_rebalance_runtime(self, *, silent_cycle_notifications=False):
return FakeRuntime()

def build_notification_adapters(self):
return FakeNotifications()

def load_strategy_plugin_signals(self, *_args, **_kwargs):
return (), None

def attach_strategy_plugin_report(self, *_args, **_kwargs):
return None

module.build_composer = lambda *, dry_run_only_override=None: FakeComposer()

with module.app.test_request_context("/probe", method="POST"):
body, status = module.handle_probe()

self.assertEqual(status, 500)
self.assertEqual(body, "Error")
self.assertEqual(observed["report"]["status"], "error")
self.assertEqual(observed["report"]["errors"][0]["stage"], "health_probe")
self.assertEqual(
[event for event, _fields in observed["events"]],
["health_probe_received", "health_probe_failed"],
)
self.assertEqual(len(observed["notifications"]), 1)
self.assertIn("probe failed", observed["notifications"][0]["detailed_text"])

def test_run_strategy_emits_structured_runtime_events(self):
module = load_module()
observed = []
Expand Down Expand Up @@ -308,7 +415,8 @@ def attach_strategy_plugin_report(self, *_args, **_kwargs):
def with_prefix(self, message):
return message

def build_rebalance_runtime(self):
def build_rebalance_runtime(self, *, silent_cycle_notifications=False):
observed["silent_cycle_notifications"] = silent_cycle_notifications
return types.SimpleNamespace()

def build_rebalance_config(self, *, strategy_plugin_signals=()):
Expand All @@ -323,6 +431,7 @@ def build_rebalance_config(self, *, strategy_plugin_signals=()):
module.run_strategy(force_run=True, validation_only=True)

self.assertTrue(observed["override"])
self.assertTrue(observed["silent_cycle_notifications"])

def test_run_strategy_persists_machine_readable_report(self):
module = load_module()
Expand Down
Loading