diff --git a/application/execution_service.py b/application/execution_service.py index dcf4b09..2c281b0 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -315,13 +315,19 @@ def _apply_snapshot_price_fallbacks( dry_run_only: bool, snapshot_price_fallbacks: dict[str, float] | None, ) -> tuple[dict[str, float], tuple[str, ...]]: - if not dry_run_only or not snapshot_price_fallbacks: + del dry_run_only + if not snapshot_price_fallbacks: return dict(prices), () resolved = dict(prices) fallback_symbols: list[str] = [] for symbol in symbols: normalized = str(symbol).strip().upper() - if normalized in resolved: + try: + existing_price = float(resolved.get(normalized, 0.0) or 0.0) + except (TypeError, ValueError): + existing_price = 0.0 + if existing_price > 0.0: + resolved[normalized] = existing_price continue fallback_price = snapshot_price_fallbacks.get(normalized) if fallback_price and float(fallback_price) > 0: @@ -330,6 +336,27 @@ def _apply_snapshot_price_fallbacks( return resolved, tuple(fallback_symbols) +def _normalize_price_fallbacks(signal_metadata: dict[str, Any] | None) -> dict[str, float]: + metadata = dict(signal_metadata or {}) + raw_fallbacks = {} + for key in ("dry_run_price_fallbacks", "price_fallbacks"): + candidate = metadata.get(key) + if isinstance(candidate, dict): + raw_fallbacks.update(candidate) + normalized: dict[str, float] = {} + for symbol, price in raw_fallbacks.items(): + normalized_symbol = str(symbol).strip().upper() + if not normalized_symbol: + continue + try: + numeric_price = float(price) + except (TypeError, ValueError): + continue + if numeric_price > 0.0: + normalized[normalized_symbol] = numeric_price + return normalized + + def _format_symbol_preview(symbols: tuple[str, ...], *, limit: int = 3) -> str: if not symbols: return "" @@ -576,11 +603,10 @@ def execute_rebalance( if strategy_symbols: all_symbols = all_symbols & set(strategy_symbols) - snapshot_price_fallbacks = { - str(symbol).strip().upper(): float(price) - for symbol, price in dict(signal_metadata.get("dry_run_price_fallbacks") or {}).items() - if price is not None - } + snapshot_price_fallbacks = _normalize_price_fallbacks(signal_metadata) + price_fallback_source = str(signal_metadata.get("price_fallback_source") or "").strip() or ( + "snapshot_close" if signal_metadata.get("dry_run_price_fallbacks") else "close" + ) prices = get_market_prices( ib, all_symbols, @@ -595,8 +621,12 @@ def execute_rebalance( execution_summary["snapshot_price_fallback_used"] = bool(snapshot_price_fallback_symbols) execution_summary["snapshot_price_fallback_symbols"] = list(snapshot_price_fallback_symbols) execution_summary["snapshot_price_fallback_count"] = len(snapshot_price_fallback_symbols) + execution_summary["price_fallback_used"] = bool(snapshot_price_fallback_symbols) + execution_summary["price_fallback_symbols"] = list(snapshot_price_fallback_symbols) + execution_summary["price_fallback_count"] = len(snapshot_price_fallback_symbols) + execution_summary["price_fallback_source"] = price_fallback_source if snapshot_price_fallback_symbols else None if snapshot_price_fallback_symbols: - execution_summary["price_source_mode"] = "mixed_market_quote_snapshot_close" + execution_summary["price_source_mode"] = f"mixed_market_quote_{price_fallback_source}" current_mv = {} for symbol in all_symbols: @@ -667,7 +697,7 @@ def execute_rebalance( if snapshot_price_fallback_symbols: trade_logs.append( translator( - "dry_run_snapshot_prices", + "dry_run_snapshot_prices" if dry_run_only else "price_fallback_prices", count=len(snapshot_price_fallback_symbols), symbols=_format_symbol_preview(snapshot_price_fallback_symbols), ) diff --git a/notifications/renderers.py b/notifications/renderers.py index 18984b7..03b8606 100644 --- a/notifications/renderers.py +++ b/notifications/renderers.py @@ -191,7 +191,9 @@ def _build_notification_trade_lines( if execution_summary.get("snapshot_price_fallback_used") and fallback_symbols: lines.append( translator( - "dry_run_snapshot_prices", + "dry_run_snapshot_prices" + if execution_summary.get("mode") == "dry_run" + else "price_fallback_prices", count=len(fallback_symbols), symbols=_format_symbol_preview(fallback_symbols), ) diff --git a/notifications/telegram.py b/notifications/telegram.py index ad6c2d8..41d5443 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -37,6 +37,7 @@ "snapshot_path_detail": "快照路径={value}", "config_source_detail": "配置来源={value}", "dry_run_snapshot_prices": "🧪 模拟运行估价: 使用快照收盘价 {count}个标的 ({symbols})", + "price_fallback_prices": "📌 执行估价: 使用最近收盘价 {count}个标的 ({symbols})", "target_diff_summary": "调仓变化: {details}", "trade_date_detail": "交易日={value}", "target_diff": "目标差异 {symbol}: 当前={current} 目标={target} 变化={delta}", @@ -145,6 +146,7 @@ "snapshot_path_detail": "snapshot_path={value}", "config_source_detail": "config_source={value}", "dry_run_snapshot_prices": "🧪 dry-run pricing: snapshot close for {count} symbols ({symbols})", + "price_fallback_prices": "📌 execution pricing: recent close for {count} symbols ({symbols})", "target_diff_summary": "Target changes: {details}", "trade_date_detail": "trade_date={value}", "target_diff": "target_diff {symbol}: current={current} target={target} delta={delta}", diff --git a/strategy_runtime.py b/strategy_runtime.py index 57af199..3081372 100644 --- a/strategy_runtime.py +++ b/strategy_runtime.py @@ -104,6 +104,97 @@ def _fetch_portfolio_snapshot_for_context(self, ib, *, required: bool) -> Any | ) return None + @staticmethod + def _normalize_symbols(symbols) -> tuple[str, ...]: + normalized = [] + for symbol in symbols or (): + text = str(symbol or "").strip().upper() + if text: + normalized.append(text) + return tuple(dict.fromkeys(normalized)) + + def _build_price_fallback_symbol_list( + self, + decision: StrategyDecision, + *, + managed_symbols: tuple[str, ...], + current_holdings, + ) -> tuple[str, ...]: + decision_symbols = [getattr(position, "symbol", "") for position in decision.positions] + candidates = [ + *decision_symbols, + *(current_holdings or ()), + ] + if not candidates: + candidates.extend(managed_symbols) + return self._normalize_symbols(candidates) + + @staticmethod + def _extract_latest_positive_close(price_history) -> float | None: + if price_history is None: + return None + if isinstance(price_history, pd.DataFrame): + if price_history.empty: + return None + if "close" in price_history.columns: + series = price_history["close"] + else: + series = price_history.iloc[:, 0] + values = pd.to_numeric(series, errors="coerce").dropna() + values = values[values > 0] + if values.empty: + return None + return float(values.iloc[-1]) + if isinstance(price_history, pd.Series): + values = pd.to_numeric(price_history, errors="coerce").dropna() + values = values[values > 0] + if values.empty: + return None + return float(values.iloc[-1]) + values = [] + try: + iterator = iter(price_history) + except TypeError: + iterator = iter((price_history,)) + for item in iterator: + if isinstance(item, Mapping): + candidate = item.get("close") + else: + candidate = getattr(item, "close", item) + try: + numeric = float(candidate) + except (TypeError, ValueError): + continue + if numeric > 0.0: + values.append(numeric) + return float(values[-1]) if values else None + + def _build_historical_close_map( + self, + ib, + historical_close_loader: Callable[..., Any], + symbols: tuple[str, ...], + ) -> dict[str, float]: + close_map: dict[str, float] = {} + for symbol in self._normalize_symbols(symbols): + try: + price_history = historical_close_loader( + ib, + symbol, + duration="10 D", + bar_size="1 day", + ) + except Exception as exc: + self.logger( + "historical_price_fallback_failed | " + f"profile={self.profile} symbol={symbol} error_type={type(exc).__name__} error={exc}" + ) + continue + latest_close = self._extract_latest_positive_close(price_history) + if latest_close is not None: + close_map[symbol] = latest_close + return close_map + def _build_strategy_context( self, *, @@ -218,6 +309,15 @@ def _evaluate_market_data_strategy( if safe_haven_symbol: managed_candidates.append(safe_haven_symbol) managed_symbols = tuple(dict.fromkeys(managed_candidates)) + price_fallbacks = self._build_historical_close_map( + ib, + historical_close_loader, + self._build_price_fallback_symbol_list( + decision, + managed_symbols=managed_symbols, + current_holdings=current_holdings, + ), + ) metadata = { "strategy_profile": self.profile, "managed_symbols": managed_symbols, @@ -234,6 +334,10 @@ def _evaluate_market_data_strategy( metadata["portfolio_total_equity"] = float(getattr(portfolio_snapshot, "total_equity", 0.0) or 0.0) if safe_haven_symbol: metadata["safe_haven_symbol"] = safe_haven_symbol + if price_fallbacks: + metadata["price_fallbacks"] = price_fallbacks + metadata["dry_run_price_fallbacks"] = price_fallbacks + metadata["price_fallback_source"] = "historical_close" return StrategyEvaluationResult(decision=decision, metadata=metadata) def _evaluate_value_target_strategy( @@ -274,6 +378,15 @@ def _evaluate_value_target_strategy( (position.symbol for position in decision.positions if position.role == "safe_haven"), None, ) + price_fallbacks = self._build_historical_close_map( + ib, + historical_close_loader, + self._build_price_fallback_symbol_list( + decision, + managed_symbols=managed_symbols, + current_holdings=current_holdings, + ), + ) metadata = { "strategy_profile": self.profile, "managed_symbols": managed_symbols, @@ -292,6 +405,10 @@ def _evaluate_value_target_strategy( benchmark_symbol = market_inputs.get("benchmark_symbol") if benchmark_symbol: metadata["benchmark_symbol"] = str(benchmark_symbol) + if price_fallbacks: + metadata["price_fallbacks"] = price_fallbacks + metadata["dry_run_price_fallbacks"] = price_fallbacks + metadata["price_fallback_source"] = "historical_close" return StrategyEvaluationResult(decision=decision, metadata=metadata) def _build_value_target_market_inputs( @@ -413,12 +530,15 @@ def build_extra_metadata( managed_symbols: tuple[str, ...], _decision: StrategyDecision, ) -> Mapping[str, Any]: + price_fallbacks = self._build_snapshot_close_map( + feature_snapshot, + managed_symbols=managed_symbols, + ) return { "trade_date": run_as_of.date().isoformat(), - "dry_run_price_fallbacks": self._build_snapshot_close_map( - feature_snapshot, - managed_symbols=managed_symbols, - ), + "price_fallbacks": price_fallbacks, + "dry_run_price_fallbacks": price_fallbacks, + "price_fallback_source": "snapshot_close", } result = evaluate_feature_snapshot_strategy( diff --git a/tests/test_execution_service.py b/tests/test_execution_service.py index 9d52a0b..a62152b 100644 --- a/tests/test_execution_service.py +++ b/tests/test_execution_service.py @@ -53,6 +53,7 @@ def translate(key, **kwargs): "same_day_execution_locked": "same_day_execution_locked profile={profile} mode={mode} trade_date={trade_date} snapshot_date={snapshot_date} target_hash={target_hash} lock_path={lock_path}", "execution_lock_acquired": "execution_lock_acquired mode={mode} trade_date={trade_date} snapshot_date={snapshot_date} lock_path={lock_path}", "dry_run_snapshot_prices": "dry_run_snapshot_prices count={count} symbols={symbols}", + "price_fallback_prices": "price_fallback_prices count={count} symbols={symbols}", "no_equity": "❌ No equity", } template = templates[key] @@ -677,3 +678,58 @@ def accountValues(self): assert any(log.startswith("dry_run_snapshot_prices count=2") for log in trade_logs) assert any(log.startswith("DRY_RUN buy VOO") for log in trade_logs) assert any(log.startswith("DRY_RUN buy BOXX") for log in trade_logs) + + +def test_execute_rebalance_uses_price_fallbacks_for_live_when_quotes_missing(monkeypatch, tmp_path): + class FakeIB: + def openTrades(self): + return [] + + def fills(self): + return [] + + def accountValues(self): + return [SimpleNamespace(tag="AvailableFunds", currency="USD", value="5000")] + + submitted = [] + + def fake_submit_order_intent(_ib, intent): + submitted.append(intent) + return SimpleNamespace(broker_order_id="1", status="Submitted") + + monkeypatch.setattr("application.execution_service.time.sleep", lambda _seconds: None) + + trade_logs, summary = execute_rebalance( + FakeIB(), + {"VOO": 1.0}, + {}, + {"equity": 1000.0, "buying_power": 1000.0}, + fetch_quote_snapshots=lambda *_args, **_kwargs: {}, + submit_order_intent=fake_submit_order_intent, + order_intent_cls=OrderIntent, + translator=translate, + strategy_symbols=["VOO"], + strategy_profile="tech_communication_pullback_enhancement", + signal_metadata=_signal_metadata( + {"VOO": 1.0}, + risk_symbols=("VOO",), + trade_date="2026-04-01", + price_fallbacks={"VOO": 100.0}, + price_fallback_source="historical_close", + ), + dry_run_only=False, + cash_reserve_ratio=0.0, + rebalance_threshold_ratio=0.02, + limit_buy_premium=1.005, + sell_settle_delay_sec=0, + execution_lock_dir=tmp_path, + return_summary=True, + ) + + assert summary["execution_status"] == "executed" + assert len(submitted) == 1 + assert submitted[0].symbol == "VOO" + assert summary["snapshot_price_fallback_used"] is True + assert summary["price_fallback_source"] == "historical_close" + assert summary["price_source_mode"] == "mixed_market_quote_historical_close" + assert any(log.startswith("price_fallback_prices count=1") for log in trade_logs) diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index 51922f3..c09ee59 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -45,6 +45,7 @@ def _build_test_translator(): "target_diff_summary": "target_changes {details}", "same_day_execution_locked_notice": "same_day_execution_locked_notice {mode} {trade_date} {snapshot_date}", "dry_run_snapshot_prices": "dry_run_snapshot_prices count={count} symbols={symbols}", + "price_fallback_prices": "price_fallback_prices count={count} symbols={symbols}", "dry_run_buy_batch": "dry_run_buy_batch count={count} details={details}", "dry_run_sell_batch": "dry_run_sell_batch count={count} details={details}", "submitted_buy_batch": "submitted_buy_batch count={count} details={details}", diff --git a/tests/test_strategy_runtime.py b/tests/test_strategy_runtime.py index b1328df..29fe1d8 100644 --- a/tests/test_strategy_runtime.py +++ b/tests/test_strategy_runtime.py @@ -565,10 +565,20 @@ def fake_candle_loader(_ib, symbol, duration="2 Y", bar_size="1 day"): for _ in range(220) ] + close_loader_symbols = [] + + def fake_close_loader(_ib, symbol, duration="2 Y", bar_size="1 day"): + close_loader_symbols.append((symbol, duration, bar_size)) + prices = {"TQQQ": 70.0, "BOXX": 105.0} + value = prices.get(symbol) + if value is None: + return strategy_runtime_module.pd.Series(dtype=float) + return strategy_runtime_module.pd.Series([value - 1.0, value]) + result = runtime.evaluate( ib="fake-ib", current_holdings={"TQQQ"}, - historical_close_loader=lambda *_args, **_kwargs: None, + historical_close_loader=fake_close_loader, historical_candle_loader=fake_candle_loader, run_as_of=strategy_runtime_module.pd.Timestamp("2026-04-01"), translator=lambda key, **_kwargs: key, @@ -586,3 +596,6 @@ def fake_candle_loader(_ib, symbol, duration="2 Y", bar_size="1 day"): assert result.metadata["signal_date"] == "2026-04-01" assert result.metadata["effective_date"] == "2026-04-02" assert result.metadata["execution_timing_contract"] == "next_trading_day" + assert close_loader_symbols == [("TQQQ", "10 D", "1 day"), ("BOXX", "10 D", "1 day")] + assert result.metadata["price_fallback_source"] == "historical_close" + assert result.metadata["price_fallbacks"] == {"TQQQ": 70.0, "BOXX": 105.0}