diff --git a/apps/memos-local-plugin/agent-contract/memory-core.ts b/apps/memos-local-plugin/agent-contract/memory-core.ts index 9c3ad826c..5566262bc 100644 --- a/apps/memos-local-plugin/agent-contract/memory-core.ts +++ b/apps/memos-local-plugin/agent-contract/memory-core.ts @@ -38,6 +38,8 @@ import type { LogRecord } from "./log-record.js"; export interface CoreHealth { ok: boolean; + /** True only after init() has fully wired all pipeline subscribers. */ + pipelineReady: boolean; version: string; uptimeMs: number; agent: AgentKind; diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 4e7ea1ad4..a1d8c538a 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -404,6 +404,39 @@ async function main(): Promise { }; process.on("SIGINT", () => void shutdownDaemon("SIGINT")); process.on("SIGTERM", () => void shutdownDaemon("SIGTERM")); + + // Run core.init() in the background. A dirty-episode rescore can + // take minutes; keeping it async lets the HTTP server stay responsive + // to health probes throughout and prevents ensure_viewer_daemon() + // from timing out and spawning a replacement daemon. + // + // Watchdog: if core.init() neither resolves nor rejects within + // INIT_WATCHDOG_MS, the bridge appears healthy (HTTP serving) but the + // scoring pipeline is never wired. Force-exit so the gateway respawns + // a fresh bridge rather than silently dropping all scoring for hours. NOTE(review): the race cannot tell a hung init from a slow one, so an init that outlasts INIT_WATCHDOG_MS (e.g. a long rescore) is restarted as well; confirm the configured budget exceeds the longest expected init. 
+ const INIT_WATCHDOG_MS = config.bridge.initWatchdogMs; + void (async () => { + const result = await Promise.race([ + core.init().then(() => "done" as const), + new Promise<"timeout">((resolve) => { + const t = setTimeout(() => resolve("timeout"), INIT_WATCHDOG_MS); + (t as unknown as { unref?: () => void }).unref?.(); + }), + ]); + if (result === "timeout") { + process.stderr.write( + `bridge: daemon core.init watchdog: pipeline.ready not seen within ${INIT_WATCHDOG_MS}ms — restarting\n`, + ); + void shutdownDaemon("init.watchdog"); + } + })().catch((err) => { + process.stderr.write( + `bridge: daemon core.init error: ${err instanceof Error ? err.stack ?? err.message : String(err)}\n`, + ); + void shutdownDaemon("init.error"); + }); + + // Process stays alive via the HTTP server's ref'd socket. return; } diff --git a/apps/memos-local-plugin/core/config/defaults.ts b/apps/memos-local-plugin/core/config/defaults.ts index 1cf2d2cf6..fc2db1b8c 100644 --- a/apps/memos-local-plugin/core/config/defaults.ts +++ b/apps/memos-local-plugin/core/config/defaults.ts @@ -21,6 +21,7 @@ export const DEFAULT_CONFIG: ResolvedConfig = { bridge: { port: 18911, mode: "stdio", + initWatchdogMs: 120_000, }, embedding: { provider: "local", diff --git a/apps/memos-local-plugin/core/config/schema.ts b/apps/memos-local-plugin/core/config/schema.ts index 7c9ff193b..c69702bcd 100644 --- a/apps/memos-local-plugin/core/config/schema.ts +++ b/apps/memos-local-plugin/core/config/schema.ts @@ -28,6 +28,8 @@ const ViewerSchema = Type.Object({ const BridgeSchema = Type.Object({ port: NumberInRange(18911, 1, 65535), mode: Type.Union([Type.Literal("stdio"), Type.Literal("tcp")], { default: "stdio" }), + // Cap at the largest delay setTimeout accepts (2^31 - 1 ms); Node coerces a larger delay to 1 ms, which would fire the init watchdog immediately. + initWatchdogMs: NumberInRange(120_000, 30_000, 2_147_483_647), }, { default: {} }); const EmbeddingSchema = Type.Object({ diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 4974ee16d..6be3e316e 100644 --- 
a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -475,6 +475,7 @@ export function createMemoryCore( const log = rootLogger.child({ channel: "core.pipeline.memory-core" }); let telemetry = options.telemetry ?? null; let initialized = false; + let pipelineReady = false; let shutDown = false; /** Per-episode monotonic step counter for tool outcomes. */ const toolStepByEpisode = new Map(); @@ -844,7 +845,7 @@ export function createMemoryCore( if (!hubRuntime) { hubRuntime = makeHubRuntime(config); } - await hubRuntime.start(); + await withTimeout(hubRuntime.start(), 30_000, "hub_start_timeout"); } async function restartHubRuntime(config: ResolvedConfig): Promise { @@ -1130,6 +1131,9 @@ export function createMemoryCore( }, durationSince(result.startedAt, result.completedAt), ok); } }); + + pipelineReady = true; + log.info("pipeline.ready"); } async function recoverOpenEpisodesAsSessionEnd( @@ -1481,6 +1485,7 @@ export function createMemoryCore( return { ok: initialized && !shutDown, + pipelineReady, version: pkgVersion, uptimeMs: Date.now() - bootAt, agent: handle.agent, diff --git a/apps/memos-local-plugin/server/routes/health.ts b/apps/memos-local-plugin/server/routes/health.ts index 9dee2774c..301ac2f29 100644 --- a/apps/memos-local-plugin/server/routes/health.ts +++ b/apps/memos-local-plugin/server/routes/health.ts @@ -22,6 +22,9 @@ export function registerHealthRoutes(routes: Routes, deps: ServerDeps): void { }; return bridge ? 
{ ...health, ...identity, bridge } : { ...health, ...identity }; }); - routes.set("GET /api/v1/ping", () => ({ ok: true, ...serviceIdentity, ts: Date.now() })); + routes.set("GET /api/v1/ping", async () => { + const health = await deps.core.health().catch(() => null); /* ping is a liveness probe: keep answering even if health() rejects */ + return { ok: true, ...serviceIdentity, ts: Date.now(), pipelineReady: health?.pipelineReady ?? false }; + }); void ({} as RouteContext); } diff --git a/apps/memos-local-plugin/viewer/src/stores/health.ts b/apps/memos-local-plugin/viewer/src/stores/health.ts index 48abbaf8c..189fe92ae 100644 --- a/apps/memos-local-plugin/viewer/src/stores/health.ts +++ b/apps/memos-local-plugin/viewer/src/stores/health.ts @@ -42,6 +42,8 @@ export interface ModelCallStatus { export interface HealthPayload { ok: boolean; + /** True only after core.init() has fully wired all pipeline subscribers. */ + pipelineReady?: boolean; version?: string; uptimeMs?: number; agent?: string; @@ -77,7 +79,7 @@ async function tick(): Promise { try { const data = await api.get("/api/v1/health"); health.value = data; - healthStatus.value = data.ok ? "ok" : "degraded"; + healthStatus.value = data.ok && data.pipelineReady !== false ? "ok" : "degraded"; /* only an explicit false degrades; an absent pipelineReady still counts as ready */ } catch { health.value = null; healthStatus.value = "down";