refactor(tasks): unify the shared task run registry (#57324)

* refactor(tasks): simplify shared task run registry

* refactor(tasks): remove legacy task registry aliases

* fix(cron): normalize timeout task status and harden ledger writes

* fix(cron): keep manual runs resilient to ledger failures
This commit is contained in:
Vincent Koc
2026-03-29 16:28:17 -07:00
committed by GitHub
parent e4466c72a2
commit 53bcd5769e
19 changed files with 569 additions and 159 deletions

View File

@@ -833,7 +833,7 @@ export class AcpSessionManager {
if (taskContext) {
const terminalResult = resolveBackgroundTaskTerminalResult(taskProgressSummary);
this.updateBackgroundTaskState(taskContext.runId, {
status: "done",
status: "succeeded",
endedAt: Date.now(),
lastEventAt: Date.now(),
error: undefined,
@@ -1880,13 +1880,12 @@ export class AcpSessionManager {
private createBackgroundTaskRecord(context: BackgroundTaskContext, startedAt: number): void {
try {
createTaskRecord({
source: "unknown",
runtime: "acp",
sourceId: context.runId,
requesterSessionKey: context.requesterSessionKey,
requesterOrigin: context.requesterOrigin,
childSessionKey: context.childSessionKey,
runId: context.runId,
bindingTargetKind: "session",
label: context.label,
task: context.task,
status: "running",

View File

@@ -304,13 +304,12 @@ describe("AcpSessionManager", () => {
});
expect(findTaskByRunId("direct-parented-run")).toMatchObject({
source: "unknown",
runtime: "acp",
requesterSessionKey: "agent:quant:telegram:quant:direct:822430204",
childSessionKey: "agent:codex:acp:child-1",
label: "Quant patch",
task: "Implement the feature and report back",
status: "done",
status: "succeeded",
progressSummary: "Write failed: permission denied for /root/oc-acp-write-should-fail.txt.",
terminalOutcome: "blocked",
terminalSummary: "Permission denied for /root/oc-acp-write-should-fail.txt.",

View File

@@ -954,19 +954,17 @@ export async function spawnAcpDirect(
parentRelay?.notifyStarted();
try {
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
sourceId: childRunId,
requesterSessionKey: requesterInternalKey,
requesterOrigin: requesterState.origin,
childSessionKey: sessionKey,
runId: childRunId,
bindingTargetKind: "session",
label: params.label,
task: params.task,
status: "running",
deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing",
startedAt: Date.now(),
streamLogPath,
});
} catch (error) {
log.warn("Failed to create background task for ACP spawn", {
@@ -987,13 +985,12 @@ export async function spawnAcpDirect(
try {
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
sourceId: childRunId,
requesterSessionKey: requesterInternalKey,
requesterOrigin: requesterState.origin,
childSessionKey: sessionKey,
runId: childRunId,
bindingTargetKind: "session",
label: params.label,
task: params.task,
status: "running",

View File

@@ -460,7 +460,7 @@ export function createSubagentRegistryLifecycleController(params: {
runId: entry.runId,
status:
completeParams.outcome.status === "ok"
? "done"
? "succeeded"
: completeParams.outcome.status === "timeout"
? "timed_out"
: "failed",

View File

@@ -318,13 +318,12 @@ export function createSubagentRunManager(params: {
});
try {
createTaskRecord({
source: "sessions_spawn",
runtime: "subagent",
sourceId: registerParams.runId,
requesterSessionKey: registerParams.requesterSessionKey,
requesterOrigin,
childSessionKey: registerParams.childSessionKey,
runId: registerParams.runId,
bindingTargetKind: "subagent",
label: registerParams.label,
task: registerParams.task,
status: "running",

View File

@@ -224,10 +224,10 @@ export function registerStatusHealthSessionsCommands(program: Command) {
.command("tasks")
.description("Inspect durable background task state")
.option("--json", "Output as JSON", false)
.option("--runtime <name>", "Filter by runtime (subagent, acp, cli)")
.option("--runtime <name>", "Filter by kind (subagent, acp, cron, cli)")
.option(
"--status <name>",
"Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)",
"Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)",
)
.action(async (opts) => {
await runCommandWithRuntime(defaultRuntime, async () => {
@@ -247,10 +247,10 @@ export function registerStatusHealthSessionsCommands(program: Command) {
.command("list")
.description("List tracked background tasks")
.option("--json", "Output as JSON", false)
.option("--runtime <name>", "Filter by runtime (subagent, acp, cli)")
.option("--runtime <name>", "Filter by kind (subagent, acp, cron, cli)")
.option(
"--status <name>",
"Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)",
"Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)",
)
.action(async (opts, command) => {
const parentOpts = command.parent?.opts() as

View File

@@ -37,8 +37,8 @@ let tasksCancelCommand: typeof import("./tasks.js").tasksCancelCommand;
const taskFixture = {
taskId: "task-12345678",
source: "sessions_spawn",
runtime: "acp",
sourceId: "run-12345678",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
runId: "run-12345678",

View File

@@ -38,7 +38,7 @@ function formatTaskStatusCell(status: string, rich: boolean) {
if (!rich) {
return padded;
}
if (status === "done") {
if (status === "succeeded") {
return theme.success(padded);
}
if (status === "failed" || status === "lost" || status === "timed_out") {
@@ -53,7 +53,7 @@ function formatTaskStatusCell(status: string, rich: boolean) {
function formatTaskRows(tasks: TaskRecord[], rich: boolean) {
const header = [
"Task".padEnd(ID_PAD),
"Runtime".padEnd(RUNTIME_PAD),
"Kind".padEnd(RUNTIME_PAD),
"Status".padEnd(STATUS_PAD),
"Delivery".padEnd(DELIVERY_PAD),
"Run".padEnd(RUN_PAD),
@@ -151,21 +151,24 @@ export async function tasksShowCommand(
const lines = [
"Background task:",
`taskId: ${task.taskId}`,
`runtime: ${task.runtime}`,
`kind: ${task.runtime}`,
`sourceId: ${task.sourceId ?? "n/a"}`,
`status: ${task.status}`,
`result: ${task.terminalOutcome ?? "n/a"}`,
`delivery: ${task.deliveryStatus}`,
`notify: ${task.notifyPolicy}`,
`source: ${task.source}`,
`requesterSessionKey: ${task.requesterSessionKey}`,
`childSessionKey: ${task.childSessionKey ?? "n/a"}`,
`parentTaskId: ${task.parentTaskId ?? "n/a"}`,
`agentId: ${task.agentId ?? "n/a"}`,
`runId: ${task.runId ?? "n/a"}`,
`bindingTargetKind: ${task.bindingTargetKind ?? "n/a"}`,
`label: ${task.label ?? "n/a"}`,
`task: ${task.task}`,
`createdAt: ${new Date(task.createdAt).toISOString()}`,
`startedAt: ${task.startedAt ? new Date(task.startedAt).toISOString() : "n/a"}`,
`endedAt: ${task.endedAt ? new Date(task.endedAt).toISOString() : "n/a"}`,
`lastEventAt: ${task.lastEventAt ? new Date(task.lastEventAt).toISOString() : "n/a"}`,
`cleanupAfter: ${task.cleanupAfter ? new Date(task.cleanupAfter).toISOString() : "n/a"}`,
...(task.error ? [`error: ${task.error}`] : []),
...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []),
...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []),
@@ -177,10 +180,6 @@ export async function tasksShowCommand(
}`,
)
: []),
...(task.streamLogPath ? [`streamLogPath: ${task.streamLogPath}`] : []),
...(task.transcriptPath ? [`transcriptPath: ${task.transcriptPath}`] : []),
...(task.agentSessionId ? [`agentSessionId: ${task.agentSessionId}`] : []),
...(task.backendSessionId ? [`backendSessionId: ${task.backendSessionId}`] : []),
];
for (const line of lines) {
runtime.log(line);

View File

@@ -1,8 +1,11 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import * as taskRegistry from "../../tasks/task-registry.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js";
import type { CronJob } from "../types.js";
import { start, stop } from "./ops.js";
import { run, start, stop } from "./ops.js";
import { createCronServiceState } from "./state.js";
const { logger, makeStorePath } = setupCronServiceSuite({
@@ -27,6 +30,40 @@ function createInterruptedMainJob(now: number): CronJob {
};
}
function createDueIsolatedJob(now: number): CronJob {
return {
id: "isolated-timeout",
name: "isolated timeout",
enabled: true,
createdAtMs: now - 60_000,
updatedAtMs: now - 60_000,
schedule: { kind: "every", everyMs: 60_000, anchorMs: now - 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "do work" },
sessionKey: "agent:main:main",
state: { nextRunAtMs: now - 1 },
};
}
function createMissedIsolatedJob(now: number): CronJob {
return {
id: "startup-timeout",
name: "startup timeout",
enabled: true,
createdAtMs: now - 86_400_000,
updatedAtMs: now - 30 * 60_000,
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "should timeout" },
sessionKey: "agent:main:main",
state: {
nextRunAtMs: now - 60_000,
},
};
}
describe("cron service ops seam coverage", () => {
it("start clears stale running markers, skips startup replay, persists, and arms the timer", async () => {
const { storePath } = await makeStorePath();
@@ -77,4 +114,177 @@ describe("cron service ops seam coverage", () => {
timeoutSpy.mockRestore();
stop(state);
});
it("records timed out manual runs as timed_out in the shared task registry", async () => {
const { storePath } = await makeStorePath();
const stateRoot = path.dirname(path.dirname(storePath));
const now = Date.parse("2026-03-23T12:00:00.000Z");
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateRoot;
resetTaskRegistryForTests();
await writeCronStoreSnapshot({
storePath,
jobs: [createDueIsolatedJob(now)],
});
const state = createCronServiceState({
storePath,
cronEnabled: true,
log: logger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => {
throw new Error("cron: job execution timed out");
}),
});
await run(state, "isolated-timeout");
expect(findTaskByRunId(`cron:isolated-timeout:${now}`)).toMatchObject({
runtime: "cron",
status: "timed_out",
sourceId: "isolated-timeout",
});
if (originalStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = originalStateDir;
}
resetTaskRegistryForTests();
});
it("keeps manual cron runs progressing when task ledger creation fails", async () => {
const { storePath } = await makeStorePath();
const now = Date.parse("2026-03-23T12:00:00.000Z");
await writeCronStoreSnapshot({
storePath,
jobs: [createDueIsolatedJob(now)],
});
const createTaskRecordSpy = vi
.spyOn(taskRegistry, "createTaskRecord")
.mockImplementation(() => {
throw new Error("disk full");
});
const state = createCronServiceState({
storePath,
cronEnabled: true,
log: logger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "done" })),
});
await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true });
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
jobs: CronJob[];
};
expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined();
expect(persisted.jobs[0]?.state.lastStatus).toBe("ok");
expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobId: "isolated-timeout" }),
"cron: failed to create task ledger record",
);
createTaskRecordSpy.mockRestore();
});
it("keeps manual cron cleanup progressing when task ledger updates fail", async () => {
const { storePath } = await makeStorePath();
const stateRoot = path.dirname(path.dirname(storePath));
const now = Date.parse("2026-03-23T12:00:00.000Z");
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateRoot;
resetTaskRegistryForTests();
await writeCronStoreSnapshot({
storePath,
jobs: [createDueIsolatedJob(now)],
});
const updateTaskRecordSpy = vi
.spyOn(taskRegistry, "updateTaskRecordById")
.mockImplementation(() => {
throw new Error("disk full");
});
const state = createCronServiceState({
storePath,
cronEnabled: true,
log: logger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "done" })),
});
await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true });
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
jobs: CronJob[];
};
expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined();
expect(persisted.jobs[0]?.state.lastStatus).toBe("ok");
expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobStatus: "ok" }),
"cron: failed to update task ledger record",
);
updateTaskRecordSpy.mockRestore();
if (originalStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = originalStateDir;
}
resetTaskRegistryForTests();
});
it("records startup catch-up timeouts as timed_out in the shared task registry", async () => {
const { storePath } = await makeStorePath();
const stateRoot = path.dirname(path.dirname(storePath));
const now = Date.parse("2026-03-23T12:00:00.000Z");
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateRoot;
resetTaskRegistryForTests();
await writeCronStoreSnapshot({
storePath,
jobs: [createMissedIsolatedJob(now)],
});
const state = createCronServiceState({
storePath,
cronEnabled: true,
log: logger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => {
throw new Error("cron: job execution timed out");
}),
});
await start(state);
expect(findTaskByRunId(`cron:startup-timeout:${now}`)).toMatchObject({
runtime: "cron",
status: "timed_out",
sourceId: "startup-timeout",
});
if (originalStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = originalStateDir;
}
resetTaskRegistryForTests();
stop(state);
});
});

View File

@@ -1,5 +1,6 @@
import { enqueueCommandInLane } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import { createTaskRecord, updateTaskRecordById } from "../../tasks/task-registry.js";
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js";
import {
@@ -20,6 +21,7 @@ import {
armTimer,
emit,
executeJobCoreWithTimeout,
normalizeCronRunErrorText,
runMissedJobs,
stopTimer,
wake,
@@ -358,6 +360,7 @@ type PreparedManualRun =
ok: true;
ran: true;
jobId: string;
taskId?: string;
startedAt: number;
executionJob: CronJob;
}
@@ -379,6 +382,71 @@ type ManualRunPreflightResult =
let nextManualRunId = 1;
function tryCreateManualTaskRecord(params: {
state: CronServiceState;
job: CronJob;
startedAt: number;
}): string | undefined {
try {
return createTaskRecord({
runtime: "cron",
sourceId: params.job.id,
requesterSessionKey: "",
childSessionKey: params.job.sessionKey,
agentId: params.job.agentId,
runId: `cron:${params.job.id}:${params.startedAt}`,
label: params.job.name,
task: params.job.name || params.job.id,
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
startedAt: params.startedAt,
lastEventAt: params.startedAt,
}).taskId;
} catch (error) {
params.state.deps.log.warn(
{ jobId: params.job.id, error },
"cron: failed to create task ledger record",
);
return undefined;
}
}
function tryUpdateManualTaskRecord(
state: CronServiceState,
params: {
taskId?: string;
coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
endedAt: number;
},
): void {
if (!params.taskId) {
return;
}
try {
updateTaskRecordById(params.taskId, {
status:
params.coreResult.status === "ok" || params.coreResult.status === "skipped"
? "succeeded"
: normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out"
? "timed_out"
: "failed",
endedAt: params.endedAt,
lastEventAt: params.endedAt,
error:
params.coreResult.status === "error"
? normalizeCronRunErrorText(params.coreResult.error)
: undefined,
terminalSummary: params.coreResult.summary ?? undefined,
});
} catch (error) {
state.deps.log.warn(
{ taskId: params.taskId, jobStatus: params.coreResult.status, error },
"cron: failed to update task ledger record",
);
}
}
async function inspectManualRunPreflight(
state: CronServiceState,
id: string,
@@ -448,11 +516,17 @@ async function prepareManualRun(
// force-reload from disk cannot start the same job concurrently.
await persist(state);
emit(state, { jobId: job.id, action: "started", runAtMs: preflight.now });
const taskId = tryCreateManualTaskRecord({
state,
job,
startedAt: preflight.now,
});
const executionJob = JSON.parse(JSON.stringify(job)) as CronJob;
return {
ok: true,
ran: true,
jobId: job.id,
taskId,
startedAt: preflight.now,
executionJob,
} as const;
@@ -467,14 +541,20 @@ async function finishPreparedManualRun(
const executionJob = prepared.executionJob;
const startedAt = prepared.startedAt;
const jobId = prepared.jobId;
const taskId = prepared.taskId;
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
try {
coreResult = await executeJobCoreWithTimeout(state, executionJob);
} catch (err) {
coreResult = { status: "error", error: String(err) };
coreResult = { status: "error", error: normalizeCronRunErrorText(err) };
}
const endedAt = state.deps.nowMs();
tryUpdateManualTaskRecord(state, {
taskId,
coreResult,
endedAt,
});
await locked(state, async () => {
await ensureLoaded(state, { skipRecompute: true });

View File

@@ -1,9 +1,11 @@
import fs from "node:fs/promises";
import { describe, expect, it, vi } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js";
import { createCronServiceState } from "../../cron/service/state.js";
import { onTimer } from "../../cron/service/timer.js";
import type { CronJob } from "../../cron/types.js";
import * as taskRegistry from "../../tasks/task-registry.js";
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
const { logger, makeStorePath } = setupCronServiceSuite({
prefix: "cron-service-timer-seam",
@@ -25,6 +27,10 @@ function createDueMainJob(params: { now: number; wakeMode: CronJob["wakeMode"] }
};
}
afterEach(() => {
resetTaskRegistryForTests();
});
describe("cron service timer seam coverage", () => {
it("persists the next schedule and hands off next-heartbeat main jobs", async () => {
const { storePath } = await makeStorePath();
@@ -73,8 +79,50 @@ describe("cron service timer seam coverage", () => {
const delays = timeoutSpy.mock.calls
.map(([, delay]) => delay)
.filter((delay): delay is number => typeof delay === "number");
expect(delays).toContain(60_000);
expect(delays.some((delay) => delay > 0)).toBe(true);
timeoutSpy.mockRestore();
});
it("keeps scheduler progress when task ledger creation fails", async () => {
const { storePath } = await makeStorePath();
const now = Date.parse("2026-03-23T12:00:00.000Z");
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
await writeCronStoreSnapshot({
storePath,
jobs: [createDueMainJob({ now, wakeMode: "next-heartbeat" })],
});
const createTaskRecordSpy = vi
.spyOn(taskRegistry, "createTaskRecord")
.mockImplementation(() => {
throw new Error("disk full");
});
const state = createCronServiceState({
storePath,
cronEnabled: true,
log: logger,
nowMs: () => now,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
});
await onTimer(state);
expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobId: "main-heartbeat-job" }),
"cron: failed to create task ledger record",
);
expect(enqueueSystemEvent).toHaveBeenCalledWith("heartbeat seam tick", {
agentId: undefined,
sessionKey: "agent:main:main",
contextKey: "cron:main-heartbeat-job",
});
createTaskRecordSpy.mockRestore();
});
});

View File

@@ -2,6 +2,7 @@ import { resolveFailoverReasonFromError } from "../../agents/failover-error.js";
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { createTaskRecord, updateTaskRecordById } from "../../tasks/task-registry.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
@@ -46,6 +47,7 @@ const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour
type TimedCronRunOutcome = CronRunOutcome &
CronRunTelemetry & {
jobId: string;
taskId?: string;
delivered?: boolean;
deliveryAttempted?: boolean;
startedAt: number;
@@ -107,6 +109,74 @@ function isAbortError(err: unknown): boolean {
}
return err.name === "AbortError" || err.message === timeoutErrorMessage();
}
export function normalizeCronRunErrorText(err: unknown): string {
if (isAbortError(err)) {
return timeoutErrorMessage();
}
if (typeof err === "string") {
return err === `Error: ${timeoutErrorMessage()}` ? timeoutErrorMessage() : err;
}
return String(err);
}
function tryCreateCronTaskRecord(params: {
state: CronServiceState;
job: CronJob;
startedAt: number;
}): string | undefined {
try {
return createTaskRecord({
runtime: "cron",
sourceId: params.job.id,
requesterSessionKey: "",
childSessionKey: params.job.sessionKey,
agentId: params.job.agentId,
runId: `cron:${params.job.id}:${params.startedAt}`,
label: params.job.name,
task: params.job.name || params.job.id,
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
startedAt: params.startedAt,
lastEventAt: params.startedAt,
}).taskId;
} catch (error) {
params.state.deps.log.warn(
{ jobId: params.job.id, error },
"cron: failed to create task ledger record",
);
return undefined;
}
}
function tryUpdateCronTaskRecord(
state: CronServiceState,
result: Pick<TimedCronRunOutcome, "taskId" | "status" | "error" | "endedAt" | "summary">,
): void {
if (!result.taskId) {
return;
}
try {
updateTaskRecordById(result.taskId, {
status:
result.status === "ok" || result.status === "skipped"
? "succeeded"
: normalizeCronRunErrorText(result.error) === timeoutErrorMessage()
? "timed_out"
: "failed",
endedAt: result.endedAt,
lastEventAt: result.endedAt,
error: result.status === "error" ? normalizeCronRunErrorText(result.error) : undefined,
terminalSummary: result.summary ?? undefined,
});
} catch (error) {
state.deps.log.warn(
{ taskId: result.taskId, jobStatus: result.status, error },
"cron: failed to update task ledger record",
);
}
}
/**
* Exponential backoff delays (in ms) indexed by consecutive error count.
* After the last entry the delay stays constant.
@@ -474,6 +544,7 @@ export function applyJobResult(
}
function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void {
tryUpdateCronTaskRecord(state, result);
const store = state.store;
if (!store) {
return;
@@ -630,18 +701,26 @@ export async function onTimer(state: CronServiceState) {
job.state.runningAtMs = startedAt;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
const taskId = tryCreateCronTaskRecord({ state, job, startedAt });
try {
const result = await executeJobCoreWithTimeout(state, job);
return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() };
return {
jobId: id,
taskId,
...result,
startedAt,
endedAt: state.deps.nowMs(),
};
} catch (err) {
const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err);
const errorText = normalizeCronRunErrorText(err);
state.deps.log.warn(
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null },
`cron: job failed: ${errorText}`,
);
return {
jobId: id,
taskId,
status: "error",
error: errorText,
startedAt,
@@ -926,11 +1005,17 @@ async function runStartupCatchupCandidate(
candidate: StartupCatchupCandidate,
): Promise<TimedCronRunOutcome> {
const startedAt = state.deps.nowMs();
const taskId = tryCreateCronTaskRecord({
state,
job: candidate.job,
startedAt,
});
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
try {
const result = await executeJobCoreWithTimeout(state, candidate.job);
return {
jobId: candidate.jobId,
taskId,
status: result.status,
error: result.error,
summary: result.summary,
@@ -946,8 +1031,9 @@ async function runStartupCatchupCandidate(
} catch (err) {
return {
jobId: candidate.jobId,
taskId,
status: "error",
error: String(err),
error: normalizeCronRunErrorText(err),
startedAt,
endedAt: state.deps.nowMs(),
};

View File

@@ -846,7 +846,6 @@ describe("gateway agent handler", () => {
);
expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({
source: "background_cli",
runtime: "cli",
childSessionKey: "agent:main:main",
status: "running",

View File

@@ -191,8 +191,8 @@ function dispatchAgentRunFromGateway(params: {
if (params.ingressOpts.sessionKey?.trim()) {
try {
createTaskRecord({
source: "background_cli",
runtime: "cli",
sourceId: params.runId,
requesterSessionKey: params.ingressOpts.sessionKey,
requesterOrigin: normalizeDeliveryContext({
channel: params.ingressOpts.channel,
@@ -202,7 +202,6 @@ function dispatchAgentRunFromGateway(params: {
}),
childSessionKey: params.ingressOpts.sessionKey,
runId: params.runId,
bindingTargetKind: "session",
task: params.ingressOpts.message,
status: "running",
deliveryStatus: "not_applicable",

View File

@@ -33,7 +33,7 @@ function findSessionEntryByKey(store: Record<string, unknown>, sessionKey: strin
}
function isActiveTask(task: TaskRecord): boolean {
return task.status === "accepted" || task.status === "running";
return task.status === "queued" || task.status === "running";
}
function isTerminalTask(task: TaskRecord): boolean {
@@ -82,6 +82,9 @@ function shouldPruneTerminalTask(task: TaskRecord, now: number): boolean {
if (!isTerminalTask(task)) {
return false;
}
if (typeof task.cleanupAfter === "number") {
return now >= task.cleanupAfter;
}
const terminalAt = task.endedAt ?? task.lastEventAt ?? task.createdAt;
return now - terminalAt >= TASK_RETENTION_MS;
}

View File

@@ -11,8 +11,8 @@ import type { TaskRecord } from "./task-registry.types.js";
function createStoredTask(): TaskRecord {
return {
taskId: "task-restored",
source: "sessions_spawn",
runtime: "acp",
sourceId: "run-restored",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:restored",
runId: "run-restored",
@@ -48,7 +48,6 @@ describe("task-registry store runtime", () => {
expect(loadSnapshot).toHaveBeenCalledTimes(1);
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:new",
@@ -80,7 +79,6 @@ describe("task-registry store runtime", () => {
expect(findTaskByRunId("run-restored")).toBeTruthy();
const created = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:new",

View File

@@ -107,7 +107,6 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:acp:child",
@@ -136,7 +135,7 @@ describe("task-registry", () => {
expect(findTaskByRunId("run-1")).toMatchObject({
runtime: "acp",
status: "done",
status: "succeeded",
endedAt: 250,
});
});
@@ -153,7 +152,6 @@ describe("task-registry", () => {
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -180,7 +178,7 @@ describe("task-registry", () => {
await waitForAssertion(() =>
expect(findTaskByRunId("run-delivery")).toMatchObject({
status: "done",
status: "succeeded",
deliveryStatus: "delivered",
}),
);
@@ -208,7 +206,6 @@ describe("task-registry", () => {
hoisted.sendMessageMock.mockRejectedValueOnce(new Error("telegram unavailable"));
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -255,7 +252,6 @@ describe("task-registry", () => {
hoisted.sendMessageMock.mockRejectedValueOnce(new Error("telegram unavailable"));
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -265,7 +261,7 @@ describe("task-registry", () => {
childSessionKey: "agent:main:acp:child",
runId: "run-delivery-blocked",
task: "Port the repo changes",
status: "done",
status: "succeeded",
deliveryStatus: "pending",
terminalOutcome: "blocked",
terminalSummary: "Writable session or apply_patch authorization required.",
@@ -273,7 +269,7 @@ describe("task-registry", () => {
await waitForAssertion(() =>
expect(findTaskByRunId("run-delivery-blocked")).toMatchObject({
status: "done",
status: "succeeded",
deliveryStatus: "failed",
terminalOutcome: "blocked",
}),
@@ -292,7 +288,6 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:acp:child",
@@ -314,7 +309,7 @@ describe("task-registry", () => {
await waitForAssertion(() =>
expect(findTaskByRunId("run-session-queued")).toMatchObject({
status: "done",
status: "succeeded",
deliveryStatus: "session_queued",
}),
);
@@ -331,13 +326,12 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:acp:child",
runId: "run-session-blocked",
task: "Port the repo changes",
status: "done",
status: "succeeded",
deliveryStatus: "pending",
terminalOutcome: "blocked",
terminalSummary: "Writable session or apply_patch authorization required.",
@@ -345,7 +339,7 @@ describe("task-registry", () => {
await waitForAssertion(() =>
expect(findTaskByRunId("run-session-blocked")).toMatchObject({
status: "done",
status: "succeeded",
deliveryStatus: "session_queued",
}),
);
@@ -369,7 +363,6 @@ describe("task-registry", () => {
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -420,7 +413,6 @@ describe("task-registry", () => {
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -430,7 +422,7 @@ describe("task-registry", () => {
childSessionKey: "agent:main:acp:child",
runId: "run-blocked-outcome",
task: "Port the repo changes",
status: "done",
status: "succeeded",
deliveryStatus: "pending",
terminalOutcome: "blocked",
terminalSummary: "Writable session or apply_patch authorization required.",
@@ -462,7 +454,6 @@ describe("task-registry", () => {
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -472,7 +463,7 @@ describe("task-registry", () => {
childSessionKey: "agent:main:acp:child",
runId: "run-succeeded-outcome",
task: "Create the file and verify it",
status: "done",
status: "succeeded",
deliveryStatus: "pending",
terminalSummary: "Created /tmp/file.txt and verified contents.",
terminalOutcome: "succeeded",
@@ -497,7 +488,6 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
createTaskRecord({
source: "background_cli",
runtime: "cli",
requesterSessionKey: "agent:codex:acp:child",
childSessionKey: "agent:codex:acp:child",
@@ -508,7 +498,6 @@ describe("task-registry", () => {
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child",
@@ -520,7 +509,6 @@ describe("task-registry", () => {
expect(listTaskRecords().filter((task) => task.runId === "run-shared")).toHaveLength(2);
expect(findTaskByRunId("run-shared")).toMatchObject({
source: "sessions_spawn",
runtime: "acp",
task: "Spawn ACP child",
});
@@ -538,7 +526,6 @@ describe("task-registry", () => {
});
const directTask = createTaskRecord({
source: "unknown",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -548,11 +535,10 @@ describe("task-registry", () => {
childSessionKey: "agent:main:acp:child",
runId: "run-shared-delivery",
task: "Direct ACP child",
status: "done",
status: "succeeded",
deliveryStatus: "pending",
});
const spawnedTask = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -562,7 +548,7 @@ describe("task-registry", () => {
childSessionKey: "agent:main:acp:child",
runId: "run-shared-delivery",
task: "Spawn ACP child",
status: "done",
status: "succeeded",
deliveryStatus: "pending",
});
@@ -575,7 +561,6 @@ describe("task-registry", () => {
);
expect(findTaskByRunId("run-shared-delivery")).toMatchObject({
taskId: directTask.taskId,
source: "sessions_spawn",
deliveryStatus: "delivered",
});
});
@@ -587,7 +572,6 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
const spawnedTask = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -599,11 +583,9 @@ describe("task-registry", () => {
task: "Spawn ACP child",
status: "running",
deliveryStatus: "pending",
streamLogPath: "/tmp/stream.jsonl",
});
const directTask = createTaskRecord({
source: "unknown",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -619,9 +601,7 @@ describe("task-registry", () => {
expect(directTask.taskId).toBe(spawnedTask.taskId);
expect(listTaskRecords().filter((task) => task.runId === "run-collapse")).toHaveLength(1);
expect(findTaskByRunId("run-collapse")).toMatchObject({
source: "sessions_spawn",
task: "Spawn ACP child",
streamLogPath: "/tmp/stream.jsonl",
});
});
});
@@ -637,7 +617,6 @@ describe("task-registry", () => {
});
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -647,7 +626,7 @@ describe("task-registry", () => {
childSessionKey: "agent:main:acp:child",
runId: "run-racing-delivery",
task: "Investigate issue",
status: "done",
status: "succeeded",
deliveryStatus: "pending",
terminalOutcome: "blocked",
terminalSummary: "Writable session or apply_patch authorization required.",
@@ -660,9 +639,9 @@ describe("task-registry", () => {
expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1);
expect(hoisted.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
idempotencyKey: `task-terminal:${task.taskId}:done:blocked`,
idempotencyKey: `task-terminal:${task.taskId}:succeeded:blocked`,
mirror: expect.objectContaining({
idempotencyKey: `task-terminal:${task.taskId}:done:blocked`,
idempotencyKey: `task-terminal:${task.taskId}:succeeded:blocked`,
}),
}),
);
@@ -678,7 +657,6 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "subagent",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:subagent:child",
@@ -706,7 +684,6 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:acp:missing",
@@ -738,13 +715,12 @@ describe("task-registry", () => {
resetTaskRegistryForTests();
const task = createTaskRecord({
source: "background_cli",
runtime: "cli",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:main:main",
runId: "run-prune",
task: "Old completed task",
status: "done",
status: "succeeded",
deliveryStatus: "not_applicable",
startedAt: Date.now() - 9 * 24 * 60 * 60_000,
});
@@ -772,7 +748,6 @@ describe("task-registry", () => {
});
const task = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -782,7 +757,7 @@ describe("task-registry", () => {
childSessionKey: "agent:codex:acp:child",
runId: "run-state-change",
task: "Investigate issue",
status: "accepted",
status: "queued",
notifyPolicy: "done_only",
});
@@ -838,7 +813,6 @@ describe("task-registry", () => {
vi.useFakeTimers();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -911,7 +885,6 @@ describe("task-registry", () => {
});
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -963,7 +936,6 @@ describe("task-registry", () => {
vi.useFakeTimers();
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -1021,7 +993,6 @@ describe("task-registry", () => {
hoisted.cancelSessionMock.mockResolvedValue(undefined);
const task = registry.createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
@@ -1079,7 +1050,6 @@ describe("task-registry", () => {
});
const task = registry.createTaskRecord({
source: "sessions_spawn",
runtime: "subagent",
requesterSessionKey: "agent:main:main",
requesterOrigin: {

View File

@@ -16,7 +16,6 @@ import {
type TaskRegistryHookEvent,
} from "./task-registry.store.js";
import type {
TaskBindingTargetKind,
TaskDeliveryStatus,
TaskEventKind,
TaskEventRecord,
@@ -24,12 +23,12 @@ import type {
TaskRecord,
TaskRegistrySnapshot,
TaskRuntime,
TaskSource,
TaskStatus,
TaskTerminalOutcome,
} from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/registry");
const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000;
const tasks = new Map<string, TaskRecord>();
const taskIdsByRunId = new Map<string, Set<string>>();
@@ -94,12 +93,35 @@ function normalizeTaskSummary(value: string | null | undefined): string | undefi
return normalized || undefined;
}
function normalizeTaskStatus(value: TaskStatus | null | undefined): TaskStatus {
return value === "running" ||
value === "queued" ||
value === "succeeded" ||
value === "failed" ||
value === "timed_out" ||
value === "cancelled" ||
value === "lost"
? value
: "queued";
}
function normalizeTaskTerminalOutcome(
value: TaskTerminalOutcome | null | undefined,
): TaskTerminalOutcome | undefined {
return value === "succeeded" || value === "blocked" ? value : undefined;
}
function resolveTaskTerminalOutcome(params: {
status: TaskStatus;
terminalOutcome?: TaskTerminalOutcome | null;
}): TaskTerminalOutcome | undefined {
const normalized = normalizeTaskTerminalOutcome(params.terminalOutcome);
if (normalized) {
return normalized;
}
return params.status === "succeeded" ? "succeeded" : undefined;
}
const TASK_RECENT_EVENT_LIMIT = 12;
function appendTaskEvent(
@@ -157,10 +179,8 @@ function getTasksByRunId(runId: string): TaskRecord[] {
}
function taskLookupPriority(task: TaskRecord): number {
const sourcePriority =
task.source === "sessions_spawn" ? 0 : task.source === "background_cli" ? 1 : 2;
const runtimePriority = task.runtime === "cli" ? 1 : 0;
return sourcePriority * 10 + runtimePriority;
return runtimePriority;
}
function pickPreferredRunIdTask(matches: TaskRecord[]): TaskRecord | undefined {
@@ -178,12 +198,10 @@ function normalizeComparableText(value: string | undefined): string {
}
function findExistingTaskForCreate(params: {
source: TaskSource;
runtime: TaskRuntime;
requesterSessionKey: string;
childSessionKey?: string;
runId?: string;
bindingTargetKind?: TaskBindingTargetKind;
label?: string;
task: string;
}): TaskRecord | undefined {
@@ -191,14 +209,11 @@ function findExistingTaskForCreate(params: {
const exact = runId
? getTasksByRunId(runId).find(
(task) =>
task.source === params.source &&
task.runtime === params.runtime &&
normalizeComparableText(task.requesterSessionKey) ===
normalizeComparableText(params.requesterSessionKey) &&
normalizeComparableText(task.childSessionKey) ===
normalizeComparableText(params.childSessionKey) &&
normalizeComparableText(task.bindingTargetKind) ===
normalizeComparableText(params.bindingTargetKind) &&
normalizeComparableText(task.label) === normalizeComparableText(params.label) &&
normalizeComparableText(task.task) === normalizeComparableText(params.task),
)
@@ -223,43 +238,36 @@ function findExistingTaskForCreate(params: {
return pickPreferredRunIdTask(siblingMatches);
}
function sourceUpgradePriority(source: TaskSource): number {
return source === "sessions_spawn" ? 0 : source === "background_cli" ? 1 : 2;
}
function mergeExistingTaskForCreate(
existing: TaskRecord,
params: {
source: TaskSource;
requesterOrigin?: TaskRecord["requesterOrigin"];
bindingTargetKind?: TaskBindingTargetKind;
sourceId?: string;
parentTaskId?: string;
agentId?: string;
label?: string;
task: string;
deliveryStatus?: TaskDeliveryStatus;
notifyPolicy?: TaskNotifyPolicy;
streamLogPath?: string;
},
): TaskRecord {
const patch: Partial<TaskRecord> = {};
if (sourceUpgradePriority(params.source) < sourceUpgradePriority(existing.source)) {
patch.source = params.source;
}
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
if (requesterOrigin && !existing.requesterOrigin) {
patch.requesterOrigin = requesterOrigin;
}
if (params.bindingTargetKind && !existing.bindingTargetKind) {
patch.bindingTargetKind = params.bindingTargetKind;
if (params.sourceId?.trim() && !existing.sourceId?.trim()) {
patch.sourceId = params.sourceId.trim();
}
if (params.parentTaskId?.trim() && !existing.parentTaskId?.trim()) {
patch.parentTaskId = params.parentTaskId.trim();
}
if (params.agentId?.trim() && !existing.agentId?.trim()) {
patch.agentId = params.agentId.trim();
}
if (params.label?.trim() && !existing.label?.trim()) {
patch.label = params.label.trim();
}
if (params.streamLogPath?.trim() && !existing.streamLogPath?.trim()) {
patch.streamLogPath = params.streamLogPath.trim();
}
if (params.source === "sessions_spawn" && existing.source !== "sessions_spawn") {
patch.task = params.task;
}
if (params.deliveryStatus === "pending" && existing.deliveryStatus !== "delivered") {
patch.deliveryStatus = "pending";
}
@@ -278,7 +286,7 @@ function mergeExistingTaskForCreate(
}
function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string {
const outcome = task.status === "done" ? (task.terminalOutcome ?? "default") : "default";
const outcome = task.status === "succeeded" ? (task.terminalOutcome ?? "default") : "default";
return `task-terminal:${task.taskId}:${task.status}:${outcome}`;
}
@@ -310,12 +318,26 @@ export function ensureTaskRegistryReady() {
ensureListener();
}
function isTerminalTaskStatus(status: TaskStatus): boolean {
return (
status === "succeeded" ||
status === "failed" ||
status === "timed_out" ||
status === "cancelled" ||
status === "lost"
);
}
function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | null {
const current = tasks.get(taskId);
if (!current) {
return null;
}
const next = { ...current, ...patch };
if (isTerminalTaskStatus(next.status) && typeof next.cleanupAfter !== "number") {
const terminalAt = next.endedAt ?? next.lastEventAt ?? Date.now();
next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS;
}
tasks.set(taskId, next);
if (patch.runId && patch.runId !== current.runId) {
rebuildRunIdIndex();
@@ -341,7 +363,7 @@ function formatTaskTerminalEvent(task: TaskRecord): string {
: task.task.trim() || "Background task");
const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : "";
const summary = task.terminalSummary?.trim();
if (task.status === "done") {
if (task.status === "succeeded") {
if (task.terminalOutcome === "blocked") {
return summary
? `Background task blocked: ${title}${runLabel}. ${summary}`
@@ -391,7 +413,7 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) {
}
function queueBlockedTaskFollowup(task: TaskRecord) {
if (task.status !== "done" || task.terminalOutcome !== "blocked") {
if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") {
return false;
}
const requesterSessionKey = task.requesterSessionKey.trim();
@@ -444,7 +466,7 @@ function shouldAutoDeliverTaskUpdate(task: TaskRecord): boolean {
return false;
}
if (
task.status !== "done" &&
task.status !== "succeeded" &&
task.status !== "failed" &&
task.status !== "timed_out" &&
task.status !== "lost" &&
@@ -459,7 +481,7 @@ function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean {
return (
task.notifyPolicy === "state_changes" &&
task.deliveryStatus === "pending" &&
task.status !== "done" &&
task.status !== "succeeded" &&
task.status !== "failed" &&
task.status !== "timed_out" &&
task.status !== "lost" &&
@@ -688,7 +710,7 @@ function ensureListener() {
if (phase === "start") {
patch.status = "running";
} else if (phase === "end") {
patch.status = evt.data?.aborted === true ? "timed_out" : "done";
patch.status = evt.data?.aborted === true ? "timed_out" : "succeeded";
patch.endedAt = endedAt ?? now;
} else if (phase === "error") {
patch.status = "failed";
@@ -705,7 +727,7 @@ function ensureListener() {
summary:
patch.status === "failed"
? (patch.error ?? current.error)
: patch.status === "done"
: patch.status === "succeeded"
? current.terminalSummary
: undefined,
});
@@ -720,13 +742,14 @@ function ensureListener() {
}
export function createTaskRecord(params: {
source: TaskSource;
runtime: TaskRuntime;
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: TaskRecord["requesterOrigin"];
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
bindingTargetKind?: TaskBindingTargetKind;
label?: string;
task: string;
status?: TaskStatus;
@@ -734,14 +757,10 @@ export function createTaskRecord(params: {
notifyPolicy?: TaskNotifyPolicy;
startedAt?: number;
lastEventAt?: number;
cleanupAfter?: number;
progressSummary?: string | null;
terminalSummary?: string | null;
terminalOutcome?: TaskTerminalOutcome | null;
transcriptPath?: string;
streamLogPath?: string;
backend?: string;
agentSessionId?: string;
backendSessionId?: string;
}): TaskRecord {
ensureTaskRegistryReady();
const existing = findExistingTaskForCreate(params);
@@ -750,7 +769,7 @@ export function createTaskRecord(params: {
}
const now = Date.now();
const taskId = crypto.randomUUID();
const status = params.status ?? "accepted";
const status = normalizeTaskStatus(params.status);
const deliveryStatus = params.deliveryStatus ?? ensureDeliveryStatus(params.requesterSessionKey);
const notifyPolicy = ensureNotifyPolicy({
notifyPolicy: params.notifyPolicy,
@@ -760,13 +779,14 @@ export function createTaskRecord(params: {
const lastEventAt = params.lastEventAt ?? params.startedAt ?? now;
const record: TaskRecord = {
taskId,
source: params.source,
runtime: params.runtime,
sourceId: params.sourceId?.trim() || undefined,
requesterSessionKey: params.requesterSessionKey,
requesterOrigin: normalizeDeliveryContext(params.requesterOrigin),
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId?.trim() || undefined,
agentId: params.agentId?.trim() || undefined,
runId: params.runId?.trim() || undefined,
bindingTargetKind: params.bindingTargetKind,
label: params.label?.trim() || undefined,
task: params.task,
status,
@@ -775,13 +795,16 @@ export function createTaskRecord(params: {
createdAt: now,
startedAt: params.startedAt,
lastEventAt,
cleanupAfter: params.cleanupAfter,
progressSummary: normalizeTaskSummary(params.progressSummary),
terminalSummary: normalizeTaskSummary(params.terminalSummary),
terminalOutcome: normalizeTaskTerminalOutcome(params.terminalOutcome),
terminalOutcome: resolveTaskTerminalOutcome({
status,
terminalOutcome: params.terminalOutcome,
}),
recentEvents: appendTaskEvent(
{
taskId,
source: params.source,
runtime: params.runtime,
requesterSessionKey: params.requesterSessionKey,
task: params.task,
@@ -795,12 +818,11 @@ export function createTaskRecord(params: {
kind: status,
},
),
transcriptPath: params.transcriptPath,
streamLogPath: params.streamLogPath,
backend: params.backend,
agentSessionId: params.agentSessionId,
backendSessionId: params.backendSessionId,
};
if (isTerminalTaskStatus(record.status) && typeof record.cleanupAfter !== "number") {
record.cleanupAfter =
(record.endedAt ?? record.lastEventAt ?? record.createdAt) + DEFAULT_TASK_RETENTION_MS;
}
tasks.set(taskId, record);
addRunIdIndex(taskId, record.runId);
persistTaskRegistry();
@@ -808,6 +830,9 @@ export function createTaskRecord(params: {
kind: "upserted",
task: cloneTaskRecord(record),
}));
if (isTerminalTaskStatus(record.status)) {
void maybeDeliverTaskTerminalUpdate(taskId);
}
return cloneTaskRecord(record);
}
@@ -835,10 +860,10 @@ export function updateTaskStateByRunId(params: {
continue;
}
const patch: Partial<TaskRecord> = {};
const nextStatus = params.status ?? current.status;
const nextStatus = params.status ? normalizeTaskStatus(params.status) : current.status;
const eventAt = params.lastEventAt ?? params.endedAt ?? Date.now();
if (params.status) {
patch.status = params.status;
patch.status = normalizeTaskStatus(params.status);
}
if (params.startedAt != null) {
patch.startedAt = params.startedAt;
@@ -859,13 +884,16 @@ export function updateTaskStateByRunId(params: {
patch.terminalSummary = normalizeTaskSummary(params.terminalSummary);
}
if (params.terminalOutcome !== undefined) {
patch.terminalOutcome = normalizeTaskTerminalOutcome(params.terminalOutcome);
patch.terminalOutcome = resolveTaskTerminalOutcome({
status: nextStatus,
terminalOutcome: params.terminalOutcome,
});
}
const eventSummary =
normalizeTaskSummary(params.eventSummary) ??
(nextStatus === "failed"
? normalizeTaskSummary(params.error ?? current.error)
: nextStatus === "done"
: nextStatus === "succeeded"
? normalizeTaskSummary(params.terminalSummary ?? current.terminalSummary)
: undefined);
const shouldAppendEvent =
@@ -874,7 +902,10 @@ export function updateTaskStateByRunId(params: {
if (shouldAppendEvent) {
patch.recentEvents = appendTaskEvent(current, {
at: eventAt,
kind: params.status && params.status !== current.status ? params.status : "progress",
kind:
params.status && normalizeTaskStatus(params.status) !== current.status
? normalizeTaskStatus(params.status)
: "progress",
summary: eventSummary,
});
}
@@ -921,7 +952,7 @@ export async function cancelTaskById(params: {
return { found: false, cancelled: false, reason: "Task not found." };
}
if (
task.status === "done" ||
task.status === "succeeded" ||
task.status === "failed" ||
task.status === "timed_out" ||
task.status === "lost" ||

View File

@@ -1,11 +1,11 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
export type TaskRuntime = "subagent" | "acp" | "cli";
export type TaskRuntime = "subagent" | "acp" | "cli" | "cron";
export type TaskStatus =
| "accepted"
| "queued"
| "running"
| "done"
| "succeeded"
| "failed"
| "timed_out"
| "cancelled"
@@ -23,10 +23,6 @@ export type TaskNotifyPolicy = "done_only" | "state_changes" | "silent";
export type TaskTerminalOutcome = "succeeded" | "blocked";
export type TaskBindingTargetKind = "subagent" | "session";
export type TaskSource = "sessions_spawn" | "background_cli" | "unknown";
export type TaskEventKind = TaskStatus | "progress";
export type TaskEventRecord = {
@@ -37,13 +33,14 @@ export type TaskEventRecord = {
export type TaskRecord = {
taskId: string;
source: TaskSource;
runtime: TaskRuntime;
sourceId?: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
bindingTargetKind?: TaskBindingTargetKind;
label?: string;
task: string;
status: TaskStatus;
@@ -53,17 +50,13 @@ export type TaskRecord = {
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
cleanupAfter?: number;
error?: string;
progressSummary?: string;
terminalSummary?: string;
terminalOutcome?: TaskTerminalOutcome;
recentEvents?: TaskEventRecord[];
lastNotifiedEventAt?: number;
transcriptPath?: string;
streamLogPath?: string;
backend?: string;
agentSessionId?: string;
backendSessionId?: string;
};
export type TaskRegistrySnapshot = {