Fix auto presets at login for outputs where PipeWire restores the selected speaker sink before reporting the active port route.
+Fixed auto presets sometimes resetting to Neutral when PipeWire route metadata changed.
diff --git a/pyproject.toml b/pyproject.toml index a7e3a93..5d18cd3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mini-eq" -version = "0.8.5" +version = "0.8.6" description = "Compact PipeWire system-wide parametric equalizer for Linux desktops." readme = "README.md" requires-python = ">=3.11" diff --git a/src/mini_eq/core.py b/src/mini_eq/core.py index de52026..0dbc568 100644 --- a/src/mini_eq/core.py +++ b/src/mini_eq/core.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from functools import lru_cache from pathlib import Path +from urllib.parse import unquote import numpy as np @@ -23,6 +24,7 @@ PRESET_FILE_SUFFIX = ".json" OUTPUT_PRESET_LINKS_VERSION = 1 OUTPUT_PRESET_LINKS_FILE = "output-presets.json" +OUTPUT_PRESET_ROUTE_KEY_PREFIX = "pipewire-route:v1:" EQ_MODE_APO = 6 SAMPLE_RATE = 48000.0 GRAPH_FREQ_MIN = 20.0 @@ -426,6 +428,35 @@ def normalize_output_preset_key_candidates(output_keys: str | Iterable[str | Non return normalized +def parse_output_preset_route_key(output_key: str | None) -> dict[str, object] | None: + key = str(output_key or "").strip() + if not key.startswith(OUTPUT_PRESET_ROUTE_KEY_PREFIX): + return None + + fields: dict[str, str] = {} + for part in key[len(OUTPUT_PRESET_ROUTE_KEY_PREFIX) :].split(";"): + if "=" not in part: + return None + name, value = part.split("=", 1) + fields[name] = unquote(value) + + device_name = fields.get("device", "").strip() + route_name = fields.get("route", "").strip() + try: + route_device = int(fields.get("route-device", "")) + except ValueError: + return None + + if not device_name or not route_name or route_device < 0: + return None + + return { + "device": device_name, + "route": route_name, + "route_device": route_device, + } + + def load_output_preset_config() -> tuple[dict[str, str], str | None]: links_path = output_preset_links_path() if not links_path.exists(): @@ -491,6 +522,34 @@ def get_output_preset_link(output_keys: str | Iterable[str | None] | None) -> st return match[1] if match is not None else None +def get_output_preset_route_device_link_match( + device_name: str | None, + route_device: int | None, +) -> tuple[str, str] | None: + device = str(device_name or "").strip() + try: + route_device_id = int(route_device) + except (TypeError, ValueError): + return None + + if not device or route_device_id < 0: + return None + + links, _default_preset = load_output_preset_config() + matches: list[tuple[str, str]] = [] + for output_key, preset_name in links.items(): + route_key = parse_output_preset_route_key(output_key) + if route_key is None: + continue + if route_key["device"] == device and route_key["route_device"] == route_device_id: + matches.append((output_key, preset_name)) + + if len(matches) == 1: + return matches[0] + + return None + + def get_default_preset_name() -> str | None: _links, default_preset = load_output_preset_config() return default_preset diff --git a/src/mini_eq/diagnostics.py b/src/mini_eq/diagnostics.py new file mode 100644 index 0000000..ad8a76c --- /dev/null +++ b/src/mini_eq/diagnostics.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import json +import os +from datetime import UTC, datetime +from pathlib import Path + +from gi.repository import GLib + +STARTUP_TRACE_ENV = "MINI_EQ_STARTUP_TRACE" +STARTUP_TRACE_DIR_NAME = "mini-eq" +STARTUP_TRACE_FILE_NAME = "startup-trace.log" + + +def startup_trace_enabled() -> bool: + value = os.environ.get(STARTUP_TRACE_ENV, "") + return value.strip().lower() not in {"", "0", "false", "no", "off"} + + +def startup_trace_path() -> Path: + return Path(GLib.get_user_state_dir()) / STARTUP_TRACE_DIR_NAME / STARTUP_TRACE_FILE_NAME + + +def describe_output_preset_target(target: object | None) -> dict[str, object] | None: + if target is None: + return None + + route = getattr(target, "route", None) + route_info = None + if route is not None: + route_info = { + "description": _json_safe(getattr(route, "description", None)), + "device_name": _json_safe(getattr(route, "device_name", None)), + "name": _json_safe(getattr(route, "name", None)), + "output_preset_key": _json_safe(getattr(route, "output_preset_key", None)), + "route_device": _json_safe(getattr(route, "route_device", None)), + } + + return { + "device_name": _json_safe(getattr(target, "device_name", None)), + "has_route_key": bool(getattr(target, "has_route_key", False)), + "keys": _json_safe(tuple(getattr(target, "keys", ()) or ())), + "link_key": _json_safe(getattr(target, "link_key", "")), + "output_key": _json_safe(getattr(target, "output_key", None)), + "route": route_info, + "route_device": _json_safe(getattr(target, "route_device", None)), + } + + +def describe_output_preset_snapshot(snapshot: object | None) -> dict[str, object] | None: + if snapshot is None: + return None + + return { + "identity": _json_safe(getattr(snapshot, "identity", None)), + "sink_name": _json_safe(getattr(snapshot, "sink_name", None)), + "target": describe_output_preset_target(getattr(snapshot, "target", None)), + } + + +def describe_output_preset_transition(transition: object | None) -> dict[str, object] | None: + if transition is None: + return None + + return { + "changed": bool(getattr(transition, "changed", False)), + "current": describe_output_preset_snapshot(getattr(transition, "current", None)), + "previous": describe_output_preset_snapshot(getattr(transition, "previous", None)), + } + + +def trace_startup_event(event: str, **fields: object) -> None: + if not startup_trace_enabled(): + return + + path = startup_trace_path() + record = { + "event": event, + "timestamp": datetime.now(UTC).isoformat(timespec="milliseconds").replace("+00:00", "Z"), + } + record.update({key: _json_safe(value) for key, value in fields.items()}) + + try: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as trace_file: + trace_file.write(json.dumps(record, sort_keys=True, separators=(",", ":"))) + trace_file.write("\n") + except Exception: + return + + +def _json_safe(value: object) -> object: + if value is None or isinstance(value, str | int | float | bool): + return value + + if isinstance(value, tuple | list | set): + return [_json_safe(item) for item in value] + + if isinstance(value, dict): + return {str(key): _json_safe(item) for key, item in value.items()} + + return str(value) diff --git a/src/mini_eq/pipewire_routes.py b/src/mini_eq/pipewire_routes.py index 16ff9b8..f24dc1b 100644 --- a/src/mini_eq/pipewire_routes.py +++ b/src/mini_eq/pipewire_routes.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re from dataclasses import dataclass, field from urllib.parse import quote @@ -9,6 +10,29 @@ DEVICE_ENUM_ROUTE_PARAM_NAME = "EnumRoute" DEVICE_ROUTE_EVENT_PARAM_NAMES = (DEVICE_ROUTE_PARAM_NAME, DEVICE_ENUM_ROUTE_PARAM_NAME) DEVICE_LABEL_PROPERTY_KEYS = ("device.description", "device.nick", "device.name") +ROUTE_LABEL_TOKEN_STOP_WORDS = frozenset( + { + "alsa", + "analog", + "audio", + "card", + "device", + "fi", + "generic", + "hi", + "hifi", + "in", + "input", + "out", + "output", + "pci", + "profile", + "sink", + "source", + "stereo", + "usb", + } +) @dataclass(frozen=True) @@ -35,6 +59,8 @@ class PipeWireOutputPresetTarget: output_key: str | None route: PipeWireOutputRoute | None keys: tuple[str, ...] + device_name: str | None = None + route_device: int = 0 @property def link_key(self) -> str: @@ -45,6 +71,18 @@ def has_route_key(self) -> bool: route_key = self.route.output_preset_key if self.route is not None else None return route_key is not None and route_key in self.keys + @property + def route_device_identity(self) -> str | None: + if self.has_route_key: + return None + + device = str(self.device_name or "").strip() + if not device or self.route_device <= 0: + return None + + encoded_device = quote(device, safe="") + return f"pipewire-route-device:v1:device={encoded_device};route-device={int(self.route_device)}" + def build_output_route_preset_key(device_name: str | None, route_name: str | None, route_device: int) -> str | None: device = str(device_name or "").strip() @@ -57,6 +95,58 @@ def build_output_route_preset_key(device_name: str | None, route_name: str | Non return f"{OUTPUT_PRESET_ROUTE_KEY_PREFIX}device={encoded_device};route={encoded_route};route-device={int(route_device)}" +def _route_label_tokens(value: str | None) -> tuple[str, ...]: + text = str(value or "").strip() + if not text: + return () + + text = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", text).casefold() + tokens: list[str] = [] + for route_part in re.findall(r"[a-z0-9]+", text): + if route_part.isdigit() or route_part in ROUTE_LABEL_TOKEN_STOP_WORDS: + continue + if len(route_part) > 3 and route_part.endswith("s") and not route_part.endswith("ss"): + route_part = route_part[:-1] + if route_part and route_part not in ROUTE_LABEL_TOKEN_STOP_WORDS and route_part not in tokens: + tokens.append(route_part) + return tuple(tokens) + + +def route_matches_sink_label(route: PipeWireOutputRoute, sink) -> bool: + sink_labels = ( + getattr(sink, "node_name", None), + getattr(sink, "node_description", None), + ) + sink_tokens: set[str] = set() + sink_compacts: list[str] = [] + for label in sink_labels: + tokens = _route_label_tokens(label) + sink_tokens.update(tokens) + compact = "".join(tokens) + if compact: + sink_compacts.append(compact) + + if not sink_tokens and not sink_compacts: + return False + + route_labels = ( + route.description, + route.name, + ) + for label in route_labels: + route_tokens = _route_label_tokens(label) + if not route_tokens: + continue + if set(route_tokens).issubset(sink_tokens): + return True + + route_compact = "".join(route_tokens) + if len(route_compact) >= 4 and any(route_compact in compact for compact in sink_compacts): + return True + + return False + + class PipeWireRouteMixin: def output_preset_keys_for_sink_name(self, sink_name: str | None) -> tuple[str, ...]: return self.output_preset_target_for_sink_name(sink_name).keys @@ -69,12 +159,28 @@ def output_preset_target_for_sink_name(self, sink_name: str | None) -> PipeWireO keys: list[str] = [] sink = self.audio_sink_by_name(node_name) route = self.output_route_for_sink(sink) + device_name = None + route_device = 0 + if route is not None: + device_name = route.device_name + route_device = route.route_device + elif sink is not None: + device_name = str(sink.properties.get("device.name") or "").strip() or self._device_name_by_bound_id( + sink.device_id + ) + route_device = int(sink.card_profile_device) route_key = route.output_preset_key if route is not None else None if route_key: keys.append(route_key) keys.append(node_name) - return PipeWireOutputPresetTarget(node_name, route, tuple(dict.fromkeys(keys))) + return PipeWireOutputPresetTarget( + node_name, + route, + tuple(dict.fromkeys(keys)), + device_name=device_name, + route_device=route_device, + ) def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None: if sink is None or sink.device_id <= 0: @@ -91,14 +197,27 @@ def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None: if device is None: return None - routes = self._enumerate_device_routes(device, sink.device_id) - output_routes = [ + routes = self._output_routes_from_device(device, sink.device_id, DEVICE_ROUTE_PARAM_NAME) + route = self._select_output_route_for_sink(sink, routes) + if route is not None: + return route + + enum_routes = self._output_routes_from_device(device, sink.device_id, DEVICE_ENUM_ROUTE_PARAM_NAME) + return self._select_output_route_for_sink(sink, enum_routes) + + def _output_routes_from_device( + self, + device, + device_bound_id: int, + param_name: str, + ) -> list[PipeWireOutputRoute]: + routes = self._enumerate_device_routes(device, device_bound_id, param_name) + return [ route for route in routes if str(route.direction or "").casefold() == OUTPUT_ROUTE_DIRECTION and (route.availability or "unknown").casefold() != "no" ] - return self._select_output_route_for_sink(sink, output_routes) def _cached_output_route_for_sink(self, sink) -> PipeWireOutputRoute | None: try: @@ -122,6 +241,12 @@ def _select_output_route_for_sink( if len(matching_device_routes) > 1: return None + matching_label_routes = [route for route in output_routes if route_matches_sink_label(route, sink)] + if len(matching_label_routes) == 1: + return matching_label_routes[0] + if len(matching_label_routes) > 1: + return None + if len(output_routes) == 1: return output_routes[0] @@ -218,8 +343,13 @@ def _device_properties_by_bound_id(self, bound_id: int) -> dict[str, str]: return self._properties_dict(global_) - def _enumerate_device_routes(self, device, device_bound_id: int) -> list[PipeWireOutputRoute]: - route_param_id = self._device_route_param_id(device) + def _enumerate_device_routes( + self, + device, + device_bound_id: int, + route_param_name: str = DEVICE_ROUTE_PARAM_NAME, + ) -> list[PipeWireOutputRoute]: + route_param_id = self._device_route_param_id(device, route_param_name) if route_param_id is None: return [] @@ -234,10 +364,10 @@ def _enumerate_device_routes(self, device, device_bound_id: int) -> list[PipeWir routes: list[PipeWireOutputRoute] = [] for param in self._iterate_model(params): try: - param_name = param.dup_name() + copied_param_name = param.dup_name() except Exception: - param_name = None - if param_name != DEVICE_ROUTE_PARAM_NAME: + copied_param_name = None + if copied_param_name != route_param_name: continue try: @@ -256,11 +386,11 @@ def _enumerate_device_routes(self, device, device_bound_id: int) -> list[PipeWir finally: self._device_route_refreshing_bound_ids.discard(bound_id) - def _device_route_param_id(self, device) -> int | None: - route_param_id = self._device_param_id_by_name(device, DEVICE_ROUTE_PARAM_NAME) + def _device_route_param_id(self, device, param_name: str = DEVICE_ROUTE_PARAM_NAME) -> int | None: + route_param_id = self._device_param_id_by_name(device, param_name) if route_param_id is None: self._sync_proxy(device, "device") - route_param_id = self._device_param_id_by_name(device, DEVICE_ROUTE_PARAM_NAME) + route_param_id = self._device_param_id_by_name(device, param_name) return route_param_id def _device_route_event_param_ids(self, device) -> dict[str, int]: diff --git a/src/mini_eq/routing.py b/src/mini_eq/routing.py index 29a6c31..8c78674 100644 --- a/src/mini_eq/routing.py +++ b/src/mini_eq/routing.py @@ -3,7 +3,7 @@ import json import sys from collections.abc import Callable -from dataclasses import replace +from dataclasses import dataclass, replace from gi.repository import GLib @@ -54,6 +54,47 @@ from .pipewire_stream_router import PipeWireStreamRouter FILTER_CONTROL_PARAM_ECHO_GRACE_USEC = 500_000 +_OUTPUT_PRESET_TARGET_UNSET = object() + + +@dataclass(frozen=True) +class OutputPresetTargetSnapshot: + sink_name: str | None + identity: str | None + target: object | None + + +@dataclass(frozen=True) +class OutputPresetTargetTransition: + previous: OutputPresetTargetSnapshot | None + current: OutputPresetTargetSnapshot + changed: bool + + +def output_preset_target_identity( + target: object | None, + fallback: str | None, +) -> str | None: + if target is not None: + has_route_key = bool(getattr(target, "has_route_key", False)) + link_key = str(getattr(target, "link_key", "") or "").strip() + if has_route_key and link_key: + return link_key + + route_device_identity = str(getattr(target, "route_device_identity", "") or "").strip() + if route_device_identity: + return route_device_identity + + if link_key: + return link_key + + for key in tuple(getattr(target, "keys", ()) or ()): + key_text = str(key or "").strip() + if key_text: + return key_text + + fallback_text = str(fallback or "").strip() + return fallback_text or None class SystemWideEqController: @@ -67,6 +108,7 @@ def __init__(self, output_sink: str | None) -> None: self.output_sink = output_sink or self.original_default_sink self._output_preset_target_sink: str | None = None self._output_preset_target: PipeWireOutputPresetTarget | None = None + self._observed_output_preset_target_snapshot: OutputPresetTargetSnapshot | None = None self.filter_output_name = f"{self.virtual_sink_name}{FILTER_OUTPUT_SUFFIX}" self.engine_module = None self.engine_start_watch = None @@ -199,6 +241,35 @@ def output_preset_target(self, *, refresh: bool = False) -> PipeWireOutputPreset self._output_preset_target = target return target + def output_preset_target_snapshot( + self, + target: object | None = _OUTPUT_PRESET_TARGET_UNSET, + *, + refresh: bool = False, + ) -> OutputPresetTargetSnapshot: + if target is _OUTPUT_PRESET_TARGET_UNSET: + target = self.output_preset_target(refresh=refresh) + sink_name = getattr(self, "output_sink", None) + return OutputPresetTargetSnapshot( + sink_name=sink_name, + identity=output_preset_target_identity(target, sink_name), + target=target, + ) + + def remember_output_preset_target(self, target: object | None = _OUTPUT_PRESET_TARGET_UNSET) -> None: + self._observed_output_preset_target_snapshot = self.output_preset_target_snapshot(target) + + def output_preset_target_transition(self, *, consume: bool = True) -> OutputPresetTargetTransition: + previous = getattr(self, "_observed_output_preset_target_snapshot", None) + try: + current = self.output_preset_target_snapshot() + except Exception: + current = self.output_preset_target_snapshot(None) + changed = previous is not None and previous.identity != current.identity + if consume or previous is None: + self._observed_output_preset_target_snapshot = current + return OutputPresetTargetTransition(previous, current, changed) + def default_output_sink_candidates(self, *, refresh: bool = False, snapshot: bool = False) -> tuple[str, ...]: defaults = ( self.output_backend.refresh_defaults(snapshot=snapshot) if refresh else self.output_backend.defaults() @@ -310,6 +381,7 @@ def switch_output_sink(self, sink_name: str, explicit: bool) -> None: if explicit: self.follow_default_output = False + self.invalidate_output_preset_target() output_sink = self.get_sink(sink_name) if output_sink is None: return diff --git a/src/mini_eq/window.py b/src/mini_eq/window.py index a7cf589..ceb355c 100644 --- a/src/mini_eq/window.py +++ b/src/mini_eq/window.py @@ -36,6 +36,7 @@ ensure_preset_storage_dir, estimate_response_peak_db, ) +from .diagnostics import describe_output_preset_transition, trace_startup_event from .glib_utils import destroy_glib_source from .gtk_utils import create_dropdown_from_strings from .pipewire_backend import PipeWireBackendError, PipeWireNode, node_sample_rate, parse_positive_int @@ -79,57 +80,6 @@ def compact_warning_title(message: str) -> str: return COMPACT_WARNING_TITLES.get(message, message) -def output_preset_target_identity(owner: object, fallback: str | None) -> str | None: - target = None - target_factory = getattr(owner, "output_preset_target", None) - if callable(target_factory): - try: - target = target_factory() - except Exception: - target = None - - if target is not None: - try: - link_key = str(getattr(target, "link_key", "") or "").strip() - except Exception: - link_key = "" - if link_key: - return link_key - - try: - keys = tuple(getattr(target, "keys", ()) or ()) - except Exception: - keys = () - for key in keys: - key_text = str(key or "").strip() - if key_text: - return key_text - - controller = getattr(owner, "controller", None) - if controller is not None: - link_key_factory = getattr(controller, "output_preset_link_key", None) - if callable(link_key_factory): - try: - link_key = str(link_key_factory() or "").strip() - except Exception: - link_key = "" - if link_key: - return link_key - - keys_factory = getattr(controller, "output_preset_keys", None) - if callable(keys_factory): - try: - keys = tuple(keys_factory() or ()) - except Exception: - keys = () - for key in keys: - key_text = str(key or "").strip() - if key_text: - return key_text - - return fallback - - def fit_window_default_size_to_monitor( width: int, height: int, @@ -225,8 +175,6 @@ def __init__( self.output_preset_auto_applied = False self.output_preset_curve_auto_loaded = False self.updating_output_preset_switch = False - self.last_output_preset_sink_name: str | None = None - self.last_output_preset_target_identity: str | None = None self.preset_monitor: Gio.FileMonitor | None = None self.preset_refresh_source_id = 0 self.analyzer_enabled = load_monitor_enabled() @@ -351,15 +299,39 @@ def on_startup_ready_idle(self) -> bool: if self.ui_shutting_down or self.startup_ready: return False + controller = getattr(self, "controller", None) + trace_startup_event( + "startup-ready-begin", + auto_route_on_startup=self.auto_route_on_startup, + current_preset=getattr(self, "current_preset_name", None), + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=bool(getattr(self, "output_preset_curve_auto_loaded", False)), + output_sink=getattr(controller, "output_sink", None), + ) self.startup_ready = True self.start_preset_monitoring() - self.apply_output_preset_for_current_output() + initial_output_preset_applied = self.apply_output_preset_for_current_output() + trace_startup_event( + "startup-ready-initial-output-preset-done", + applied=initial_output_preset_applied, + current_preset=getattr(self, "current_preset_name", None), + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=bool(getattr(self, "output_preset_curve_auto_loaded", False)), + output_sink=getattr(controller, "output_sink", None), + ) if self.ui_shutting_down: return False self.start_analyzer_preview() if self.auto_route_on_startup: + trace_startup_event( + "startup-ready-auto-route-begin", + current_preset=getattr(self, "current_preset_name", None), + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=bool(getattr(self, "output_preset_curve_auto_loaded", False)), + output_sink=getattr(controller, "output_sink", None), + ) self.apply_startup_auto_route() if not self.ui_shutting_down: @@ -370,6 +342,13 @@ def on_startup_ready_idle(self) -> bool: prepare_startup_notification(self) self.set_visible(True) self.present() + trace_startup_event( + "startup-ready-complete", + current_preset=getattr(self, "current_preset_name", None), + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=bool(getattr(self, "output_preset_curve_auto_loaded", False)), + output_sink=getattr(controller, "output_sink", None), + ) self.notify_control_state_changed() return False @@ -412,13 +391,64 @@ def schedule_startup_auto_route_retry_after_error(self, exc: Exception) -> bool: def apply_startup_auto_route(self) -> None: eq_was_enabled = self.controller.eq_enabled + previous_output_preset_auto_loaded = bool(getattr(self, "output_preset_curve_auto_loaded", False)) + route_applied = False + trace_startup_event( + "startup-auto-route-start", + current_preset=getattr(self, "current_preset_name", None), + eq_enabled_before=eq_was_enabled, + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=previous_output_preset_auto_loaded, + output_sink=getattr(self.controller, "output_sink", None), + ) try: self.controller.route_system_audio(True) except Exception as exc: - if not self.schedule_startup_auto_route_retry_after_error(exc): + retry_scheduled = self.schedule_startup_auto_route_retry_after_error(exc) + trace_startup_event( + "startup-auto-route-error", + current_preset=getattr(self, "current_preset_name", None), + error=str(exc), + output_sink=getattr(self.controller, "output_sink", None), + retry_scheduled=retry_scheduled, + ) + if not retry_scheduled: self.set_status(str(exc)) else: + route_applied = True self.startup_auto_route_deadline_us = 0 + trace_startup_event( + "startup-auto-route-routed", + current_preset=getattr(self, "current_preset_name", None), + output_sink=getattr(self.controller, "output_sink", None), + ) + if route_applied: + target_transition = self.controller.output_preset_target_transition() + trace_startup_event( + "startup-auto-route-transition", + current_preset=getattr(self, "current_preset_name", None), + output_sink=getattr(self.controller, "output_sink", None), + transition=describe_output_preset_transition(target_transition), + ) + if target_transition.changed: + applied = self.apply_output_preset_for_current_output( + reset_auto_preset_without_link=previous_output_preset_auto_loaded, + announce_no_output_preset=True, + ) + trace_startup_event( + "startup-auto-route-reapply-done", + applied=applied, + current_preset=getattr(self, "current_preset_name", None), + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=bool(getattr(self, "output_preset_curve_auto_loaded", False)), + output_sink=getattr(self.controller, "output_sink", None), + ) + else: + trace_startup_event( + "startup-auto-route-reapply-skipped", + current_preset=getattr(self, "current_preset_name", None), + output_sink=getattr(self.controller, "output_sink", None), + ) self.refresh_after_route_state_changed(eq_was_enabled=eq_was_enabled) def on_startup_auto_route_idle(self) -> bool: @@ -903,10 +933,18 @@ def refresh_output_sinks(self, *, handle_observed_output_change: bool = True) -> return active = self.controller.output_sink - previous_output = self.last_output_preset_sink_name - previous_output_identity = getattr(self, "last_output_preset_target_identity", previous_output) - active_output_identity = output_preset_target_identity(self, active) + target_transition = self.controller.output_preset_target_transition(consume=handle_observed_output_change) previous_output_preset_auto_loaded = self.output_preset_curve_auto_loaded + trace_startup_event( + "output-refresh-transition", + consume_transition=handle_observed_output_change, + current_preset=getattr(self, "current_preset_name", None), + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=previous_output_preset_auto_loaded, + output_sink=active, + startup_ready=bool(getattr(self, "startup_ready", False)), + transition=describe_output_preset_transition(target_transition), + ) visible_sinks = self.list_visible_output_sinks() visible_sink_names = [sink.node_name for sink in visible_sinks if sink.node_name is not None] visible_sink_labels = self.build_output_sink_labels(visible_sinks) @@ -932,21 +970,23 @@ def refresh_output_sinks(self, *, handle_observed_output_change: bool = True) -> finally: self.updating_output_combo = False - output_changed = previous_output_identity is not None and previous_output_identity != active_output_identity - # App-originated selector refreshes should not consume the output - # transition; the next PipeWire-observed refresh owns preset handling. - if handle_observed_output_change or previous_output is None: - self.last_output_preset_sink_name = active - self.last_output_preset_target_identity = active_output_identity self.update_preset_state() self.update_info_label() self.update_status_summary() - if self.startup_ready and handle_observed_output_change and output_changed: - self.apply_output_preset_for_current_output( + if self.startup_ready and handle_observed_output_change and target_transition.changed: + applied = self.apply_output_preset_for_current_output( reset_auto_preset_without_link=previous_output_preset_auto_loaded, announce_no_output_preset=True, ) + trace_startup_event( + "output-refresh-reapply-done", + applied=applied, + current_preset=getattr(self, "current_preset_name", None), + output_preset_auto_applied=bool(getattr(self, "output_preset_auto_applied", False)), + output_preset_curve_auto_loaded=bool(getattr(self, "output_preset_curve_auto_loaded", False)), + output_sink=getattr(self.controller, "output_sink", None), + ) def update_info_label(self) -> None: return diff --git a/src/mini_eq/window_presets.py b/src/mini_eq/window_presets.py index a61e241..e39c225 100644 --- a/src/mini_eq/window_presets.py +++ b/src/mini_eq/window_presets.py @@ -21,7 +21,8 @@ ensure_json_suffix, fader_band_count_for_profile, get_output_preset_fallback_name, - get_output_preset_link, + get_output_preset_link_match, + get_output_preset_route_device_link_match, list_preset_names, load_mini_eq_preset_file, preset_path_for_name, @@ -31,6 +32,7 @@ set_output_preset_link, write_mini_eq_preset_file, ) +from .diagnostics import describe_output_preset_target, trace_startup_event from .window_utils import requested_switch_state, set_accessible_label, set_switch_confirmed_state APO_IMPORT_LABEL_PREFIX = "Imported APO: " @@ -107,6 +109,11 @@ def output_preset_link_key(self, target=None) -> str: except Exception: return getattr(self.controller, "output_sink", "") or "" + def remember_output_preset_target(self, target=None) -> None: + remember_target = getattr(self.controller, "remember_output_preset_target", None) + if callable(remember_target): + remember_target(target) + def output_preset_has_route(self, target=None) -> bool: return bool(getattr(target, "has_route_key", False)) @@ -155,10 +162,24 @@ def update_output_scope_state(self, target=None) -> None: def output_preset_link_name(self) -> str | None: try: target = self.output_preset_target() - return get_output_preset_link(self.output_preset_keys(target)) + match = self.output_preset_link_match(target) + return match[1] if match is not None else None except Exception: return None + def output_preset_link_match(self, target=None) -> tuple[str, str] | None: + match = get_output_preset_link_match(self.output_preset_keys(target)) + if match is not None: + return match + + if target is None or getattr(target, "route", None) is not None: + return None + + return get_output_preset_route_device_link_match( + getattr(target, "device_name", None), + getattr(target, "route_device", 0), + ) + def fallback_preset_name(self) -> str | None: try: return get_output_preset_fallback_name() @@ -503,7 +524,7 @@ def sync_output_preset_switch( switch.set_tooltip_text(tooltip) try: - linked_preset = get_output_preset_link(self.output_preset_keys(target)) + link_match = self.output_preset_link_match(target) except Exception as exc: sync_output_preset_switch( active=False, @@ -514,6 +535,7 @@ def sync_output_preset_switch( ) return + linked_preset = link_match[1] if link_match is not None else None has_output = bool(self.controller.output_sink) current_signature = self.controller.state_signature() has_named_preset = self.current_preset_name is not None @@ -922,15 +944,51 @@ def apply_output_preset_for_current_output( reset_auto_preset_without_link: bool = False, announce_no_output_preset: bool = False, ) -> bool: + current_preset_before = getattr(self, "current_preset_name", None) + auto_loaded_before = bool(getattr(self, "output_preset_curve_auto_loaded", False)) + auto_applied_before = bool(getattr(self, "output_preset_auto_applied", False)) target = self.output_preset_target() + self.remember_output_preset_target(target) + keys = self.output_preset_keys(target) + trace_startup_event( + "output-preset-apply-start", + announce_no_output_preset=announce_no_output_preset, + auto_applied_before=auto_applied_before, + auto_loaded_before=auto_loaded_before, + current_preset_before=current_preset_before, + output_sink=getattr(self.controller, "output_sink", None), + reset_auto_preset_without_link=reset_auto_preset_without_link, + target=describe_output_preset_target(target), + target_keys=keys, + ) try: - linked_preset = get_output_preset_link(self.output_preset_keys(target)) + link_match = self.output_preset_link_match(target) except Exception as exc: self.update_preset_state() self.set_status(str(exc)) + trace_startup_event( + "output-preset-link-error", + current_preset_after=getattr(self, "current_preset_name", None), + error=str(exc), + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) self.notify_control_state_changed() return True + linked_key = link_match[0] if link_match is not None else None + linked_preset = link_match[1] if link_match is not None else None + trace_startup_event( + "output-preset-link-selected", + current_preset_before=current_preset_before, + linked_key=linked_key, + linked_preset=linked_preset, + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) + if not linked_preset: if self.has_unsaved_curve_changes(): self.output_preset_auto_applied = False @@ -938,6 +996,14 @@ def apply_output_preset_for_current_output( self.update_preset_state() if announce_no_output_preset: self.set_status("Current curve kept") + trace_startup_event( + "output-preset-no-link-unsaved-kept", + announce_no_output_preset=announce_no_output_preset, + current_preset_after=getattr(self, "current_preset_name", None), + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) self.notify_control_state_changed() return announce_no_output_preset @@ -958,17 +1024,47 @@ def apply_output_preset_for_current_output( self.output_preset_curve_auto_loaded = False self.update_preset_state() self.set_status("Fallback preset unavailable") + trace_startup_event( + "output-preset-fallback-unavailable", + current_preset_after=getattr(self, "current_preset_name", None), + fallback_preset=default_preset, + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) self.notify_control_state_changed() else: + trace_startup_event( + "output-preset-fallback-applied", + current_preset_after=getattr(self, "current_preset_name", None), + fallback_preset=default_preset, + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) return True if reset_auto_preset_without_link: self.reset_curve_to_neutral("Unmatched output bypassed") + trace_startup_event( + "output-preset-no-link-reset-neutral", + current_preset_after=getattr(self, "current_preset_name", None), + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) return True self.output_preset_auto_applied = False self.output_preset_curve_auto_loaded = False self.update_preset_state() + trace_startup_event( + "output-preset-no-link", + current_preset_after=getattr(self, "current_preset_name", None), + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) self.notify_control_state_changed() return False @@ -977,6 +1073,14 @@ def apply_output_preset_for_current_output( self.output_preset_curve_auto_loaded = False self.update_preset_state() self.set_status("Current curve kept") + trace_startup_event( + "output-preset-linked-unsaved-kept", + current_preset_after=getattr(self, "current_preset_name", None), + linked_preset=linked_preset, + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) self.notify_control_state_changed() return True @@ -987,11 +1091,27 @@ def apply_output_preset_for_current_output( self.output_preset_curve_auto_loaded = False self.update_preset_state() self.set_status("Auto preset unavailable") + trace_startup_event( + "output-preset-linked-unavailable", + current_preset_after=getattr(self, "current_preset_name", None), + linked_preset=linked_preset, + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) self.notify_control_state_changed() return True self.output_preset_auto_applied = True self.update_output_preset_state() + trace_startup_event( + "output-preset-linked-applied", + current_preset_after=getattr(self, "current_preset_name", None), + linked_preset=linked_preset, + output_sink=getattr(self.controller, "output_sink", None), + target=describe_output_preset_target(target), + target_keys=keys, + ) self.notify_control_state_changed() return True @@ -1200,7 +1320,11 @@ def on_bypass_unmatched_outputs_clicked(self, _button: Gtk.Widget) -> None: def on_clear_output_preset_link_clicked(self, _button: Gtk.Widget) -> None: target = self.output_preset_target() try: - removed = clear_output_preset_link(self.output_preset_keys(target)) + keys = list(self.output_preset_keys(target)) + link_match = self.output_preset_link_match(target) + if link_match is not None and link_match[0] not in keys: + keys.insert(0, link_match[0]) + removed = clear_output_preset_link(keys) self.output_preset_auto_applied = False self.output_preset_curve_auto_loaded = False self.update_preset_state() diff --git a/tests/test_mini_eq_core.py b/tests/test_mini_eq_core.py index 3c571a9..d59f16e 100644 --- a/tests/test_mini_eq_core.py +++ b/tests/test_mini_eq_core.py @@ -126,6 +126,49 @@ def test_output_preset_link_uses_first_matching_output_key(monkeypatch: pytest.M assert core.get_output_preset_link(("missing", "alsa_output.speakers")) == "Speakers" +def test_output_preset_route_key_parser_decodes_device_route_identity() -> None: + route_key = "pipewire-route:v1:device=alsa_card.usb-Generic_USB_Audio-00;route=%5BOut%5D%20Speaker;route-device=11" + + assert core.parse_output_preset_route_key(route_key) == { + "device": "alsa_card.usb-Generic_USB_Audio-00", + "route": "[Out] Speaker", + "route_device": 11, + } + assert core.parse_output_preset_route_key("alsa_output.speakers") is None + assert core.parse_output_preset_route_key("pipewire-route:v1:device=card;route=headphones") is None + + +def test_output_preset_route_device_link_matches_single_saved_route(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None: + monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") + route_key = "pipewire-route:v1:device=alsa_card.usb-Generic_USB_Audio-00;route=%5BOut%5D%20Speaker;route-device=11" + core.write_output_preset_links( + { + "alsa_output.speakers": "Output Wide", + route_key: "Speakers", + } + ) + + assert core.get_output_preset_route_device_link_match("alsa_card.usb-Generic_USB_Audio-00", 11) == ( + route_key, + "Speakers", + ) + assert core.get_output_preset_route_device_link_match("alsa_card.usb-Generic_USB_Audio-00", 12) is None + + +def test_output_preset_route_device_link_does_not_guess_between_saved_routes( + monkeypatch: pytest.MonkeyPatch, tmp_path +) -> None: + monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") + core.write_output_preset_links( + { + "pipewire-route:v1:device=alsa_card.test;route=analog-output-a;route-device=8": "A", + "pipewire-route:v1:device=alsa_card.test;route=analog-output-b;route-device=8": "B", + } + ) + + assert core.get_output_preset_route_device_link_match("alsa_card.test", 8) is None + + def test_output_preset_links_missing_file_returns_empty(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None: monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "missing.json") diff --git a/tests/test_mini_eq_diagnostics.py b/tests/test_mini_eq_diagnostics.py new file mode 100644 index 0000000..3ad92ce --- /dev/null +++ b/tests/test_mini_eq_diagnostics.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import json +from types import SimpleNamespace + +from tests._mini_eq_imports import import_mini_eq_module + +diagnostics = import_mini_eq_module("diagnostics") + + +def test_startup_trace_is_disabled_by_default(monkeypatch, tmp_path) -> None: + trace_path = tmp_path / "state" / "mini-eq" / "startup-trace.log" + monkeypatch.delenv(diagnostics.STARTUP_TRACE_ENV, raising=False) + monkeypatch.setattr(diagnostics, "startup_trace_path", lambda: trace_path) + + diagnostics.trace_startup_event("startup-ready-begin", output_sink="speakers") + + assert not trace_path.exists() + + +def test_startup_trace_writes_json_lines_when_enabled(monkeypatch, tmp_path) -> None: + trace_path = tmp_path / "state" / "mini-eq" / "startup-trace.log" + monkeypatch.setenv(diagnostics.STARTUP_TRACE_ENV, "1") + monkeypatch.setattr(diagnostics, "startup_trace_path", lambda: trace_path) + + diagnostics.trace_startup_event( + "output-preset-apply-start", + output_sink="speakers", + target_keys=("route-key", "speakers"), + ) + + record = json.loads(trace_path.read_text(encoding="utf-8")) + assert record["event"] == "output-preset-apply-start" + assert record["output_sink"] == "speakers" + assert record["target_keys"] == ["route-key", "speakers"] + assert record["timestamp"].endswith("Z") + + +def test_describe_output_preset_target_records_route_identity() -> None: + route = SimpleNamespace( + description="Speakers", + device_name="alsa_card.test", + name="analog-output-speaker", + output_preset_key="pipewire-route:v1:speakers", + route_device=11, + ) + target = SimpleNamespace( + device_name="alsa_card.test", + output_key="alsa_output.speakers", + route=route, + route_device=11, + keys=("pipewire-route:v1:speakers", "alsa_output.speakers"), + link_key="pipewire-route:v1:speakers", + has_route_key=True, + ) + + description = diagnostics.describe_output_preset_target(target) + + assert description == { + "device_name": "alsa_card.test", + "has_route_key": True, + "keys": ["pipewire-route:v1:speakers", "alsa_output.speakers"], + "link_key": "pipewire-route:v1:speakers", + "output_key": "alsa_output.speakers", + "route": { + "description": "Speakers", + "device_name": "alsa_card.test", + "name": "analog-output-speaker", + "output_preset_key": "pipewire-route:v1:speakers", + "route_device": 11, + }, + "route_device": 11, + } diff --git a/tests/test_mini_eq_output_presets.py b/tests/test_mini_eq_output_presets.py index a8b7800..e414d7b 100644 --- a/tests/test_mini_eq_output_presets.py +++ b/tests/test_mini_eq_output_presets.py @@ -1,9 +1,11 @@ from __future__ import annotations +import json from types import SimpleNamespace from tests._mini_eq_imports import core, import_mini_eq_module, routing +diagnostics = import_mini_eq_module("diagnostics") window = import_mini_eq_module("window") window_presets = import_mini_eq_module("window_presets") @@ -96,6 +98,33 @@ def set_sensitive(self, sensitive: bool) -> None: self.sensitive = sensitive +class FakeOutputTransitionController(SimpleNamespace): + def __init__( + self, + *, + output_sink: str, + observed_identity: str | None, + current_identity: str, + follow_default_output: bool = True, + ) -> None: + super().__init__( + output_sink=output_sink, + follow_default_output=follow_default_output, + get_default_output_sink_name=lambda: output_sink, + get_sink=lambda _sink_name: None, + ) + self.observed_identity = observed_identity + self.current_identity = current_identity + self.transition_consumes: list[bool] = [] + + def output_preset_target_transition(self, *, consume: bool = True) -> SimpleNamespace: + self.transition_consumes.append(consume) + changed = self.observed_identity is not None and self.observed_identity != self.current_identity + if consume or self.observed_identity is None: + self.observed_identity = self.current_identity + return SimpleNamespace(changed=changed) + + class FakeDeleteDialog: def __init__(self, response: str = "delete") -> None: self.response = response @@ -605,6 +634,88 @@ def test_initial_output_preset_auto_loads_linked_preset(monkeypatch, tmp_path) - assert test_window.output_preset_switch.active is True +def test_output_preset_auto_load_trace_records_linked_decision(monkeypatch, tmp_path) -> None: + monkeypatch.setenv(diagnostics.STARTUP_TRACE_ENV, "1") + trace_path = tmp_path / "state" / "mini-eq" / "startup-trace.log" + monkeypatch.setattr(diagnostics, "startup_trace_path", lambda: trace_path) + monkeypatch.setattr(core, "PRESET_STORAGE_DIR", tmp_path / "presets") + monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") + write_test_preset("Headphones", 2.5) + core.set_output_preset_link("alsa_output.headphones", "Headphones") + controller = make_controller() + test_window = OutputPresetWindow(controller) + + assert test_window.apply_output_preset_for_current_output() is True + + events = [json.loads(line) for line in trace_path.read_text(encoding="utf-8").splitlines()] + assert [event["event"] for event in events] == [ + "output-preset-apply-start", + "output-preset-link-selected", + "output-preset-linked-applied", + ] + assert events[0]["output_sink"] == "alsa_output.headphones" + assert events[0]["target_keys"] == ["alsa_output.headphones"] + assert events[1]["linked_preset"] == "Headphones" + assert events[2]["current_preset_after"] == "Headphones" + + +def test_auto_apply_remembers_route_identity_for_followup_refresh(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(core, "PRESET_STORAGE_DIR", tmp_path / "presets") + monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") + write_test_preset("Headphones", 2.5) + route_key = "pipewire-route:v1:device=alsa_card.test;route=analog-output-headphones;route-device=8" + core.set_output_preset_link(route_key, "Headphones") + controller = make_controller("alsa_output.internal") + route = SimpleNamespace( + description="Headphones", + name="analog-output-headphones", + output_preset_key=route_key, + ) + target = SimpleNamespace( + output_key=controller.output_sink, + route=route, + keys=(route_key, controller.output_sink), + link_key=route_key, + has_route_key=True, + ) + controller.output_preset_target = lambda *, refresh=False: target + test_window = OutputPresetWindow(controller) + + assert test_window.apply_output_preset_for_current_output() is True + + assert test_window.current_preset_name == "Headphones" + assert test_window.output_preset_curve_auto_loaded is True + assert controller._observed_output_preset_target_snapshot.sink_name == controller.output_sink + assert controller._observed_output_preset_target_snapshot.identity == route_key + + output_wide_target = SimpleNamespace( + output_key=controller.output_sink, + route=None, + keys=(controller.output_sink,), + link_key=controller.output_sink, + has_route_key=False, + ) + controller.output_preset_target = lambda *, refresh=False: output_wide_target + controller.follow_default_output = True + test_window.ui_shutting_down = False + test_window.startup_ready = True + test_window.list_visible_output_sinks = lambda: [] + test_window.build_output_sink_labels = lambda _sinks: [] + test_window.follow_default_output_label = lambda: "Follow system output" + test_window.output_sink_names = [] + test_window.output_sink_labels = [] + test_window.output_sink_model = FakeModel() + test_window.output_combo = FakeCombo() + test_window.update_info_label = lambda: None + test_window.update_status_summary = lambda: None + + window.MiniEqWindow.refresh_output_sinks(test_window) + + assert test_window.current_preset_name is None + assert controller.state_signature() == controller.default_state_signature() + assert test_window.statuses[-1] == "Unmatched output bypassed" + + def test_output_preset_auto_apply_protects_unsaved_edits(monkeypatch, tmp_path) -> None: monkeypatch.setattr(core, "PRESET_STORAGE_DIR", tmp_path / "presets") monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") @@ -678,6 +789,66 @@ def test_fallback_preset_loads_for_initial_unlinked_output(monkeypatch, tmp_path assert test_window.statuses[-1] == "Fallback preset applied" +def test_auto_apply_uses_saved_route_device_link_when_route_key_is_missing(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(core, "PRESET_STORAGE_DIR", tmp_path / "presets") + monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") + speaker_route_key = ( + "pipewire-route:v1:device=alsa_card.usb-Generic_USB_Audio-00;route=%5BOut%5D%20Speaker;route-device=11" + ) + write_test_preset("Speakers Profile", -1.5) + write_test_preset("Headset Profile", 2.5) + core.set_output_preset_link(speaker_route_key, "Speakers Profile") + controller = make_controller("alsa_output.usb-Generic_USB_Audio-00.HiFi__Speaker__sink") + target = SimpleNamespace( + output_key=controller.output_sink, + route=None, + keys=(controller.output_sink,), + link_key=controller.output_sink, + has_route_key=False, + device_name="alsa_card.usb-Generic_USB_Audio-00", + route_device=11, + ) + controller.output_preset_target = lambda *, refresh=False: target + test_window = OutputPresetWindow(controller) + test_window.refresh_preset_list() + test_window.load_library_preset("Headset Profile") + + assert test_window.apply_output_preset_for_current_output() is True + + assert test_window.current_preset_name == "Speakers Profile" + assert controller.bands[0].gain_db == -1.5 + assert test_window.output_preset_auto_applied is True + assert test_window.output_preset_curve_auto_loaded is True + assert test_window.statuses[-1] == "Auto preset applied" + + +def test_clear_output_preset_removes_recovered_route_device_link(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(core, "PRESET_STORAGE_DIR", tmp_path / "presets") + monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") + speaker_route_key = ( + "pipewire-route:v1:device=alsa_card.usb-Generic_USB_Audio-00;route=%5BOut%5D%20Speaker;route-device=11" + ) + write_test_preset("Speakers Profile", -1.5) + core.set_output_preset_link(speaker_route_key, "Speakers Profile") + controller = make_controller("alsa_output.usb-Generic_USB_Audio-00.HiFi__Speaker__sink") + target = SimpleNamespace( + output_key=controller.output_sink, + route=None, + keys=(controller.output_sink,), + link_key=controller.output_sink, + has_route_key=False, + device_name="alsa_card.usb-Generic_USB_Audio-00", + route_device=11, + ) + controller.output_preset_target = lambda *, refresh=False: target + test_window = OutputPresetWindow(controller) + + test_window.on_clear_output_preset_link_clicked(FakeButton()) + + assert core.get_output_preset_link(speaker_route_key) is None + assert test_window.statuses[-1] == "Auto preset cleared" + + def test_missing_fallback_preset_reports_unavailable(monkeypatch, tmp_path) -> None: monkeypatch.setattr(core, "PRESET_STORAGE_DIR", tmp_path / "presets") monkeypatch.setattr(core, "OUTPUT_PRESET_LINKS_PATH", tmp_path / "output-presets.json") @@ -794,7 +965,7 @@ def test_output_preset_actions_use_route_key_when_available(monkeypatch, tmp_pat link_key=route_key, has_route_key=True, ) - controller.output_preset_target = lambda: target + controller.output_preset_target = lambda *, refresh=False: target controller.output_preset_keys = lambda: (route_key, controller.output_sink) controller.output_preset_link_key = lambda: route_key test_window = OutputPresetWindow(controller) @@ -1170,13 +1341,11 @@ def test_pipewire_observed_output_change_runs_output_preset_handling() -> None: calls: list[object] = [] fake_window = SimpleNamespace( ui_shutting_down=False, - controller=SimpleNamespace( + controller=FakeOutputTransitionController( output_sink="alsa_output.usb", - follow_default_output=True, - get_default_output_sink_name=lambda: "alsa_output.usb", - get_sink=lambda _sink_name: None, + observed_identity="alsa_output.speakers", + current_identity="alsa_output.usb", ), - last_output_preset_sink_name="alsa_output.speakers", output_preset_auto_applied=True, output_preset_curve_auto_loaded=True, startup_ready=True, @@ -1202,7 +1371,8 @@ def test_pipewire_observed_output_change_runs_output_preset_handling() -> None: "summary", ("auto", {"reset_auto_preset_without_link": True, "announce_no_output_preset": True}), ] - assert fake_window.last_output_preset_sink_name == "alsa_output.usb" + assert fake_window.controller.observed_identity == "alsa_output.usb" + assert fake_window.controller.transition_consumes == [True] def test_pipewire_observed_port_scope_change_runs_output_preset_handling() -> None: @@ -1215,14 +1385,11 @@ def test_pipewire_observed_port_scope_change_runs_output_preset_handling() -> No ) fake_window = SimpleNamespace( ui_shutting_down=False, - controller=SimpleNamespace( + controller=FakeOutputTransitionController( output_sink="alsa_output.internal", - follow_default_output=True, - get_default_output_sink_name=lambda: "alsa_output.internal", - get_sink=lambda _sink_name: None, + observed_identity=old_route_key, + current_identity=new_route_key, ), - last_output_preset_sink_name="alsa_output.internal", - last_output_preset_target_identity=old_route_key, output_preset_auto_applied=True, output_preset_curve_auto_loaded=True, startup_ready=True, @@ -1249,21 +1416,20 @@ def test_pipewire_observed_port_scope_change_runs_output_preset_handling() -> No "summary", ("auto", {"reset_auto_preset_without_link": True, "announce_no_output_preset": True}), ] - assert fake_window.last_output_preset_sink_name == "alsa_output.internal" - assert fake_window.last_output_preset_target_identity == new_route_key + assert fake_window.controller.observed_identity == new_route_key + assert fake_window.controller.transition_consumes == [True] def test_manual_output_refresh_updates_selector_without_handling_observed_output_change() -> None: calls: list[object] = [] fake_window = SimpleNamespace( ui_shutting_down=False, - controller=SimpleNamespace( + controller=FakeOutputTransitionController( output_sink="alsa_output.usb", + observed_identity="alsa_output.speakers", + current_identity="alsa_output.usb", follow_default_output=False, - get_default_output_sink_name=lambda: "alsa_output.usb", - get_sink=lambda _sink_name: None, ), - last_output_preset_sink_name="alsa_output.speakers", output_preset_auto_applied=True, output_preset_curve_auto_loaded=False, startup_ready=True, @@ -1287,7 +1453,8 @@ def test_manual_output_refresh_updates_selector_without_handling_observed_output ) assert calls == ["preset-state", "info", "summary"] - assert fake_window.last_output_preset_sink_name == "alsa_output.speakers" + assert fake_window.controller.observed_identity == "alsa_output.speakers" + assert fake_window.controller.transition_consumes == [False] calls.clear() window.MiniEqWindow.refresh_output_sinks(fake_window) @@ -1298,7 +1465,8 @@ def test_manual_output_refresh_updates_selector_without_handling_observed_output "summary", ("auto", {"reset_auto_preset_without_link": False, "announce_no_output_preset": True}), ] - assert fake_window.last_output_preset_sink_name == "alsa_output.usb" + assert fake_window.controller.observed_identity == "alsa_output.usb" + assert fake_window.controller.transition_consumes == [False, True] def test_missing_manual_output_stays_visible_in_selector() -> None: @@ -1306,13 +1474,12 @@ def test_missing_manual_output_stays_visible_in_selector() -> None: visible_sink = SimpleNamespace(node_name="alsa_output.usb") fake_window = SimpleNamespace( ui_shutting_down=False, - controller=SimpleNamespace( + controller=FakeOutputTransitionController( output_sink="alsa_output.missing", + observed_identity=None, + current_identity="alsa_output.missing", follow_default_output=False, - get_default_output_sink_name=lambda: "alsa_output.usb", - get_sink=lambda _sink_name: None, ), - last_output_preset_sink_name=None, output_preset_auto_applied=False, output_preset_curve_auto_loaded=False, startup_ready=True, diff --git a/tests/test_mini_eq_pipewire_backend.py b/tests/test_mini_eq_pipewire_backend.py index 7f2f85c..f367189 100644 --- a/tests/test_mini_eq_pipewire_backend.py +++ b/tests/test_mini_eq_pipewire_backend.py @@ -911,7 +911,9 @@ def test_output_preset_keys_prefer_matching_active_route(monkeypatch: pytest.Mon monkeypatch.setattr(backend, "audio_sink_by_name", lambda _name: sink) monkeypatch.setattr(backend, "_device_proxy_by_bound_id", lambda _bound_id: object()) - monkeypatch.setattr(backend, "_enumerate_device_routes", lambda _device, _bound_id: [line_out, headphones]) + monkeypatch.setattr( + backend, "_enumerate_device_routes", lambda _device, _bound_id, _param_name="Route": [line_out, headphones] + ) assert backend.output_preset_keys_for_sink_name("alsa_output.test") == ( "pipewire-route:v1:device=alsa_card.test;route=analog-output-headphones;route-device=8", @@ -950,7 +952,7 @@ def test_output_preset_keys_use_single_route_even_when_profile_device_is_stale( monkeypatch.setattr(backend, "audio_sink_by_name", lambda _name: sink) monkeypatch.setattr(backend, "_device_proxy_by_bound_id", lambda _bound_id: object()) - monkeypatch.setattr(backend, "_enumerate_device_routes", lambda _device, _bound_id: [speakers]) + monkeypatch.setattr(backend, "_enumerate_device_routes", lambda _device, _bound_id, _param_name="Route": [speakers]) assert backend.output_preset_keys_for_sink_name("alsa_output.test") == ( "pipewire-route:v1:device=alsa_card.test;route=analog-output-speaker;route-device=6", @@ -1001,7 +1003,9 @@ def test_output_preset_keys_do_not_guess_between_routes_sharing_profile_device( monkeypatch.setattr(backend, "audio_sink_by_name", lambda _name: sink) monkeypatch.setattr(backend, "_device_proxy_by_bound_id", lambda _bound_id: object()) - monkeypatch.setattr(backend, "_enumerate_device_routes", lambda _device, _bound_id: [speakers, headphones]) + monkeypatch.setattr( + backend, "_enumerate_device_routes", lambda _device, _bound_id, _param_name="Route": [speakers, headphones] + ) assert backend.output_preset_keys_for_sink_name("alsa_output.test") == ("alsa_output.test",) @@ -1026,6 +1030,136 @@ def test_output_preset_keys_fall_back_to_sink_name_without_route_api(monkeypatch assert backend.output_preset_keys_for_sink_name("alsa_output.test") == ("alsa_output.test",) +def test_output_preset_keys_use_enum_route_when_active_route_is_missing_and_label_matches( + monkeypatch: pytest.MonkeyPatch, +) -> None: + class FakeParam: + def __init__( + self, + name: str, + *, + route_device: int, + route_name: str, + description: str, + ) -> None: + self.name = name + self.route_device = route_device + self.route_name = route_name + self.description = description + + def dup_name(self) -> str: + return self.name + + class FakeParamInfo: + def __init__(self, param_id: int, name: str) -> None: + self.param_id = param_id + self.name = name + + def get_id(self) -> int: + return self.param_id + + def dup_name(self) -> str: + return self.name + + class FakeModel: + def __init__(self, items: list[object]) -> None: + self.items = items + + def get_n_items(self) -> int: + return len(self.items) + + def get_item(self, index: int) -> object: + return self.items[index] + + class FakeDevice: + def __init__(self) -> None: + self.param_infos = FakeModel([FakeParamInfo(13, "Route"), FakeParamInfo(14, "EnumRoute")]) + self.enum_calls: list[int] = [] + + def get_param_infos(self) -> FakeModel: + return self.param_infos + + def enum_params_sync(self, param_id: int, _start: int, _num: int, _timeout_ms: int) -> FakeModel: + self.enum_calls.append(param_id) + if param_id == 13: + return FakeModel([]) + if param_id == 14: + return FakeModel( + [ + FakeParam( + "EnumRoute", + route_device=11, + route_name="[Out] Speaker", + description="Speakers", + ), + FakeParam( + "EnumRoute", + route_device=12, + route_name="[Out] Headset", + description="Headset", + ), + ] + ) + raise AssertionError(f"unexpected param id: {param_id}") + + class FakeRouteInfo: + def __init__(self, param: FakeParam) -> None: + self.param = param + + def get_index(self) -> int: + return 1 + + def get_device(self) -> int: + return self.param.route_device + + def get_profile(self) -> int: + return 0 + + def get_priority(self) -> int: + return 200 + + def dup_direction(self) -> str: + return "output" + + def dup_name(self) -> str: + return self.param.route_name + + def dup_description(self) -> str: + return self.param.description + + def dup_availability(self) -> str: + return "yes" + + def get_info(self) -> dict[str, str]: + return {} + + backend = pw_backend.PipeWireBackend() + backend._Pwg = SimpleNamespace(Device=FakeDeviceApi, RouteInfo=SimpleNamespace(new_from_param=FakeRouteInfo)) + sink_name = "alsa_output.usb-Generic_USB_Audio-00.HiFi__Speaker__sink" + sink = pw_backend.PipeWireNode( + bound_id=39, + object_serial="67", + media_class=pw_backend.AUDIO_SINK, + node_name=sink_name, + node_description=None, + application_name=None, + node_dont_move=False, + device_id=72, + card_profile_device=0, + ) + device = FakeDevice() + + monkeypatch.setattr(backend, "audio_sink_by_name", lambda _name: sink) + monkeypatch.setattr(backend, "_device_proxy_by_bound_id", lambda _bound_id: device) + monkeypatch.setattr(backend, "_device_name_by_bound_id", lambda _bound_id: "alsa_card.usb-Generic_USB_Audio-00") + + assert backend.output_preset_keys_for_sink_name(sink_name) == ( + "pipewire-route:v1:device=alsa_card.usb-Generic_USB_Audio-00;route=%5BOut%5D%20Speaker;route-device=11", + sink_name, + ) + assert device.enum_calls == [13, 14] + + def test_enumerate_device_routes_ignores_enum_route_params(monkeypatch: pytest.MonkeyPatch) -> None: class FakeParam: def __init__(self, name: str, *, seq: int = 12, next_index: int = 0) -> None: diff --git a/tests/test_mini_eq_pipewire_routes.py b/tests/test_mini_eq_pipewire_routes.py index bb77706..623499b 100644 --- a/tests/test_mini_eq_pipewire_routes.py +++ b/tests/test_mini_eq_pipewire_routes.py @@ -1,5 +1,7 @@ from __future__ import annotations +from types import SimpleNamespace + from tests._mini_eq_imports import pipewire_routes as pw_routes @@ -39,3 +41,52 @@ def test_output_preset_target_falls_back_to_output_key_without_route() -> None: assert target.link_key == "alsa_output.test" assert target.has_route_key is False + + +def test_output_preset_target_records_sink_route_device_without_route_key() -> None: + class FakeRouteBackend(pw_routes.PipeWireRouteMixin): + def audio_sink_by_name(self, _sink_name: str): + return SimpleNamespace( + device_id=72, + card_profile_device=11, + properties={}, + ) + + def output_route_for_sink(self, _sink): + return None + + def _device_name_by_bound_id(self, _bound_id: int) -> str: + return "alsa_card.usb-Generic_USB_Audio-00" + + target = FakeRouteBackend().output_preset_target_for_sink_name( + "alsa_output.usb-Generic_USB_Audio-00.HiFi__Speaker__sink" + ) + + assert target.keys == ("alsa_output.usb-Generic_USB_Audio-00.HiFi__Speaker__sink",) + assert target.device_name == "alsa_card.usb-Generic_USB_Audio-00" + assert target.route_device == 11 + assert ( + target.route_device_identity + == "pipewire-route-device:v1:device=alsa_card.usb-Generic_USB_Audio-00;route-device=11" + ) + + +def test_route_matches_route_specific_ucm_sink_label() -> None: + route = pw_routes.PipeWireOutputRoute( + device_bound_id=72, + device_name="alsa_card.usb-Generic_USB_Audio-00", + index=1, + route_device=11, + profile=0, + priority=200, + direction="Output", + name="[Out] Speaker", + description="Speakers", + availability="yes", + ) + sink = SimpleNamespace( + node_name="alsa_output.usb-Generic_USB_Audio-00.HiFi__Speaker__sink", + node_description=None, + ) + + assert pw_routes.route_matches_sink_label(route, sink) is True diff --git a/tests/test_mini_eq_routing.py b/tests/test_mini_eq_routing.py index 1d2817b..c3b9bf8 100644 --- a/tests/test_mini_eq_routing.py +++ b/tests/test_mini_eq_routing.py @@ -137,6 +137,102 @@ def output_preset_target_for_sink_name(self, sink_name: str | None) -> pw_routes assert calls == ["speakers", "hdmi", "hdmi"] +def test_output_preset_target_transition_tracks_controller_snapshot() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + calls: list[str | None] = [] + + class FakeBackend(FakeOutputBackend): + def output_preset_target_for_sink_name(self, sink_name: str | None) -> pw_routes.PipeWireOutputPresetTarget: + calls.append(sink_name) + return pw_routes.PipeWireOutputPresetTarget(sink_name, None, (sink_name,) if sink_name else ()) + + controller.output_backend = FakeBackend([make_node(1, "speakers"), make_node(2, "hdmi")]) + controller.output_sink = "speakers" + + first = routing.SystemWideEqController.output_preset_target_transition(controller) + + assert first.previous is None + assert first.current.identity == "speakers" + assert first.changed is False + assert controller._observed_output_preset_target_snapshot.identity == "speakers" + + controller.output_sink = "hdmi" + preview = routing.SystemWideEqController.output_preset_target_transition(controller, consume=False) + + assert preview.previous.identity == "speakers" + assert preview.current.identity == "hdmi" + assert preview.changed is True + assert controller._observed_output_preset_target_snapshot.identity == "speakers" + + consumed = routing.SystemWideEqController.output_preset_target_transition(controller) + + assert consumed.changed is True + assert controller._observed_output_preset_target_snapshot.identity == "hdmi" + + unchanged = routing.SystemWideEqController.output_preset_target_transition(controller) + + assert unchanged.changed is False + assert calls == ["speakers", "hdmi"] + + +def test_output_preset_target_identity_uses_route_device_when_route_key_is_missing() -> None: + target = pw_routes.PipeWireOutputPresetTarget( + "alsa_output.usb", + None, + ("alsa_output.usb",), + device_name="alsa_card.usb-Generic_USB_Audio-00", + route_device=11, + ) + + assert ( + routing.output_preset_target_identity(target, "alsa_output.usb") + == "pipewire-route-device:v1:device=alsa_card.usb-Generic_USB_Audio-00;route-device=11" + ) + + +def test_remember_output_preset_target_distinguishes_missing_target_from_omitted_target() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + calls: list[str | None] = [] + + class FakeBackend(FakeOutputBackend): + def output_preset_target_for_sink_name(self, sink_name: str | None) -> pw_routes.PipeWireOutputPresetTarget: + calls.append(sink_name) + return pw_routes.PipeWireOutputPresetTarget(sink_name, None, (sink_name,) if sink_name else ()) + + controller.output_backend = FakeBackend([make_node(1, "speakers")]) + controller.output_sink = "speakers" + + routing.SystemWideEqController.remember_output_preset_target(controller, None) + + assert controller._observed_output_preset_target_snapshot.identity == "speakers" + assert controller._observed_output_preset_target_snapshot.target is None + assert calls == [] + + routing.SystemWideEqController.remember_output_preset_target(controller) + + assert controller._observed_output_preset_target_snapshot.identity == "speakers" + assert isinstance(controller._observed_output_preset_target_snapshot.target, pw_routes.PipeWireOutputPresetTarget) + assert calls == ["speakers"] + + +def test_output_preset_target_transition_falls_back_to_sink_when_target_lookup_fails() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + + class FailingBackend(FakeOutputBackend): + def output_preset_target_for_sink_name(self, sink_name: str | None) -> pw_routes.PipeWireOutputPresetTarget: + raise RuntimeError(f"route lookup failed for {sink_name}") + + controller.output_backend = FailingBackend([make_node(1, "speakers")]) + controller.output_sink = "speakers" + + transition = routing.SystemWideEqController.output_preset_target_transition(controller) + + assert transition.current.identity == "speakers" + assert transition.current.target is None + assert transition.changed is False + assert controller._observed_output_preset_target_snapshot.identity == "speakers" + + def test_get_default_output_sink_name_uses_cached_metadata_by_default() -> None: controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) backend = FakeDefaultOutputBackend( @@ -664,6 +760,13 @@ def set_output_sink_name(self, sink_name: str, sink_description: str | None = No controller.output_backend = FakeBackend([make_node(2, "hdmi")]) controller.output_sink = "hdmi" + controller._output_preset_target_sink = "hdmi" + controller._output_preset_target = pw_routes.PipeWireOutputPresetTarget("hdmi", None, ("hdmi",)) + controller._observed_output_preset_target_snapshot = routing.OutputPresetTargetSnapshot( + "hdmi", + "old-route", + controller._output_preset_target, + ) controller.follow_default_output = True controller.running = True controller.filter_node_id = 42 @@ -681,6 +784,9 @@ def set_output_sink_name(self, sink_name: str, sink_description: str | None = No assert controller.output_sink == "hdmi" assert controller.follow_default_output is True + assert controller._output_preset_target_sink is None + assert controller._output_preset_target is None + assert controller._observed_output_preset_target_snapshot.identity == "old-route" assert calls == [ "route-param-monitor", ("router-target", "hdmi"), @@ -714,6 +820,8 @@ def set_output_sink_name(self, sink_name: str) -> None: controller.output_backend = FakeBackend([sink]) controller.output_sink = "hdmi" + controller._output_preset_target_sink = "hdmi" + controller._output_preset_target = pw_routes.PipeWireOutputPresetTarget("hdmi", None, ("hdmi",)) controller.follow_default_output = True controller.running = True controller.filter_node_id = 42 @@ -729,6 +837,8 @@ def set_output_sink_name(self, sink_name: str) -> None: routing.SystemWideEqController.switch_output_sink(controller, "hdmi", explicit=False) assert calls == ["route-param-monitor", ("router-target", "hdmi")] + assert controller._output_preset_target_sink is None + assert controller._output_preset_target is None def test_explicit_output_change_schedules_coalesced_output_refresh(monkeypatch: pytest.MonkeyPatch) -> None: diff --git a/tests/test_mini_eq_window.py b/tests/test_mini_eq_window.py index 5c02aa2..3875642 100644 --- a/tests/test_mini_eq_window.py +++ b/tests/test_mini_eq_window.py @@ -362,6 +362,7 @@ def route_system_audio(enabled: bool) -> None: controller.routed = enabled controller.route_system_audio = route_system_audio + controller.output_preset_target_transition = lambda: SimpleNamespace(changed=False) fake_window = SimpleNamespace( startup_ready_source_id=99, @@ -454,6 +455,7 @@ def route_system_audio(enabled: bool) -> None: controller.routed = enabled controller.route_system_audio = route_system_audio + controller.output_preset_target_transition = lambda: SimpleNamespace(changed=False) fake_window = SimpleNamespace( startup_auto_route_source_id=321, @@ -488,6 +490,59 @@ def route_system_audio(enabled: bool) -> None: ] +def test_startup_auto_route_reapplies_preset_when_followed_output_changes() -> None: + calls: list[object] = [] + controller = SimpleNamespace( + eq_enabled=True, + routed=False, + output_sink="alsa_output.headset", + ) + + def route_system_audio(enabled: bool) -> None: + calls.append(("route", enabled)) + controller.routed = enabled + controller.output_sink = "alsa_output.speakers" + + controller.route_system_audio = route_system_audio + controller.output_preset_target_transition = lambda: SimpleNamespace(changed=True) + + fake_window = SimpleNamespace( + startup_auto_route_source_id=0, + startup_auto_route_deadline_us=123, + ui_shutting_down=False, + auto_route_on_startup=True, + updating_ui=False, + output_preset_curve_auto_loaded=True, + route_switch=FakeSwitch(False), + bypass_switch=FakeSwitch(True), + controller=controller, + apply_output_preset_for_current_output=lambda **kwargs: calls.append(("output-preset", kwargs)) or True, + update_eq_power_indicator=lambda: calls.append(("power", fake_window.route_switch.get_active())), + update_info_label=lambda: calls.append(("info", fake_window.route_switch.get_active())), + update_status_summary=lambda: calls.append(("summary", fake_window.route_switch.get_active())), + update_focus_summary=lambda: calls.append("focus"), + set_status=lambda message: calls.append(("status", message)), + notify_control_state_changed=lambda: calls.append("notify"), + ) + bind_control_refresh_methods(fake_window) + + window.MiniEqWindow.apply_startup_auto_route(fake_window) + + assert fake_window.startup_auto_route_deadline_us == 0 + assert calls == [ + ("route", True), + ( + "output-preset", + {"reset_auto_preset_without_link": True, "announce_no_output_preset": True}, + ), + ("power", True), + ("info", True), + ("summary", True), + "focus", + "notify", + ] + + def test_startup_auto_route_retries_until_filter_chain_is_ready(monkeypatch) -> None: calls: list[object] = [] scheduled_callbacks = [] @@ -508,6 +563,7 @@ def timeout_add_seconds(interval_seconds: int, callback): return 777 controller.route_system_audio = route_system_audio + controller.output_preset_target_transition = lambda: SimpleNamespace(changed=False) monkeypatch.setattr(window.GLib, "get_monotonic_time", lambda: 1_000) monkeypatch.setattr(window.GLib, "timeout_add_seconds", timeout_add_seconds)