Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions packages/core/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { compact, shouldCompact } from './compaction/index.js';
import type { PermissionRules } from './config/types.js';
import { dispatchToolCall, type DispatchVerdict } from './harness/tool-dispatcher.js';
import { TaskManager } from './tasks/manager.js';
import type { HookDispatcher } from './hooks/index.js';
import type { Mode } from './types.js';
import type { Provider } from './providers/types.js';
Expand Down Expand Up @@ -102,6 +103,14 @@ const SUBAGENT_TOOL_DENYLIST = new Set([
'EnterPlanMode',
'ExitPlanMode',
'AskUserQuestion',
// Background tasks are top-level only (a sub-agent has no task manager).
'TaskCreate',
'TaskList',
'TaskGet',
'TaskOutput',
'TaskUpdate',
'TaskStop',
'Monitor',
]);
/** Default turn cap for a sub-agent run when its frontmatter doesn't set one. */
const DEFAULT_SUBAGENT_MAX_TURNS = 12;
Expand Down Expand Up @@ -209,7 +218,7 @@ export async function runAgent(opts: RunAgentOptions): Promise<RunAgentResult> {
// tool, see the denylist below; this is belt-and-suspenders).
const depth = opts.subAgentDepth ?? 0;
if (depth < MAX_SUBAGENT_DEPTH) {
toolCtx.runSubAgent = async ({ prompt, agentType }) => {
toolCtx.runSubAgent = async ({ prompt, agentType, signal }) => {
// Resolve a named sub-agent from disk (lazy import keeps node:fs out of
// browser bundles; failures degrade to a generic sub-agent prompt).
let systemPrompt =
Expand Down Expand Up @@ -268,7 +277,9 @@ export async function runAgent(opts: RunAgentOptions): Promise<RunAgentResult> {
temperature: opts.temperature,
maxTurns: subMaxTurns,
cwd: opts.cwd,
signal: opts.signal,
// A background task passes its own signal so TaskStop can cancel just
// that task; foreground sub-agents inherit the main run's signal.
signal: signal ?? opts.signal,
mode: opts.mode,
permissions: opts.permissions,
hooks: opts.hooks,
Expand Down Expand Up @@ -353,6 +364,22 @@ export async function runAgent(opts: RunAgentOptions): Promise<RunAgentResult> {
}
}

// Background tasks (TaskCreate family) — only at the top level, backed by the
// sub-agent runner. Each task gets its own AbortController so TaskStop cancels
// just that task. A sub-agent (depth ≥ 1) gets no manager → can't spawn tasks.
if (depth === 0 && toolCtx.runSubAgent) {
const runSub = toolCtx.runSubAgent;
toolCtx.tasks = new TaskManager((spec) => {
const ac = new AbortController();
const done = runSub({
prompt: spec.prompt,
agentType: spec.agentType,
signal: ac.signal,
}).then((r) => r.text);
return { done, abort: () => ac.abort() };
});
}

const totalUsage = { inputTokens: 0, outputTokens: 0, reasoningTokens: 0 };
let turnsUsed = 0;

Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ export {
type ProviderImagePayload,
} from './vision/index.js';

// Background tasks (M3.15.3 — TaskCreate family + Monitor)
export {
TaskManager,
type Task,
type TaskStatus,
type TaskRunner,
type TaskRunHandle,
type CreateTaskSpec,
} from './tasks/manager.js';

// IPC protocol (M6-rest — renderer ↔ main process type-safe channels)
export {
newTurnId,
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/mcp/serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ export const MCP_SERVE_EXCLUDE = new Set<string>([
'CronCreate',
'CronList',
'CronDelete',
'TaskCreate',
'TaskList',
'TaskGet',
'TaskOutput',
'TaskUpdate',
'TaskStop',
'Monitor',
]);

/** The subset of `tools` that is safe to expose over an MCP stdio server. */
Expand Down
84 changes: 84 additions & 0 deletions packages/core/src/tasks/manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { describe, expect, it } from 'vitest';
import { TaskManager, type TaskRunHandle } from './manager.js';

/**
 * Test helper: builds a string promise together with the two functions that
 * settle it, so a test can decide exactly when a fake task run finishes.
 */
function deferred(): {
  promise: Promise<string>;
  resolve: (v: string) => void;
  reject: (e: unknown) => void;
} {
  // The Promise executor runs synchronously, so both settlers are assigned
  // before the return statement reads them.
  let fulfil!: (v: string) => void;
  let fail!: (e: unknown) => void;
  const promise = new Promise<string>((onFulfilled, onRejected) => {
    fulfil = onFulfilled;
    fail = onRejected;
  });
  return { promise, resolve: fulfil, reject: fail };
}

// Unit tests for TaskManager using hand-built run handles instead of a real
// sub-agent runner, so each test controls when (and whether) a run settles.
describe('TaskManager', () => {
// Happy path: the record is 'running' right after create, and once the run's
// promise resolves the task is 'completed' with the resolved text as output.
it('create returns a running task; wait → completed + final output', async () => {
const d = deferred();
const mgr = new TaskManager(() => ({ done: d.promise, abort: () => {} }));
const t = mgr.create({ description: 'x', prompt: 'do x' });
expect(t.status).toBe('running');
expect(t.id).toMatch(/^task-/);

d.resolve('the result');
const done = await mgr.wait(t.id);
expect(done?.status).toBe('completed');
expect(done?.output).toBe('the result');
expect(done?.finishedAt).toBeTruthy();
});

// A rejected run promise flips the task to 'failed' and the error message is
// surfaced through the task output.
it('a failed runner marks the task failed', async () => {
const d = deferred();
const mgr = new TaskManager(() => ({ done: d.promise, abort: () => {} }));
const t = mgr.create({ description: 'x', prompt: 'y' });
d.reject(new Error('boom'));
const done = await mgr.wait(t.id);
expect(done?.status).toBe('failed');
expect(done?.output).toMatch(/boom/);
});

// The run promise here never settles, so stop() is the only way the task can
// leave 'running'. The second stop() must report false (already finished).
it('stop aborts the run + marks stopped; second stop is a no-op', async () => {
let aborted = false;
const mgr = new TaskManager(
() =>
({ done: new Promise<string>(() => {}), abort: () => (aborted = true) }) as TaskRunHandle,
);
const t = mgr.create({ description: 'x', prompt: 'y' });
expect(mgr.stop(t.id)).toBe(true);
expect(aborted).toBe(true);
expect(mgr.get(t.id)?.status).toBe('stopped');
expect(mgr.stop(t.id)).toBe(false);
});

// Streaming runner: onChunk hands the test the manager's sink (`emit`). Chunks
// are appended to the output, and because the output is non-empty when the run
// resolves, the resolved text does not replace it.
it('a streaming runner accumulates output via onChunk (kept over final text)', async () => {
const d = deferred();
let emit!: (c: string) => void;
const mgr = new TaskManager(() => ({
done: d.promise,
abort: () => {},
onChunk: (cb) => (emit = cb),
}));
const t = mgr.create({ description: 'x', prompt: 'y' });
emit('chunk1 ');
emit('chunk2');
expect(mgr.output(t.id)).toBe('chunk1 chunk2');
d.resolve('final-ignored');
await mgr.wait(t.id);
expect(mgr.output(t.id)).toBe('chunk1 chunk2');
expect(mgr.get(t.id)?.status).toBe('completed');
});

// Bookkeeping API: list/get/update on a known id, and the "unknown id" results
// (undefined from get/output, false from update).
it('list / get / update / unknown-id behaviour', async () => {
const mgr = new TaskManager(() => ({ done: Promise.resolve('r'), abort: () => {} }));
const t = mgr.create({ description: 'orig', prompt: 'p' });
expect(mgr.list().map((x) => x.id)).toEqual([t.id]);
expect(mgr.update(t.id, { description: 'renamed' })).toBe(true);
expect(mgr.get(t.id)?.description).toBe('renamed');
expect(mgr.get('nope')).toBeUndefined();
expect(mgr.update('nope', { description: 'x' })).toBe(false);
expect(mgr.output('nope')).toBeUndefined();
});
});
121 changes: 121 additions & 0 deletions packages/core/src/tasks/manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Background tasks — the agent spawns a long-running sub-agent that runs
// concurrently with the main turn; TaskList/Get/Output/Stop/Monitor inspect
// and control it. Spec: docs/DEVELOPMENT_PLAN.md §3.15.3 (TaskCreate family).
//
// The manager is runner-agnostic: the host constructs it with a `runner` (the
// agent loop wires one backed by runSubAgent), which `create` invokes WITHOUT
// blocking the caller, piping any streamed output into the task buffer and
// flipping the status when the run settles.

/**
 * Lifecycle state of a background task. A task starts as `'running'` and moves
 * exactly once to one of the other three: `'completed'` when the run's promise
 * resolves, `'failed'` when it rejects, `'stopped'` when `TaskManager.stop` is
 * called while it is still running.
 */
export type TaskStatus = 'running' | 'completed' | 'failed' | 'stopped';

/** Snapshot of one background task as exposed by `TaskManager`. */
export interface Task {
/** Manager-assigned identifier (`task-` followed by a base-36 sequence number). */
id: string;
/** Human-readable label; the only field `TaskManager.update` can change. */
description: string;
/** Current lifecycle state. */
status: TaskStatus;
/** Accumulated output (streamed chunks for a live runner, else final text). */
output: string;
/** ISO-8601 timestamp taken when the task record was created. */
createdAt: string;
/** ISO-8601 timestamp set when the task leaves `'running'`; absent until then. */
finishedAt?: string;
}

/** What a runner returns for one started run: completion, cancellation, and optional streaming. */
export interface TaskRunHandle {
/**
 * Resolves with the task's final text when the run completes. A rejection
 * marks the task `'failed'`.
 */
done: Promise<string>;
/** Abort the run (best-effort). Called by `TaskManager.stop`. */
abort: () => void;
/**
 * Register a streamed-output sink (optional — runners may not stream). The
 * manager calls this once, right after the run is started.
 */
onChunk?: (cb: (chunk: string) => void) => void;
}

/** Input to `TaskManager.create`; the whole object is also handed to the runner. */
export interface CreateTaskSpec {
/** Label copied onto the task record. */
description: string;
/** Instruction for the run; not read by the manager itself, only by the runner. */
prompt: string;
/** Optional agent type for the runner; not read by the manager itself. */
agentType?: string;
}

/**
 * Runner the host supplies: starts the work and returns a handle.
 * `TaskManager.create` calls it synchronously, so it should return the handle
 * without waiting for the work to finish.
 */
export type TaskRunner = (spec: CreateTaskSpec) => TaskRunHandle;

/**
 * In-memory registry of background tasks.
 *
 * The manager is constructed with a {@link TaskRunner}. `create` starts a run
 * through it without blocking, records streamed output, and flips the task's
 * status when the run settles or is stopped. All read methods return copies,
 * so callers cannot mutate the manager's records.
 */
export class TaskManager {
  private readonly tasks = new Map<string, Task>();
  /** Handles of tasks that are still running; released once a task finishes. */
  private readonly handles = new Map<string, TaskRunHandle>();
  /** Resolvers of pending `wait` calls, keyed by task id. */
  private readonly waiters = new Map<string, Array<() => void>>();
  private seq = 0;

  /** @param runner Starts the work for a task and returns its run handle. */
  constructor(private readonly runner: TaskRunner) {}

  /** Next id: `task-` followed by a base-36 counter (unique per manager). */
  private newId(): string {
    return `task-${(this.seq++).toString(36)}`;
  }

  /** Render a rejection reason of any type as text (reasons need not be `Error`s). */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /**
   * Start a background task; returns a copy of the task record immediately.
   *
   * @param spec Description for the record plus whatever the runner needs.
   * @returns A snapshot of the new task (status `'running'`).
   * @throws Whatever the runner throws synchronously. In that case no task
   *   record is left behind.
   */
  create(spec: CreateTaskSpec): Task {
    const id = this.newId();
    const task: Task = {
      id,
      description: spec.description,
      status: 'running',
      output: '',
      createdAt: new Date().toISOString(),
    };
    this.tasks.set(id, task);

    let handle: TaskRunHandle;
    try {
      handle = this.runner(spec);
    } catch (err) {
      // The run never started: drop the record so it cannot linger as a
      // permanently "running" task that nobody can stop or await.
      this.tasks.delete(id);
      throw err;
    }
    this.handles.set(id, handle);

    handle.onChunk?.((chunk) => {
      // Chunks arriving after the task finished (e.g. after stop) are dropped.
      if (task.status === 'running') task.output += chunk;
    });
    void handle.done.then(
      (text) => this.settle(id, 'completed', text),
      (err: unknown) => this.settle(id, 'failed', `Error: ${TaskManager.describeError(err)}`),
    );
    return { ...task };
  }

  /** Record the outcome of a run, unless the task already left `'running'`. */
  private settle(id: string, status: TaskStatus, finalText: string): void {
    const t = this.tasks.get(id);
    if (!t || t.status !== 'running') return; // already stopped/settled
    // For non-streaming runners output is empty until now → use the final text.
    if (!t.output) t.output = finalText;
    this.finish(t, status);
  }

  /** Move a task out of `'running'`, release its handle, and wake `wait` callers. */
  private finish(task: Task, status: TaskStatus): void {
    task.status = status;
    task.finishedAt = new Date().toISOString();
    this.handles.delete(task.id);
    const pending = this.waiters.get(task.id);
    this.waiters.delete(task.id);
    pending?.forEach((wake) => wake());
  }

  /** @returns A copy of the task, or `undefined` for an unknown id. */
  get(id: string): Task | undefined {
    const t = this.tasks.get(id);
    return t ? { ...t } : undefined;
  }

  /** @returns Copies of all tasks, in creation order. */
  list(): Task[] {
    return [...this.tasks.values()].map((t) => ({ ...t }));
  }

  /** @returns The task's accumulated output, or `undefined` for an unknown id. */
  output(id: string): string | undefined {
    return this.tasks.get(id)?.output;
  }

  /** Abort a running task. Returns false if unknown or already finished. */
  stop(id: string): boolean {
    const t = this.tasks.get(id);
    if (!t || t.status !== 'running') return false;
    this.handles.get(id)?.abort();
    this.finish(t, 'stopped');
    return true;
  }

  /** Update mutable task metadata (currently the description). */
  update(id: string, patch: { description?: string }): boolean {
    const t = this.tasks.get(id);
    if (!t) return false;
    if (patch.description !== undefined) t.description = patch.description;
    return true;
  }

  /**
   * Await a task leaving `'running'` (resolves immediately if it already has,
   * or if the id is unknown).
   *
   * This waits on the manager's own state rather than on the run's promise, so
   * it also resolves for a stopped task whose runner never settles after abort.
   *
   * @returns A copy of the task, or `undefined` for an unknown id.
   */
  async wait(id: string): Promise<Task | undefined> {
    const t = this.tasks.get(id);
    if (t && t.status === 'running') {
      await new Promise<void>((resolve) => {
        const pending = this.waiters.get(id) ?? [];
        pending.push(resolve);
        this.waiters.set(id, pending);
      });
    }
    return this.get(id);
  }
}
2 changes: 2 additions & 0 deletions packages/core/src/tools/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AskUserQuestionTool } from './ask-user.js';
import { BashTool } from './bash.js';
import { CronCreateTool, CronDeleteTool, CronListTool } from './cron-tools.js';
import { EditTool } from './edit.js';
import { TASK_TOOLS } from './task-manage.js';
import { EnterPlanModeTool } from './enter-plan.js';
import { ExitPlanModeTool } from './exit-plan.js';
import { GlobTool } from './glob.js';
Expand Down Expand Up @@ -47,6 +48,7 @@ export const BUILTIN_TOOLS: ToolHandler[] = [
CronCreateTool,
CronListTool,
CronDeleteTool,
...TASK_TOOLS,
];

export class ToolRegistry {
Expand Down
71 changes: 71 additions & 0 deletions packages/core/src/tools/task-manage.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { describe, expect, it } from 'vitest';
import { TaskManager } from '../tasks/manager.js';
import type { ToolContext } from '../types.js';
import {
MonitorTool,
TaskCreateTool,
TaskListTool,
TaskOutputTool,
TaskStopTool,
} from './task-manage.js';

/**
 * Test helper: a minimal tool context rooted at `/tmp`, optionally carrying a
 * task manager. Passing nothing leaves `tasks` undefined.
 */
function ctxWith(manager?: TaskManager): ToolContext {
  const ctx: ToolContext = { cwd: '/tmp', tasks: manager };
  return ctx;
}

// Tests for the task tool handlers, driven through `execute(input, ctx)` with a
// TaskManager backed by fake runners (no real sub-agent is started).
describe('task tools', () => {
// The fake run resolves immediately with text derived from the prompt, so
// Monitor is expected to report both the final status and that text.
it('TaskCreate starts a task and Monitor awaits its output', async () => {
const mgr = new TaskManager((spec) => ({
done: Promise.resolve(`did: ${spec.prompt}`),
abort: () => {},
}));
const ctx = ctxWith(mgr);
const created = await TaskCreateTool.execute({ prompt: 'analyze logs' }, ctx);
const id = (created.data as { id: string }).id;
expect(created.content).toMatch(/Started background task/);

const mon = await MonitorTool.execute({ id }, ctx);
expect(mon.isError ?? false).toBe(false);
expect(mon.content).toContain('completed');
expect(mon.content).toContain('did: analyze logs');
});

// Waits on the manager directly first so the task has settled before the
// list/output tools are queried.
it('TaskList + TaskOutput reflect created tasks', async () => {
const mgr = new TaskManager(() => ({ done: Promise.resolve('out'), abort: () => {} }));
const ctx = ctxWith(mgr);
const { data } = await TaskCreateTool.execute({ prompt: 'p', description: 'job A' }, ctx);
const id = (data as { id: string }).id;
await mgr.wait(id);

const list = await TaskListTool.execute({}, ctx);
expect(list.content).toContain('job A');
expect(list.content).toContain('[completed]');

const out = await TaskOutputTool.execute({ id }, ctx);
expect(out.content).toBe('out');
});

// The run promise never settles, so the task can only finish via TaskStop.
it('TaskStop cancels a running task', async () => {
const mgr = new TaskManager(() => ({ done: new Promise<string>(() => {}), abort: () => {} }));
const ctx = ctxWith(mgr);
const { data } = await TaskCreateTool.execute({ prompt: 'long' }, ctx);
const id = (data as { id: string }).id;
const stop = await TaskStopTool.execute({ id }, ctx);
expect(stop.content).toMatch(/Stopped task/);
expect(mgr.get(id)?.status).toBe('stopped');
});

// With no manager on the context the tools must return an error result
// rather than throw.
it('tools error gracefully without a task manager (e.g. sub-agent)', async () => {
const ctx = ctxWith(undefined);
const r = await TaskCreateTool.execute({ prompt: 'x' }, ctx);
expect(r.isError).toBe(true);
expect(r.content).toMatch(/unavailable here/);
expect((await TaskListTool.execute({}, ctx)).isError).toBe(true);
});

// An id the manager has never issued yields an error result from both tools.
it('Monitor/TaskOutput report unknown ids', async () => {
const ctx = ctxWith(new TaskManager(() => ({ done: Promise.resolve(''), abort: () => {} })));
expect((await MonitorTool.execute({ id: 'nope' }, ctx)).isError).toBe(true);
expect((await TaskOutputTool.execute({ id: 'nope' }, ctx)).isError).toBe(true);
});
});
Loading
Loading