Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions application/execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,19 @@ def _apply_snapshot_price_fallbacks(
dry_run_only: bool,
snapshot_price_fallbacks: dict[str, float] | None,
) -> tuple[dict[str, float], tuple[str, ...]]:
if not dry_run_only or not snapshot_price_fallbacks:
del dry_run_only
if not snapshot_price_fallbacks:
return dict(prices), ()
resolved = dict(prices)
fallback_symbols: list[str] = []
for symbol in symbols:
normalized = str(symbol).strip().upper()
if normalized in resolved:
try:
existing_price = float(resolved.get(normalized, 0.0) or 0.0)
except (TypeError, ValueError):
existing_price = 0.0
if existing_price > 0.0:
resolved[normalized] = existing_price
continue
fallback_price = snapshot_price_fallbacks.get(normalized)
if fallback_price and float(fallback_price) > 0:
Expand All @@ -330,6 +336,27 @@ def _apply_snapshot_price_fallbacks(
return resolved, tuple(fallback_symbols)


def _normalize_price_fallbacks(signal_metadata: dict[str, Any] | None) -> dict[str, float]:
metadata = dict(signal_metadata or {})
raw_fallbacks = {}
for key in ("dry_run_price_fallbacks", "price_fallbacks"):
candidate = metadata.get(key)
if isinstance(candidate, dict):
raw_fallbacks.update(candidate)
normalized: dict[str, float] = {}
for symbol, price in raw_fallbacks.items():
normalized_symbol = str(symbol).strip().upper()
if not normalized_symbol:
continue
try:
numeric_price = float(price)
except (TypeError, ValueError):
continue
if numeric_price > 0.0:
normalized[normalized_symbol] = numeric_price
return normalized


def _format_symbol_preview(symbols: tuple[str, ...], *, limit: int = 3) -> str:
if not symbols:
return ""
Expand Down Expand Up @@ -576,11 +603,10 @@ def execute_rebalance(
if strategy_symbols:
all_symbols = all_symbols & set(strategy_symbols)

snapshot_price_fallbacks = {
str(symbol).strip().upper(): float(price)
for symbol, price in dict(signal_metadata.get("dry_run_price_fallbacks") or {}).items()
if price is not None
}
snapshot_price_fallbacks = _normalize_price_fallbacks(signal_metadata)
price_fallback_source = str(signal_metadata.get("price_fallback_source") or "").strip() or (
"snapshot_close" if signal_metadata.get("dry_run_price_fallbacks") else "close"
)
prices = get_market_prices(
ib,
all_symbols,
Expand All @@ -595,8 +621,12 @@ def execute_rebalance(
execution_summary["snapshot_price_fallback_used"] = bool(snapshot_price_fallback_symbols)
execution_summary["snapshot_price_fallback_symbols"] = list(snapshot_price_fallback_symbols)
execution_summary["snapshot_price_fallback_count"] = len(snapshot_price_fallback_symbols)
execution_summary["price_fallback_used"] = bool(snapshot_price_fallback_symbols)
execution_summary["price_fallback_symbols"] = list(snapshot_price_fallback_symbols)
execution_summary["price_fallback_count"] = len(snapshot_price_fallback_symbols)
execution_summary["price_fallback_source"] = price_fallback_source if snapshot_price_fallback_symbols else None
if snapshot_price_fallback_symbols:
execution_summary["price_source_mode"] = "mixed_market_quote_snapshot_close"
execution_summary["price_source_mode"] = f"mixed_market_quote_{price_fallback_source}"

current_mv = {}
for symbol in all_symbols:
Expand Down Expand Up @@ -667,7 +697,7 @@ def execute_rebalance(
if snapshot_price_fallback_symbols:
trade_logs.append(
translator(
"dry_run_snapshot_prices",
"dry_run_snapshot_prices" if dry_run_only else "price_fallback_prices",
count=len(snapshot_price_fallback_symbols),
symbols=_format_symbol_preview(snapshot_price_fallback_symbols),
)
Expand Down
4 changes: 3 additions & 1 deletion notifications/renderers.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ def _build_notification_trade_lines(
if execution_summary.get("snapshot_price_fallback_used") and fallback_symbols:
lines.append(
translator(
"dry_run_snapshot_prices",
"dry_run_snapshot_prices"
if execution_summary.get("mode") == "dry_run"
else "price_fallback_prices",
count=len(fallback_symbols),
symbols=_format_symbol_preview(fallback_symbols),
)
Expand Down
2 changes: 2 additions & 0 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"snapshot_path_detail": "快照路径={value}",
"config_source_detail": "配置来源={value}",
"dry_run_snapshot_prices": "🧪 模拟运行估价: 使用快照收盘价 {count}个标的 ({symbols})",
"price_fallback_prices": "📌 执行估价: 使用最近收盘价 {count}个标的 ({symbols})",
"target_diff_summary": "调仓变化: {details}",
"trade_date_detail": "交易日={value}",
"target_diff": "目标差异 {symbol}: 当前={current} 目标={target} 变化={delta}",
Expand Down Expand Up @@ -145,6 +146,7 @@
"snapshot_path_detail": "snapshot_path={value}",
"config_source_detail": "config_source={value}",
"dry_run_snapshot_prices": "🧪 dry-run pricing: snapshot close for {count} symbols ({symbols})",
"price_fallback_prices": "📌 execution pricing: recent close for {count} symbols ({symbols})",
"target_diff_summary": "Target changes: {details}",
"trade_date_detail": "trade_date={value}",
"target_diff": "target_diff {symbol}: current={current} target={target} delta={delta}",
Expand Down
128 changes: 124 additions & 4 deletions strategy_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,97 @@ def _fetch_portfolio_snapshot_for_context(self, ib, *, required: bool) -> Any |
)
return None

@staticmethod
def _normalize_symbols(symbols) -> tuple[str, ...]:
normalized = []
for symbol in symbols or ():
text = str(symbol or "").strip().upper()
if text:
normalized.append(text)
return tuple(dict.fromkeys(normalized))

def _build_price_fallback_symbol_list(
self,
decision: StrategyDecision,
*,
managed_symbols: tuple[str, ...],
current_holdings,
) -> tuple[str, ...]:
decision_symbols = [getattr(position, "symbol", "") for position in decision.positions]
candidates = [
*decision_symbols,
*(current_holdings or ()),
]
if not candidates:
candidates.extend(managed_symbols)
return self._normalize_symbols(candidates)

@staticmethod
def _extract_latest_positive_close(price_history) -> float | None:
if price_history is None:
return None
if isinstance(price_history, pd.DataFrame):
if price_history.empty:
return None
if "close" in price_history.columns:
series = price_history["close"]
else:
series = price_history.iloc[:, 0]
values = pd.to_numeric(series, errors="coerce").dropna()
values = values[values > 0]
if values.empty:
return None
return float(values.iloc[-1])
if isinstance(price_history, pd.Series):
values = pd.to_numeric(price_history, errors="coerce").dropna()
values = values[values > 0]
if values.empty:
return None
return float(values.iloc[-1])
values = []
try:
iterator = iter(price_history)
except TypeError:
iterator = iter((price_history,))
for item in iterator:
if isinstance(item, Mapping):
candidate = item.get("close")
else:
candidate = getattr(item, "close", item)
try:
numeric = float(candidate)
except (TypeError, ValueError):
continue
if numeric > 0.0:
values.append(numeric)
return float(values[-1]) if values else None

def _build_historical_close_map(
self,
ib,
historical_close_loader: Callable[..., Any],
symbols: tuple[str, ...],
) -> dict[str, float]:
close_map: dict[str, float] = {}
for symbol in self._normalize_symbols(symbols):
try:
price_history = historical_close_loader(
ib,
symbol,
duration="10 D",
bar_size="1 day",
)
except Exception as exc:
self.logger(
"historical_price_fallback_failed | "
f"profile={self.profile} symbol={symbol} error_type={type(exc).__name__} error={exc}"
)
continue
latest_close = self._extract_latest_positive_close(price_history)
if latest_close is not None:
close_map[symbol] = latest_close
return close_map

def _build_strategy_context(
self,
*,
Expand Down Expand Up @@ -218,6 +309,15 @@ def _evaluate_market_data_strategy(
if safe_haven_symbol:
managed_candidates.append(safe_haven_symbol)
managed_symbols = tuple(dict.fromkeys(managed_candidates))
price_fallbacks = self._build_historical_close_map(
ib,
historical_close_loader,
self._build_price_fallback_symbol_list(
decision,
managed_symbols=managed_symbols,
current_holdings=current_holdings,
),
)
metadata = {
"strategy_profile": self.profile,
"managed_symbols": managed_symbols,
Expand All @@ -234,6 +334,10 @@ def _evaluate_market_data_strategy(
metadata["portfolio_total_equity"] = float(getattr(portfolio_snapshot, "total_equity", 0.0) or 0.0)
if safe_haven_symbol:
metadata["safe_haven_symbol"] = safe_haven_symbol
if price_fallbacks:
metadata["price_fallbacks"] = price_fallbacks
metadata["dry_run_price_fallbacks"] = price_fallbacks
metadata["price_fallback_source"] = "historical_close"
return StrategyEvaluationResult(decision=decision, metadata=metadata)

def _evaluate_value_target_strategy(
Expand Down Expand Up @@ -274,6 +378,15 @@ def _evaluate_value_target_strategy(
(position.symbol for position in decision.positions if position.role == "safe_haven"),
None,
)
price_fallbacks = self._build_historical_close_map(
ib,
historical_close_loader,
self._build_price_fallback_symbol_list(
decision,
managed_symbols=managed_symbols,
current_holdings=current_holdings,
),
)
metadata = {
"strategy_profile": self.profile,
"managed_symbols": managed_symbols,
Expand All @@ -292,6 +405,10 @@ def _evaluate_value_target_strategy(
benchmark_symbol = market_inputs.get("benchmark_symbol")
if benchmark_symbol:
metadata["benchmark_symbol"] = str(benchmark_symbol)
if price_fallbacks:
metadata["price_fallbacks"] = price_fallbacks
metadata["dry_run_price_fallbacks"] = price_fallbacks
metadata["price_fallback_source"] = "historical_close"
return StrategyEvaluationResult(decision=decision, metadata=metadata)

def _build_value_target_market_inputs(
Expand Down Expand Up @@ -413,12 +530,15 @@ def build_extra_metadata(
managed_symbols: tuple[str, ...],
_decision: StrategyDecision,
) -> Mapping[str, Any]:
price_fallbacks = self._build_snapshot_close_map(
feature_snapshot,
managed_symbols=managed_symbols,
)
return {
"trade_date": run_as_of.date().isoformat(),
"dry_run_price_fallbacks": self._build_snapshot_close_map(
feature_snapshot,
managed_symbols=managed_symbols,
),
"price_fallbacks": price_fallbacks,
"dry_run_price_fallbacks": price_fallbacks,
"price_fallback_source": "snapshot_close",
}

result = evaluate_feature_snapshot_strategy(
Expand Down
56 changes: 56 additions & 0 deletions tests/test_execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def translate(key, **kwargs):
"same_day_execution_locked": "same_day_execution_locked profile={profile} mode={mode} trade_date={trade_date} snapshot_date={snapshot_date} target_hash={target_hash} lock_path={lock_path}",
"execution_lock_acquired": "execution_lock_acquired mode={mode} trade_date={trade_date} snapshot_date={snapshot_date} lock_path={lock_path}",
"dry_run_snapshot_prices": "dry_run_snapshot_prices count={count} symbols={symbols}",
"price_fallback_prices": "price_fallback_prices count={count} symbols={symbols}",
"no_equity": "❌ No equity",
}
template = templates[key]
Expand Down Expand Up @@ -677,3 +678,58 @@ def accountValues(self):
assert any(log.startswith("dry_run_snapshot_prices count=2") for log in trade_logs)
assert any(log.startswith("DRY_RUN buy VOO") for log in trade_logs)
assert any(log.startswith("DRY_RUN buy BOXX") for log in trade_logs)


def test_execute_rebalance_uses_price_fallbacks_for_live_when_quotes_missing(monkeypatch, tmp_path):
class FakeIB:
def openTrades(self):
return []

def fills(self):
return []

def accountValues(self):
return [SimpleNamespace(tag="AvailableFunds", currency="USD", value="5000")]

submitted = []

def fake_submit_order_intent(_ib, intent):
submitted.append(intent)
return SimpleNamespace(broker_order_id="1", status="Submitted")

monkeypatch.setattr("application.execution_service.time.sleep", lambda _seconds: None)

trade_logs, summary = execute_rebalance(
FakeIB(),
{"VOO": 1.0},
{},
{"equity": 1000.0, "buying_power": 1000.0},
fetch_quote_snapshots=lambda *_args, **_kwargs: {},
submit_order_intent=fake_submit_order_intent,
order_intent_cls=OrderIntent,
translator=translate,
strategy_symbols=["VOO"],
strategy_profile="tech_communication_pullback_enhancement",
signal_metadata=_signal_metadata(
{"VOO": 1.0},
risk_symbols=("VOO",),
trade_date="2026-04-01",
price_fallbacks={"VOO": 100.0},
price_fallback_source="historical_close",
),
dry_run_only=False,
cash_reserve_ratio=0.0,
rebalance_threshold_ratio=0.02,
limit_buy_premium=1.005,
sell_settle_delay_sec=0,
execution_lock_dir=tmp_path,
return_summary=True,
)

assert summary["execution_status"] == "executed"
assert len(submitted) == 1
assert submitted[0].symbol == "VOO"
assert summary["snapshot_price_fallback_used"] is True
assert summary["price_fallback_source"] == "historical_close"
assert summary["price_source_mode"] == "mixed_market_quote_historical_close"
assert any(log.startswith("price_fallback_prices count=1") for log in trade_logs)
1 change: 1 addition & 0 deletions tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def _build_test_translator():
"target_diff_summary": "target_changes {details}",
"same_day_execution_locked_notice": "same_day_execution_locked_notice {mode} {trade_date} {snapshot_date}",
"dry_run_snapshot_prices": "dry_run_snapshot_prices count={count} symbols={symbols}",
"price_fallback_prices": "price_fallback_prices count={count} symbols={symbols}",
"dry_run_buy_batch": "dry_run_buy_batch count={count} details={details}",
"dry_run_sell_batch": "dry_run_sell_batch count={count} details={details}",
"submitted_buy_batch": "submitted_buy_batch count={count} details={details}",
Expand Down
15 changes: 14 additions & 1 deletion tests/test_strategy_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,10 +565,20 @@ def fake_candle_loader(_ib, symbol, duration="2 Y", bar_size="1 day"):
for _ in range(220)
]

close_loader_symbols = []

def fake_close_loader(_ib, symbol, duration="2 Y", bar_size="1 day"):
close_loader_symbols.append((symbol, duration, bar_size))
prices = {"TQQQ": 70.0, "BOXX": 105.0}
value = prices.get(symbol)
if value is None:
return strategy_runtime_module.pd.Series(dtype=float)
return strategy_runtime_module.pd.Series([value - 1.0, value])

result = runtime.evaluate(
ib="fake-ib",
current_holdings={"TQQQ"},
historical_close_loader=lambda *_args, **_kwargs: None,
historical_close_loader=fake_close_loader,
historical_candle_loader=fake_candle_loader,
run_as_of=strategy_runtime_module.pd.Timestamp("2026-04-01"),
translator=lambda key, **_kwargs: key,
Expand All @@ -586,3 +596,6 @@ def fake_candle_loader(_ib, symbol, duration="2 Y", bar_size="1 day"):
assert result.metadata["signal_date"] == "2026-04-01"
assert result.metadata["effective_date"] == "2026-04-02"
assert result.metadata["execution_timing_contract"] == "next_trading_day"
assert close_loader_symbols == [("TQQQ", "10 D", "1 day"), ("BOXX", "10 D", "1 day")]
assert result.metadata["price_fallback_source"] == "historical_close"
assert result.metadata["price_fallbacks"] == {"TQQQ": 70.0, "BOXX": 105.0}