Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/memos-local-plugin/agent-contract/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import type { LogRecord } from "./log-record.js";

export interface CoreHealth {
ok: boolean;
/** True only after init() has fully wired all pipeline subscribers. */
pipelineReady: boolean;
version: string;
uptimeMs: number;
agent: AgentKind;
Expand Down
33 changes: 33 additions & 0 deletions apps/memos-local-plugin/bridge.cts
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,39 @@ async function main(): Promise<void> {
};
process.on("SIGINT", () => void shutdownDaemon("SIGINT"));
process.on("SIGTERM", () => void shutdownDaemon("SIGTERM"));

// Run core.init() in the background. A dirty-episode rescore can
// take minutes; keeping it async lets the HTTP server stay responsive
// to health probes throughout and prevents ensure_viewer_daemon()
// from timing out and spawning a replacement daemon.
//
// Watchdog: if core.init() neither resolves nor rejects within
// INIT_WATCHDOG_MS, the bridge appears healthy (HTTP serving) but the
// scoring pipeline is never wired. Force-exit so the gateway respawns
// a fresh bridge rather than silently dropping all scoring for hours.
const INIT_WATCHDOG_MS = config.bridge.initWatchdogMs;
void (async () => {
const result = await Promise.race([
core.init().then(() => "done" as const),
new Promise<"timeout">((resolve) => {
const t = setTimeout(() => resolve("timeout"), INIT_WATCHDOG_MS);
(t as unknown as { unref?: () => void }).unref?.();
}),
]);
if (result === "timeout") {
process.stderr.write(
`bridge: daemon core.init watchdog: pipeline.ready not seen within ${INIT_WATCHDOG_MS}ms — restarting\n`,
);
void shutdownDaemon("init.watchdog");
}
})().catch((err) => {
process.stderr.write(
`bridge: daemon core.init error: ${err instanceof Error ? err.stack ?? err.message : String(err)}\n`,
);
void shutdownDaemon("init.error");
});


// Process stays alive via the HTTP server's ref'd socket.
return;
}
Expand Down
1 change: 1 addition & 0 deletions apps/memos-local-plugin/core/config/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const DEFAULT_CONFIG: ResolvedConfig = {
bridge: {
port: 18911,
mode: "stdio",
initWatchdogMs: 120_000,
},
embedding: {
provider: "local",
Expand Down
1 change: 1 addition & 0 deletions apps/memos-local-plugin/core/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const ViewerSchema = Type.Object({
const BridgeSchema = Type.Object({
port: NumberInRange(18911, 1, 65535),
mode: Type.Union([Type.Literal("stdio"), Type.Literal("tcp")], { default: "stdio" }),
initWatchdogMs: NumberInRange(120_000, 30_000),
}, { default: {} });

const EmbeddingSchema = Type.Object({
Expand Down
7 changes: 6 additions & 1 deletion apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ export function createMemoryCore(
const log = rootLogger.child({ channel: "core.pipeline.memory-core" });
let telemetry = options.telemetry ?? null;
let initialized = false;
let pipelineReady = false;
let shutDown = false;
/** Per-episode monotonic step counter for tool outcomes. */
const toolStepByEpisode = new Map<string, number>();
Expand Down Expand Up @@ -844,7 +845,7 @@ export function createMemoryCore(
if (!hubRuntime) {
hubRuntime = makeHubRuntime(config);
}
await hubRuntime.start();
await withTimeout(hubRuntime.start(), 30_000, "hub_start_timeout");
}

async function restartHubRuntime(config: ResolvedConfig): Promise<void> {
Expand Down Expand Up @@ -1130,6 +1131,9 @@ export function createMemoryCore(
}, durationSince(result.startedAt, result.completedAt), ok);
}
});

pipelineReady = true;
log.info("pipeline.ready");
}

async function recoverOpenEpisodesAsSessionEnd(
Expand Down Expand Up @@ -1481,6 +1485,7 @@ export function createMemoryCore(

return {
ok: initialized && !shutDown,
pipelineReady,
version: pkgVersion,
uptimeMs: Date.now() - bootAt,
agent: handle.agent,
Expand Down
5 changes: 4 additions & 1 deletion apps/memos-local-plugin/server/routes/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export function registerHealthRoutes(routes: Routes, deps: ServerDeps): void {
};
return bridge ? { ...health, ...identity, bridge } : { ...health, ...identity };
});
routes.set("GET /api/v1/ping", () => ({ ok: true, ...serviceIdentity, ts: Date.now() }));
routes.set("GET /api/v1/ping", async () => {
const health = await deps.core.health();
return { ok: true, ...serviceIdentity, ts: Date.now(), pipelineReady: health.pipelineReady };
});
void ({} as RouteContext);
}
4 changes: 3 additions & 1 deletion apps/memos-local-plugin/viewer/src/stores/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export interface ModelCallStatus {

export interface HealthPayload {
ok: boolean;
/** True only after core.init() has fully wired all pipeline subscribers. */
pipelineReady?: boolean;
version?: string;
uptimeMs?: number;
agent?: string;
Expand Down Expand Up @@ -77,7 +79,7 @@ async function tick(): Promise<void> {
try {
const data = await api.get<HealthPayload>("/api/v1/health");
health.value = data;
healthStatus.value = data.ok ? "ok" : "degraded";
healthStatus.value = !data.ok ? "degraded" : data.pipelineReady === false ? "degraded" : "ok";
} catch {
health.value = null;
healthStatus.value = "down";
Expand Down