From fe937fc7e3d0b604510ce4d00103e8c0b304caee Mon Sep 17 00:00:00 2001 From: Erick Date: Sat, 30 May 2026 05:36:56 -0700 Subject: [PATCH 1/2] fix(bridge): detect silent pipeline init failure and add startup watchdog - Track pipelineReady separately from initialized; set only after all pipeline bus subscribers are wired (end of init()), not at entry. Fixes health().ok falsely reporting true while init() is hung. - Add 120s startup watchdog in daemon mode: if core.init() neither resolves nor rejects within the deadline, force-exit so the gateway respawns instead of running headless for hours with scoring dead. - Wrap ensureHubRuntimeStarted() hub.start() in a 30s timeout so a stalled hub connection can't block init() indefinitely. - Expose pipelineReady on GET /api/v1/health and GET /api/v1/ping so monitoring can distinguish "bridge alive" from "scoring alive". - Viewer healthStatus degrades to "degraded" when pipelineReady=false. Co-Authored-By: Claude Sonnet 4.6 --- .../agent-contract/memory-core.ts | 2 ++ apps/memos-local-plugin/bridge.cts | 33 +++++++++++++++++++ .../core/pipeline/memory-core.ts | 7 +++- .../server/routes/health.ts | 5 ++- .../viewer/src/stores/health.ts | 4 ++- 5 files changed, 48 insertions(+), 3 deletions(-) diff --git a/apps/memos-local-plugin/agent-contract/memory-core.ts b/apps/memos-local-plugin/agent-contract/memory-core.ts index 9c3ad826c..5566262bc 100644 --- a/apps/memos-local-plugin/agent-contract/memory-core.ts +++ b/apps/memos-local-plugin/agent-contract/memory-core.ts @@ -38,6 +38,8 @@ import type { LogRecord } from "./log-record.js"; export interface CoreHealth { ok: boolean; + /** True only after init() has fully wired all pipeline subscribers. */ + pipelineReady: boolean; version: string; uptimeMs: number; agent: AgentKind; diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 4e7ea1ad4..40f2f76a3 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -404,6 +404,39 @@ async function main(): Promise { }; process.on("SIGINT", () => void shutdownDaemon("SIGINT")); process.on("SIGTERM", () => void shutdownDaemon("SIGTERM")); + + // Run core.init() in the background. A dirty-episode rescore can + // take minutes; keeping it async lets the HTTP server stay responsive + // to health probes throughout and prevents ensure_viewer_daemon() + // from timing out and spawning a replacement daemon. + // + // Watchdog: if core.init() neither resolves nor rejects within + // INIT_WATCHDOG_MS, the bridge appears healthy (HTTP serving) but the + // scoring pipeline is never wired. Force-exit so the gateway respawns + // a fresh bridge rather than silently dropping all scoring for hours. + const INIT_WATCHDOG_MS = 120_000; + void (async () => { + const result = await Promise.race([ + core.init().then(() => "done" as const), + new Promise<"timeout">((resolve) => { + const t = setTimeout(() => resolve("timeout"), INIT_WATCHDOG_MS); + (t as unknown as { unref?: () => void }).unref?.(); + }), + ]); + if (result === "timeout") { + process.stderr.write( + `bridge: daemon core.init watchdog: pipeline.ready not seen within ${INIT_WATCHDOG_MS}ms — restarting\n`, + ); + void shutdownDaemon("init.watchdog"); + } + })().catch((err) => { + process.stderr.write( + `bridge: daemon core.init error: ${err instanceof Error ? err.stack ?? err.message : String(err)}\n`, + ); + void shutdownDaemon("init.error"); + }); + + // Process stays alive via the HTTP server's ref'd socket. return; } diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 4974ee16d..6be3e316e 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -475,6 +475,7 @@ export function createMemoryCore( const log = rootLogger.child({ channel: "core.pipeline.memory-core" }); let telemetry = options.telemetry ?? null; let initialized = false; + let pipelineReady = false; let shutDown = false; /** Per-episode monotonic step counter for tool outcomes. */ const toolStepByEpisode = new Map(); @@ -844,7 +845,7 @@ export function createMemoryCore( if (!hubRuntime) { hubRuntime = makeHubRuntime(config); } - await hubRuntime.start(); + await withTimeout(hubRuntime.start(), 30_000, "hub_start_timeout"); } async function restartHubRuntime(config: ResolvedConfig): Promise { @@ -1130,6 +1131,9 @@ export function createMemoryCore( }, durationSince(result.startedAt, result.completedAt), ok); } }); + + pipelineReady = true; + log.info("pipeline.ready"); } async function recoverOpenEpisodesAsSessionEnd( @@ -1481,6 +1485,7 @@ export function createMemoryCore( return { ok: initialized && !shutDown, + pipelineReady, version: pkgVersion, uptimeMs: Date.now() - bootAt, agent: handle.agent, diff --git a/apps/memos-local-plugin/server/routes/health.ts b/apps/memos-local-plugin/server/routes/health.ts index 9dee2774c..301ac2f29 100644 --- a/apps/memos-local-plugin/server/routes/health.ts +++ b/apps/memos-local-plugin/server/routes/health.ts @@ -22,6 +22,9 @@ export function registerHealthRoutes(routes: Routes, deps: ServerDeps): void { }; return bridge ? { ...health, ...identity, bridge } : { ...health, ...identity }; }); - routes.set("GET /api/v1/ping", () => ({ ok: true, ...serviceIdentity, ts: Date.now() })); + routes.set("GET /api/v1/ping", async () => { + const health = await deps.core.health(); + return { ok: true, ...serviceIdentity, ts: Date.now(), pipelineReady: health.pipelineReady }; + }); void ({} as RouteContext); } diff --git a/apps/memos-local-plugin/viewer/src/stores/health.ts b/apps/memos-local-plugin/viewer/src/stores/health.ts index 48abbaf8c..189fe92ae 100644 --- a/apps/memos-local-plugin/viewer/src/stores/health.ts +++ b/apps/memos-local-plugin/viewer/src/stores/health.ts @@ -42,6 +42,8 @@ export interface ModelCallStatus { export interface HealthPayload { ok: boolean; + /** True only after core.init() has fully wired all pipeline subscribers. */ + pipelineReady?: boolean; version?: string; uptimeMs?: number; agent?: string; @@ -77,7 +79,7 @@ async function tick(): Promise { try { const data = await api.get("/api/v1/health"); health.value = data; - healthStatus.value = data.ok ? "ok" : "degraded"; + healthStatus.value = !data.ok ? "degraded" : data.pipelineReady === false ? "degraded" : "ok"; } catch { health.value = null; healthStatus.value = "down"; From feaa312426bcb39dd6a6e298bc8e115f00864e96 Mon Sep 17 00:00:00 2001 From: Erick Date: Sat, 30 May 2026 14:47:26 -0700 Subject: [PATCH 2/2] feat(config): make daemon init watchdog timeout configurable Add bridge.initWatchdogMs to the config schema (default: 120 000 ms, minimum: 30 000 ms). bridge.cts reads the value from config instead of using the hardcoded constant. Operators running large episode histories (many traces, per-step scoring) can raise the timeout to survive a long startup recovery without disabling the watchdog entirely. Co-Authored-By: Claude Sonnet 4.6 --- apps/memos-local-plugin/bridge.cts | 2 +- apps/memos-local-plugin/core/config/defaults.ts | 1 + apps/memos-local-plugin/core/config/schema.ts | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 40f2f76a3..a1d8c538a 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -414,7 +414,7 @@ async function main(): Promise { // INIT_WATCHDOG_MS, the bridge appears healthy (HTTP serving) but the // scoring pipeline is never wired. Force-exit so the gateway respawns // a fresh bridge rather than silently dropping all scoring for hours. - const INIT_WATCHDOG_MS = 120_000; + const INIT_WATCHDOG_MS = config.bridge.initWatchdogMs; void (async () => { const result = await Promise.race([ core.init().then(() => "done" as const), diff --git a/apps/memos-local-plugin/core/config/defaults.ts b/apps/memos-local-plugin/core/config/defaults.ts index 1cf2d2cf6..fc2db1b8c 100644 --- a/apps/memos-local-plugin/core/config/defaults.ts +++ b/apps/memos-local-plugin/core/config/defaults.ts @@ -21,6 +21,7 @@ export const DEFAULT_CONFIG: ResolvedConfig = { bridge: { port: 18911, mode: "stdio", + initWatchdogMs: 120_000, }, embedding: { provider: "local", diff --git a/apps/memos-local-plugin/core/config/schema.ts b/apps/memos-local-plugin/core/config/schema.ts index 7c9ff193b..c69702bcd 100644 --- a/apps/memos-local-plugin/core/config/schema.ts +++ b/apps/memos-local-plugin/core/config/schema.ts @@ -28,6 +28,7 @@ const ViewerSchema = Type.Object({ const BridgeSchema = Type.Object({ port: NumberInRange(18911, 1, 65535), mode: Type.Union([Type.Literal("stdio"), Type.Literal("tcp")], { default: "stdio" }), + initWatchdogMs: NumberInRange(120_000, 30_000), }, { default: {} }); const EmbeddingSchema = Type.Object({