diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index b5dc6e5296a..18c532919b3 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -833,7 +833,7 @@ export class AcpSessionManager { if (taskContext) { const terminalResult = resolveBackgroundTaskTerminalResult(taskProgressSummary); this.updateBackgroundTaskState(taskContext.runId, { - status: "done", + status: "succeeded", endedAt: Date.now(), lastEventAt: Date.now(), error: undefined, @@ -1880,13 +1880,12 @@ export class AcpSessionManager { private createBackgroundTaskRecord(context: BackgroundTaskContext, startedAt: number): void { try { createTaskRecord({ - source: "unknown", runtime: "acp", + sourceId: context.runId, requesterSessionKey: context.requesterSessionKey, requesterOrigin: context.requesterOrigin, childSessionKey: context.childSessionKey, runId: context.runId, - bindingTargetKind: "session", label: context.label, task: context.task, status: "running", diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index c96df64af9a..179e5551434 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -304,13 +304,12 @@ describe("AcpSessionManager", () => { }); expect(findTaskByRunId("direct-parented-run")).toMatchObject({ - source: "unknown", runtime: "acp", requesterSessionKey: "agent:quant:telegram:quant:direct:822430204", childSessionKey: "agent:codex:acp:child-1", label: "Quant patch", task: "Implement the feature and report back", - status: "done", + status: "succeeded", progressSummary: "Write failed: permission denied for /root/oc-acp-write-should-fail.txt.", terminalOutcome: "blocked", terminalSummary: "Permission denied for /root/oc-acp-write-should-fail.txt.", diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index e0230e17ab2..30aa7255664 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -954,19 +954,17 @@ export async function spawnAcpDirect( parentRelay?.notifyStarted(); try { createTaskRecord({ - source: "sessions_spawn", runtime: "acp", + sourceId: childRunId, requesterSessionKey: requesterInternalKey, requesterOrigin: requesterState.origin, childSessionKey: sessionKey, runId: childRunId, - bindingTargetKind: "session", label: params.label, task: params.task, status: "running", deliveryStatus: requesterInternalKey.trim() ? "pending" : "parent_missing", startedAt: Date.now(), - streamLogPath, }); } catch (error) { log.warn("Failed to create background task for ACP spawn", { @@ -987,13 +985,12 @@ export async function spawnAcpDirect( try { createTaskRecord({ - source: "sessions_spawn", runtime: "acp", + sourceId: childRunId, requesterSessionKey: requesterInternalKey, requesterOrigin: requesterState.origin, childSessionKey: sessionKey, runId: childRunId, - bindingTargetKind: "session", label: params.label, task: params.task, status: "running", diff --git a/src/agents/subagent-registry-lifecycle.ts b/src/agents/subagent-registry-lifecycle.ts index ae146141662..9feac77937d 100644 --- a/src/agents/subagent-registry-lifecycle.ts +++ b/src/agents/subagent-registry-lifecycle.ts @@ -460,7 +460,7 @@ export function createSubagentRegistryLifecycleController(params: { runId: entry.runId, status: completeParams.outcome.status === "ok" - ? "done" + ? "succeeded" : completeParams.outcome.status === "timeout" ? "timed_out" : "failed", diff --git a/src/agents/subagent-registry-run-manager.ts b/src/agents/subagent-registry-run-manager.ts index 4085ae60562..17071752b58 100644 --- a/src/agents/subagent-registry-run-manager.ts +++ b/src/agents/subagent-registry-run-manager.ts @@ -318,13 +318,12 @@ export function createSubagentRunManager(params: { }); try { createTaskRecord({ - source: "sessions_spawn", runtime: "subagent", + sourceId: registerParams.runId, requesterSessionKey: registerParams.requesterSessionKey, requesterOrigin, childSessionKey: registerParams.childSessionKey, runId: registerParams.runId, - bindingTargetKind: "subagent", label: registerParams.label, task: registerParams.task, status: "running", diff --git a/src/cli/program/register.status-health-sessions.ts b/src/cli/program/register.status-health-sessions.ts index 4ffc8cb9eac..cbb3069591d 100644 --- a/src/cli/program/register.status-health-sessions.ts +++ b/src/cli/program/register.status-health-sessions.ts @@ -224,10 +224,10 @@ export function registerStatusHealthSessionsCommands(program: Command) { .command("tasks") .description("Inspect durable background task state") .option("--json", "Output as JSON", false) - .option("--runtime ", "Filter by runtime (subagent, acp, cli)") + .option("--runtime ", "Filter by kind (subagent, acp, cron, cli)") .option( "--status ", - "Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)", + "Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)", ) .action(async (opts) => { await runCommandWithRuntime(defaultRuntime, async () => { @@ -247,10 +247,10 @@ export function registerStatusHealthSessionsCommands(program: Command) { .command("list") .description("List tracked background tasks") .option("--json", "Output as JSON", false) - .option("--runtime ", "Filter by runtime (subagent, acp, cli)") + .option("--runtime ", "Filter by kind (subagent, acp, cron, cli)") .option( "--status ", - "Filter by status (accepted, running, done, failed, timed_out, cancelled, lost)", + "Filter by status (queued, running, succeeded, failed, timed_out, cancelled, lost)", ) .action(async (opts, command) => { const parentOpts = command.parent?.opts() as diff --git a/src/commands/tasks.test.ts b/src/commands/tasks.test.ts index 58fad7080f0..c0391ccd1c9 100644 --- a/src/commands/tasks.test.ts +++ b/src/commands/tasks.test.ts @@ -37,8 +37,8 @@ let tasksCancelCommand: typeof import("./tasks.js").tasksCancelCommand; const taskFixture = { taskId: "task-12345678", - source: "sessions_spawn", runtime: "acp", + sourceId: "run-12345678", requesterSessionKey: "agent:main:main", childSessionKey: "agent:codex:acp:child", runId: "run-12345678", diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index fb5877c250d..113910faa45 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -38,7 +38,7 @@ function formatTaskStatusCell(status: string, rich: boolean) { if (!rich) { return padded; } - if (status === "done") { + if (status === "succeeded") { return theme.success(padded); } if (status === "failed" || status === "lost" || status === "timed_out") { @@ -53,7 +53,7 @@ function formatTaskStatusCell(status: string, rich: boolean) { function formatTaskRows(tasks: TaskRecord[], rich: boolean) { const header = [ "Task".padEnd(ID_PAD), - "Runtime".padEnd(RUNTIME_PAD), + "Kind".padEnd(RUNTIME_PAD), "Status".padEnd(STATUS_PAD), "Delivery".padEnd(DELIVERY_PAD), "Run".padEnd(RUN_PAD), @@ -151,21 +151,24 @@ export async function tasksShowCommand( const lines = [ "Background task:", `taskId: ${task.taskId}`, - `runtime: ${task.runtime}`, + `kind: ${task.runtime}`, + `sourceId: ${task.sourceId ?? "n/a"}`, `status: ${task.status}`, + `result: ${task.terminalOutcome ?? "n/a"}`, `delivery: ${task.deliveryStatus}`, `notify: ${task.notifyPolicy}`, - `source: ${task.source}`, `requesterSessionKey: ${task.requesterSessionKey}`, `childSessionKey: ${task.childSessionKey ?? "n/a"}`, + `parentTaskId: ${task.parentTaskId ?? "n/a"}`, + `agentId: ${task.agentId ?? "n/a"}`, `runId: ${task.runId ?? "n/a"}`, - `bindingTargetKind: ${task.bindingTargetKind ?? "n/a"}`, `label: ${task.label ?? "n/a"}`, `task: ${task.task}`, `createdAt: ${new Date(task.createdAt).toISOString()}`, `startedAt: ${task.startedAt ? new Date(task.startedAt).toISOString() : "n/a"}`, `endedAt: ${task.endedAt ? new Date(task.endedAt).toISOString() : "n/a"}`, `lastEventAt: ${task.lastEventAt ? new Date(task.lastEventAt).toISOString() : "n/a"}`, + `cleanupAfter: ${task.cleanupAfter ? new Date(task.cleanupAfter).toISOString() : "n/a"}`, ...(task.error ? [`error: ${task.error}`] : []), ...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []), ...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []), @@ -177,10 +180,6 @@ export async function tasksShowCommand( }`, ) : []), - ...(task.streamLogPath ? [`streamLogPath: ${task.streamLogPath}`] : []), - ...(task.transcriptPath ? [`transcriptPath: ${task.transcriptPath}`] : []), - ...(task.agentSessionId ? [`agentSessionId: ${task.agentSessionId}`] : []), - ...(task.backendSessionId ? [`backendSessionId: ${task.backendSessionId}`] : []), ]; for (const line of lines) { runtime.log(line); diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index 8b2627d01d9..24562e0c55f 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -1,8 +1,11 @@ import fs from "node:fs/promises"; +import path from "node:path"; import { describe, expect, it, vi } from "vitest"; +import * as taskRegistry from "../../tasks/task-registry.js"; +import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js"; import type { CronJob } from "../types.js"; -import { start, stop } from "./ops.js"; +import { run, start, stop } from "./ops.js"; import { createCronServiceState } from "./state.js"; const { logger, makeStorePath } = setupCronServiceSuite({ @@ -27,6 +30,40 @@ function createInterruptedMainJob(now: number): CronJob { }; } +function createDueIsolatedJob(now: number): CronJob { + return { + id: "isolated-timeout", + name: "isolated timeout", + enabled: true, + createdAtMs: now - 60_000, + updatedAtMs: now - 60_000, + schedule: { kind: "every", everyMs: 60_000, anchorMs: now - 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "do work" }, + sessionKey: "agent:main:main", + state: { nextRunAtMs: now - 1 }, + }; +} + +function createMissedIsolatedJob(now: number): CronJob { + return { + id: "startup-timeout", + name: "startup timeout", + enabled: true, + createdAtMs: now - 86_400_000, + updatedAtMs: now - 30 * 60_000, + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "should timeout" }, + sessionKey: "agent:main:main", + state: { + nextRunAtMs: now - 60_000, + }, + }; +} + describe("cron service ops seam coverage", () => { it("start clears stale running markers, skips startup replay, persists, and arms the timer", async () => { const { storePath } = await makeStorePath(); @@ -77,4 +114,177 @@ describe("cron service ops seam coverage", () => { timeoutSpy.mockRestore(); stop(state); }); + + it("records timed out manual runs as timed_out in the shared task registry", async () => { + const { storePath } = await makeStorePath(); + const stateRoot = path.dirname(path.dirname(storePath)); + const now = Date.parse("2026-03-23T12:00:00.000Z"); + const originalStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateRoot; + resetTaskRegistryForTests(); + + await writeCronStoreSnapshot({ + storePath, + jobs: [createDueIsolatedJob(now)], + }); + + const state = createCronServiceState({ + storePath, + cronEnabled: true, + log: logger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => { + throw new Error("cron: job execution timed out"); + }), + }); + + await run(state, "isolated-timeout"); + + expect(findTaskByRunId(`cron:isolated-timeout:${now}`)).toMatchObject({ + runtime: "cron", + status: "timed_out", + sourceId: "isolated-timeout", + }); + + if (originalStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = originalStateDir; + } + resetTaskRegistryForTests(); + }); + + it("keeps manual cron runs progressing when task ledger creation fails", async () => { + const { storePath } = await makeStorePath(); + const now = Date.parse("2026-03-23T12:00:00.000Z"); + + await writeCronStoreSnapshot({ + storePath, + jobs: [createDueIsolatedJob(now)], + }); + + const createTaskRecordSpy = vi + .spyOn(taskRegistry, "createTaskRecord") + .mockImplementation(() => { + throw new Error("disk full"); + }); + + const state = createCronServiceState({ + storePath, + cronEnabled: true, + log: logger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "done" })), + }); + + await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { + jobs: CronJob[]; + }; + expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined(); + expect(persisted.jobs[0]?.state.lastStatus).toBe("ok"); + expect(logger.warn).toHaveBeenCalledWith( + expect.objectContaining({ jobId: "isolated-timeout" }), + "cron: failed to create task ledger record", + ); + + createTaskRecordSpy.mockRestore(); + }); + + it("keeps manual cron cleanup progressing when task ledger updates fail", async () => { + const { storePath } = await makeStorePath(); + const stateRoot = path.dirname(path.dirname(storePath)); + const now = Date.parse("2026-03-23T12:00:00.000Z"); + const originalStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateRoot; + resetTaskRegistryForTests(); + + await writeCronStoreSnapshot({ + storePath, + jobs: [createDueIsolatedJob(now)], + }); + + const updateTaskRecordSpy = vi + .spyOn(taskRegistry, "updateTaskRecordById") + .mockImplementation(() => { + throw new Error("disk full"); + }); + + const state = createCronServiceState({ + storePath, + cronEnabled: true, + log: logger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const, summary: "done" })), + }); + + await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { + jobs: CronJob[]; + }; + expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined(); + expect(persisted.jobs[0]?.state.lastStatus).toBe("ok"); + expect(logger.warn).toHaveBeenCalledWith( + expect.objectContaining({ jobStatus: "ok" }), + "cron: failed to update task ledger record", + ); + + updateTaskRecordSpy.mockRestore(); + if (originalStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = originalStateDir; + } + resetTaskRegistryForTests(); + }); + + it("records startup catch-up timeouts as timed_out in the shared task registry", async () => { + const { storePath } = await makeStorePath(); + const stateRoot = path.dirname(path.dirname(storePath)); + const now = Date.parse("2026-03-23T12:00:00.000Z"); + const originalStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateRoot; + resetTaskRegistryForTests(); + + await writeCronStoreSnapshot({ + storePath, + jobs: [createMissedIsolatedJob(now)], + }); + + const state = createCronServiceState({ + storePath, + cronEnabled: true, + log: logger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => { + throw new Error("cron: job execution timed out"); + }), + }); + + await start(state); + + expect(findTaskByRunId(`cron:startup-timeout:${now}`)).toMatchObject({ + runtime: "cron", + status: "timed_out", + sourceId: "startup-timeout", + }); + + if (originalStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = originalStateDir; + } + resetTaskRegistryForTests(); + stop(state); + }); }); diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index e683b57b1e7..47420bf9558 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -1,5 +1,6 @@ import { enqueueCommandInLane } from "../../process/command-queue.js"; import { CommandLane } from "../../process/lanes.js"; +import { createTaskRecord, updateTaskRecordById } from "../../tasks/task-registry.js"; import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js"; import { @@ -20,6 +21,7 @@ import { armTimer, emit, executeJobCoreWithTimeout, + normalizeCronRunErrorText, runMissedJobs, stopTimer, wake, @@ -358,6 +360,7 @@ type PreparedManualRun = ok: true; ran: true; jobId: string; + taskId?: string; startedAt: number; executionJob: CronJob; } @@ -379,6 +382,71 @@ type ManualRunPreflightResult = let nextManualRunId = 1; +function tryCreateManualTaskRecord(params: { + state: CronServiceState; + job: CronJob; + startedAt: number; +}): string | undefined { + try { + return createTaskRecord({ + runtime: "cron", + sourceId: params.job.id, + requesterSessionKey: "", + childSessionKey: params.job.sessionKey, + agentId: params.job.agentId, + runId: `cron:${params.job.id}:${params.startedAt}`, + label: params.job.name, + task: params.job.name || params.job.id, + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + startedAt: params.startedAt, + lastEventAt: params.startedAt, + }).taskId; + } catch (error) { + params.state.deps.log.warn( + { jobId: params.job.id, error }, + "cron: failed to create task ledger record", + ); + return undefined; + } +} + +function tryUpdateManualTaskRecord( + state: CronServiceState, + params: { + taskId?: string; + coreResult: Awaited>; + endedAt: number; + }, +): void { + if (!params.taskId) { + return; + } + try { + updateTaskRecordById(params.taskId, { + status: + params.coreResult.status === "ok" || params.coreResult.status === "skipped" + ? "succeeded" + : normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out" + ? "timed_out" + : "failed", + endedAt: params.endedAt, + lastEventAt: params.endedAt, + error: + params.coreResult.status === "error" + ? normalizeCronRunErrorText(params.coreResult.error) + : undefined, + terminalSummary: params.coreResult.summary ?? undefined, + }); + } catch (error) { + state.deps.log.warn( + { taskId: params.taskId, jobStatus: params.coreResult.status, error }, + "cron: failed to update task ledger record", + ); + } +} + async function inspectManualRunPreflight( state: CronServiceState, id: string, @@ -448,11 +516,17 @@ async function prepareManualRun( // force-reload from disk cannot start the same job concurrently. await persist(state); emit(state, { jobId: job.id, action: "started", runAtMs: preflight.now }); + const taskId = tryCreateManualTaskRecord({ + state, + job, + startedAt: preflight.now, + }); const executionJob = JSON.parse(JSON.stringify(job)) as CronJob; return { ok: true, ran: true, jobId: job.id, + taskId, startedAt: preflight.now, executionJob, } as const; @@ -467,14 +541,20 @@ async function finishPreparedManualRun( const executionJob = prepared.executionJob; const startedAt = prepared.startedAt; const jobId = prepared.jobId; + const taskId = prepared.taskId; let coreResult: Awaited>; try { coreResult = await executeJobCoreWithTimeout(state, executionJob); } catch (err) { - coreResult = { status: "error", error: String(err) }; + coreResult = { status: "error", error: normalizeCronRunErrorText(err) }; } const endedAt = state.deps.nowMs(); + tryUpdateManualTaskRecord(state, { + taskId, + coreResult, + endedAt, + }); await locked(state, async () => { await ensureLoaded(state, { skipRecompute: true }); diff --git a/src/cron/service/timer.test.ts b/src/cron/service/timer.test.ts index c5888aff5db..357a8422515 100644 --- a/src/cron/service/timer.test.ts +++ b/src/cron/service/timer.test.ts @@ -1,9 +1,11 @@ import fs from "node:fs/promises"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js"; import { createCronServiceState } from "../../cron/service/state.js"; import { onTimer } from "../../cron/service/timer.js"; import type { CronJob } from "../../cron/types.js"; +import * as taskRegistry from "../../tasks/task-registry.js"; +import { resetTaskRegistryForTests } from "../../tasks/task-registry.js"; const { logger, makeStorePath } = setupCronServiceSuite({ prefix: "cron-service-timer-seam", @@ -25,6 +27,10 @@ function createDueMainJob(params: { now: number; wakeMode: CronJob["wakeMode"] } }; } +afterEach(() => { + resetTaskRegistryForTests(); +}); + describe("cron service timer seam coverage", () => { it("persists the next schedule and hands off next-heartbeat main jobs", async () => { const { storePath } = await makeStorePath(); @@ -73,8 +79,50 @@ describe("cron service timer seam coverage", () => { const delays = timeoutSpy.mock.calls .map(([, delay]) => delay) .filter((delay): delay is number => typeof delay === "number"); - expect(delays).toContain(60_000); + expect(delays.some((delay) => delay > 0)).toBe(true); timeoutSpy.mockRestore(); }); + + it("keeps scheduler progress when task ledger creation fails", async () => { + const { storePath } = await makeStorePath(); + const now = Date.parse("2026-03-23T12:00:00.000Z"); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + await writeCronStoreSnapshot({ + storePath, + jobs: [createDueMainJob({ now, wakeMode: "next-heartbeat" })], + }); + + const createTaskRecordSpy = vi + .spyOn(taskRegistry, "createTaskRecord") + .mockImplementation(() => { + throw new Error("disk full"); + }); + + const state = createCronServiceState({ + storePath, + cronEnabled: true, + log: logger, + nowMs: () => now, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await onTimer(state); + + expect(logger.warn).toHaveBeenCalledWith( + expect.objectContaining({ jobId: "main-heartbeat-job" }), + "cron: failed to create task ledger record", + ); + expect(enqueueSystemEvent).toHaveBeenCalledWith("heartbeat seam tick", { + agentId: undefined, + sessionKey: "agent:main:main", + contextKey: "cron:main-heartbeat-job", + }); + + createTaskRecordSpy.mockRestore(); + }); }); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 604ec7fdb68..4b78dcd48d2 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -2,6 +2,7 @@ import { resolveFailoverReasonFromError } from "../../agents/failover-error.js"; import type { CronConfig, CronRetryOn } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; +import { createTaskRecord, updateTaskRecordById } from "../../tasks/task-registry.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { @@ -46,6 +47,7 @@ const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { jobId: string; + taskId?: string; delivered?: boolean; deliveryAttempted?: boolean; startedAt: number; @@ -107,6 +109,74 @@ function isAbortError(err: unknown): boolean { } return err.name === "AbortError" || err.message === timeoutErrorMessage(); } + +export function normalizeCronRunErrorText(err: unknown): string { + if (isAbortError(err)) { + return timeoutErrorMessage(); + } + if (typeof err === "string") { + return err === `Error: ${timeoutErrorMessage()}` ? timeoutErrorMessage() : err; + } + return String(err); +} + +function tryCreateCronTaskRecord(params: { + state: CronServiceState; + job: CronJob; + startedAt: number; +}): string | undefined { + try { + return createTaskRecord({ + runtime: "cron", + sourceId: params.job.id, + requesterSessionKey: "", + childSessionKey: params.job.sessionKey, + agentId: params.job.agentId, + runId: `cron:${params.job.id}:${params.startedAt}`, + label: params.job.name, + task: params.job.name || params.job.id, + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + startedAt: params.startedAt, + lastEventAt: params.startedAt, + }).taskId; + } catch (error) { + params.state.deps.log.warn( + { jobId: params.job.id, error }, + "cron: failed to create task ledger record", + ); + return undefined; + } +} + +function tryUpdateCronTaskRecord( + state: CronServiceState, + result: Pick, +): void { + if (!result.taskId) { + return; + } + try { + updateTaskRecordById(result.taskId, { + status: + result.status === "ok" || result.status === "skipped" + ? "succeeded" + : normalizeCronRunErrorText(result.error) === timeoutErrorMessage() + ? "timed_out" + : "failed", + endedAt: result.endedAt, + lastEventAt: result.endedAt, + error: result.status === "error" ? normalizeCronRunErrorText(result.error) : undefined, + terminalSummary: result.summary ?? undefined, + }); + } catch (error) { + state.deps.log.warn( + { taskId: result.taskId, jobStatus: result.status, error }, + "cron: failed to update task ledger record", + ); + } +} /** * Exponential backoff delays (in ms) indexed by consecutive error count. * After the last entry the delay stays constant. @@ -474,6 +544,7 @@ export function applyJobResult( } function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void { + tryUpdateCronTaskRecord(state, result); const store = state.store; if (!store) { return; @@ -630,18 +701,26 @@ export async function onTimer(state: CronServiceState) { job.state.runningAtMs = startedAt; emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); const jobTimeoutMs = resolveCronJobTimeoutMs(job); + const taskId = tryCreateCronTaskRecord({ state, job, startedAt }); try { const result = await executeJobCoreWithTimeout(state, job); - return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }; + return { + jobId: id, + taskId, + ...result, + startedAt, + endedAt: state.deps.nowMs(), + }; } catch (err) { - const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err); + const errorText = normalizeCronRunErrorText(err); state.deps.log.warn( { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null }, `cron: job failed: ${errorText}`, ); return { jobId: id, + taskId, status: "error", error: errorText, startedAt, @@ -926,11 +1005,17 @@ async function runStartupCatchupCandidate( candidate: StartupCatchupCandidate, ): Promise { const startedAt = state.deps.nowMs(); + const taskId = tryCreateCronTaskRecord({ + state, + job: candidate.job, + startedAt, + }); emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt }); try { const result = await executeJobCoreWithTimeout(state, candidate.job); return { jobId: candidate.jobId, + taskId, status: result.status, error: result.error, summary: result.summary, @@ -946,8 +1031,9 @@ async function runStartupCatchupCandidate( } catch (err) { return { jobId: candidate.jobId, + taskId, status: "error", - error: String(err), + error: normalizeCronRunErrorText(err), startedAt, endedAt: state.deps.nowMs(), }; diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index d7ab6d23956..7eb2fade2d3 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -846,7 +846,6 @@ describe("gateway agent handler", () => { ); expect(findTaskByRunId("task-registry-agent-run")).toMatchObject({ - source: "background_cli", runtime: "cli", childSessionKey: "agent:main:main", status: "running", diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 1693f754917..f69a5e81098 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -191,8 +191,8 @@ function dispatchAgentRunFromGateway(params: { if (params.ingressOpts.sessionKey?.trim()) { try { createTaskRecord({ - source: "background_cli", runtime: "cli", + sourceId: params.runId, requesterSessionKey: params.ingressOpts.sessionKey, requesterOrigin: normalizeDeliveryContext({ channel: params.ingressOpts.channel, @@ -202,7 +202,6 @@ function dispatchAgentRunFromGateway(params: { }), childSessionKey: params.ingressOpts.sessionKey, runId: params.runId, - bindingTargetKind: "session", task: params.ingressOpts.message, status: "running", deliveryStatus: "not_applicable", diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index 2685761e0f8..fc476fe42d8 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -33,7 +33,7 @@ function findSessionEntryByKey(store: Record, sessionKey: strin } function isActiveTask(task: TaskRecord): boolean { - return task.status === "accepted" || task.status === "running"; + return task.status === "queued" || task.status === "running"; } function isTerminalTask(task: TaskRecord): boolean { @@ -82,6 +82,9 @@ function shouldPruneTerminalTask(task: TaskRecord, now: number): boolean { if (!isTerminalTask(task)) { return false; } + if (typeof task.cleanupAfter === "number") { + return now >= task.cleanupAfter; + } const terminalAt = task.endedAt ?? task.lastEventAt ?? task.createdAt; return now - terminalAt >= TASK_RETENTION_MS; } diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index c87fa1abf5b..edb98a204c2 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -11,8 +11,8 @@ import type { TaskRecord } from "./task-registry.types.js"; function createStoredTask(): TaskRecord { return { taskId: "task-restored", - source: "sessions_spawn", runtime: "acp", + sourceId: "run-restored", requesterSessionKey: "agent:main:main", childSessionKey: "agent:codex:acp:restored", runId: "run-restored", @@ -48,7 +48,6 @@ describe("task-registry store runtime", () => { expect(loadSnapshot).toHaveBeenCalledTimes(1); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", childSessionKey: "agent:codex:acp:new", @@ -80,7 +79,6 @@ describe("task-registry store runtime", () => { expect(findTaskByRunId("run-restored")).toBeTruthy(); const created = createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", childSessionKey: "agent:codex:acp:new", diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 46b4e60db2c..62ac081a84c 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -107,7 +107,6 @@ describe("task-registry", () => { resetTaskRegistryForTests(); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", childSessionKey: "agent:main:acp:child", @@ -136,7 +135,7 @@ describe("task-registry", () => { expect(findTaskByRunId("run-1")).toMatchObject({ runtime: "acp", - status: "done", + status: "succeeded", endedAt: 250, }); }); @@ -153,7 +152,6 @@ describe("task-registry", () => { }); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -180,7 +178,7 @@ describe("task-registry", () => { await waitForAssertion(() => expect(findTaskByRunId("run-delivery")).toMatchObject({ - status: "done", + status: "succeeded", deliveryStatus: "delivered", }), ); @@ -208,7 +206,6 @@ describe("task-registry", () => { hoisted.sendMessageMock.mockRejectedValueOnce(new Error("telegram unavailable")); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -255,7 +252,6 @@ describe("task-registry", () => { hoisted.sendMessageMock.mockRejectedValueOnce(new Error("telegram unavailable")); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -265,7 +261,7 @@ describe("task-registry", () => { childSessionKey: "agent:main:acp:child", runId: "run-delivery-blocked", task: "Port the repo changes", - status: "done", + status: "succeeded", deliveryStatus: "pending", terminalOutcome: "blocked", terminalSummary: "Writable session or apply_patch authorization required.", @@ -273,7 +269,7 @@ describe("task-registry", () => { await waitForAssertion(() => expect(findTaskByRunId("run-delivery-blocked")).toMatchObject({ - status: "done", + status: "succeeded", deliveryStatus: "failed", terminalOutcome: "blocked", }), @@ -292,7 +288,6 @@ describe("task-registry", () => { resetTaskRegistryForTests(); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", childSessionKey: "agent:main:acp:child", @@ -314,7 +309,7 @@ describe("task-registry", () => { await waitForAssertion(() => expect(findTaskByRunId("run-session-queued")).toMatchObject({ - status: "done", + status: "succeeded", deliveryStatus: "session_queued", }), ); @@ -331,13 +326,12 @@ describe("task-registry", () => { resetTaskRegistryForTests(); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", childSessionKey: "agent:main:acp:child", runId: "run-session-blocked", task: "Port the repo changes", - status: "done", + status: "succeeded", deliveryStatus: "pending", terminalOutcome: "blocked", terminalSummary: "Writable session or apply_patch authorization required.", @@ -345,7 +339,7 @@ describe("task-registry", () => { await waitForAssertion(() => expect(findTaskByRunId("run-session-blocked")).toMatchObject({ - status: "done", + status: "succeeded", deliveryStatus: "session_queued", }), ); @@ -369,7 +363,6 @@ describe("task-registry", () => { }); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -420,7 +413,6 @@ describe("task-registry", () => { }); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -430,7 +422,7 @@ describe("task-registry", () => { childSessionKey: "agent:main:acp:child", runId: "run-blocked-outcome", task: "Port the repo changes", - status: "done", + status: "succeeded", deliveryStatus: "pending", terminalOutcome: "blocked", terminalSummary: "Writable session or apply_patch authorization required.", @@ -462,7 +454,6 @@ describe("task-registry", () => { }); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -472,7 +463,7 @@ describe("task-registry", () => { childSessionKey: "agent:main:acp:child", runId: "run-succeeded-outcome", task: "Create the file and verify it", - status: "done", + status: "succeeded", deliveryStatus: "pending", terminalSummary: "Created /tmp/file.txt and verified contents.", terminalOutcome: "succeeded", @@ -497,7 +488,6 @@ describe("task-registry", () => { resetTaskRegistryForTests(); createTaskRecord({ - source: "background_cli", runtime: "cli", requesterSessionKey: "agent:codex:acp:child", childSessionKey: "agent:codex:acp:child", @@ -508,7 +498,6 @@ describe("task-registry", () => { }); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", childSessionKey: "agent:codex:acp:child", @@ -520,7 +509,6 @@ describe("task-registry", () => { expect(listTaskRecords().filter((task) => task.runId === "run-shared")).toHaveLength(2); expect(findTaskByRunId("run-shared")).toMatchObject({ - source: "sessions_spawn", runtime: "acp", task: "Spawn ACP child", }); @@ -538,7 +526,6 @@ describe("task-registry", () => { }); const directTask = createTaskRecord({ - source: "unknown", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -548,11 +535,10 @@ describe("task-registry", () => { childSessionKey: "agent:main:acp:child", runId: "run-shared-delivery", task: "Direct ACP child", - status: "done", + status: "succeeded", deliveryStatus: "pending", }); const spawnedTask = createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -562,7 +548,7 @@ describe("task-registry", () => { childSessionKey: "agent:main:acp:child", runId: "run-shared-delivery", task: "Spawn ACP child", - status: "done", + status: "succeeded", deliveryStatus: "pending", }); @@ -575,7 +561,6 @@ describe("task-registry", () => { ); expect(findTaskByRunId("run-shared-delivery")).toMatchObject({ taskId: directTask.taskId, - source: "sessions_spawn", deliveryStatus: "delivered", }); }); @@ -587,7 +572,6 @@ describe("task-registry", () => { resetTaskRegistryForTests(); const spawnedTask = createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -599,11 +583,9 @@ describe("task-registry", () => { task: "Spawn ACP child", status: "running", deliveryStatus: "pending", - streamLogPath: "/tmp/stream.jsonl", }); const directTask = createTaskRecord({ - source: "unknown", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -619,9 +601,7 @@ describe("task-registry", () => { expect(directTask.taskId).toBe(spawnedTask.taskId); expect(listTaskRecords().filter((task) => task.runId === "run-collapse")).toHaveLength(1); expect(findTaskByRunId("run-collapse")).toMatchObject({ - source: "sessions_spawn", task: "Spawn ACP child", - streamLogPath: "/tmp/stream.jsonl", }); }); }); @@ -637,7 +617,6 @@ describe("task-registry", () => { }); const task = createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -647,7 +626,7 @@ describe("task-registry", () => { childSessionKey: "agent:main:acp:child", runId: "run-racing-delivery", task: "Investigate issue", - status: "done", + status: "succeeded", deliveryStatus: "pending", terminalOutcome: "blocked", terminalSummary: "Writable session or apply_patch authorization required.", @@ -660,9 +639,9 @@ describe("task-registry", () => { expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1); expect(hoisted.sendMessageMock).toHaveBeenCalledWith( expect.objectContaining({ - idempotencyKey: `task-terminal:${task.taskId}:done:blocked`, + idempotencyKey: `task-terminal:${task.taskId}:succeeded:blocked`, mirror: expect.objectContaining({ - idempotencyKey: `task-terminal:${task.taskId}:done:blocked`, + idempotencyKey: `task-terminal:${task.taskId}:succeeded:blocked`, }), }), ); @@ -678,7 +657,6 @@ describe("task-registry", () => { resetTaskRegistryForTests(); const task = createTaskRecord({ - source: "sessions_spawn", runtime: "subagent", requesterSessionKey: "agent:main:main", childSessionKey: "agent:main:subagent:child", @@ -706,7 +684,6 @@ describe("task-registry", () => { resetTaskRegistryForTests(); const task = createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", childSessionKey: "agent:main:acp:missing", @@ -738,13 +715,12 @@ describe("task-registry", () => { resetTaskRegistryForTests(); const task = createTaskRecord({ - source: "background_cli", runtime: "cli", requesterSessionKey: "agent:main:main", childSessionKey: "agent:main:main", runId: "run-prune", task: "Old completed task", - status: "done", + status: "succeeded", deliveryStatus: "not_applicable", startedAt: Date.now() - 9 * 24 * 60 * 60_000, }); @@ -772,7 +748,6 @@ describe("task-registry", () => { }); const task = createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -782,7 +757,7 @@ describe("task-registry", () => { childSessionKey: "agent:codex:acp:child", runId: "run-state-change", task: "Investigate issue", - status: "accepted", + status: "queued", notifyPolicy: "done_only", }); @@ -838,7 +813,6 @@ describe("task-registry", () => { vi.useFakeTimers(); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -911,7 +885,6 @@ describe("task-registry", () => { }); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -963,7 +936,6 @@ describe("task-registry", () => { vi.useFakeTimers(); createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -1021,7 +993,6 @@ describe("task-registry", () => { hoisted.cancelSessionMock.mockResolvedValue(undefined); const task = registry.createTaskRecord({ - source: "sessions_spawn", runtime: "acp", requesterSessionKey: "agent:main:main", requesterOrigin: { @@ -1079,7 +1050,6 @@ describe("task-registry", () => { }); const task = registry.createTaskRecord({ - source: "sessions_spawn", runtime: "subagent", requesterSessionKey: "agent:main:main", requesterOrigin: { diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 2e8f1f91e2e..767550fec1c 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -16,7 +16,6 @@ import { type TaskRegistryHookEvent, } from "./task-registry.store.js"; import type { - TaskBindingTargetKind, TaskDeliveryStatus, TaskEventKind, TaskEventRecord, @@ -24,12 +23,12 @@ import type { TaskRecord, TaskRegistrySnapshot, TaskRuntime, - TaskSource, TaskStatus, TaskTerminalOutcome, } from "./task-registry.types.js"; const log = createSubsystemLogger("tasks/registry"); +const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; const tasks = new Map(); const taskIdsByRunId = new Map>(); @@ -94,12 +93,35 @@ function normalizeTaskSummary(value: string | null | undefined): string | undefi return normalized || undefined; } +function normalizeTaskStatus(value: TaskStatus | null | undefined): TaskStatus { + return value === "running" || + value === "queued" || + value === "succeeded" || + value === "failed" || + value === "timed_out" || + value === "cancelled" || + value === "lost" + ? value + : "queued"; +} + function normalizeTaskTerminalOutcome( value: TaskTerminalOutcome | null | undefined, ): TaskTerminalOutcome | undefined { return value === "succeeded" || value === "blocked" ? value : undefined; } +function resolveTaskTerminalOutcome(params: { + status: TaskStatus; + terminalOutcome?: TaskTerminalOutcome | null; +}): TaskTerminalOutcome | undefined { + const normalized = normalizeTaskTerminalOutcome(params.terminalOutcome); + if (normalized) { + return normalized; + } + return params.status === "succeeded" ? "succeeded" : undefined; +} + const TASK_RECENT_EVENT_LIMIT = 12; function appendTaskEvent( @@ -157,10 +179,8 @@ function getTasksByRunId(runId: string): TaskRecord[] { } function taskLookupPriority(task: TaskRecord): number { - const sourcePriority = - task.source === "sessions_spawn" ? 0 : task.source === "background_cli" ? 1 : 2; const runtimePriority = task.runtime === "cli" ? 1 : 0; - return sourcePriority * 10 + runtimePriority; + return runtimePriority; } function pickPreferredRunIdTask(matches: TaskRecord[]): TaskRecord | undefined { @@ -178,12 +198,10 @@ function normalizeComparableText(value: string | undefined): string { } function findExistingTaskForCreate(params: { - source: TaskSource; runtime: TaskRuntime; requesterSessionKey: string; childSessionKey?: string; runId?: string; - bindingTargetKind?: TaskBindingTargetKind; label?: string; task: string; }): TaskRecord | undefined { @@ -191,14 +209,11 @@ function findExistingTaskForCreate(params: { const exact = runId ? getTasksByRunId(runId).find( (task) => - task.source === params.source && task.runtime === params.runtime && normalizeComparableText(task.requesterSessionKey) === normalizeComparableText(params.requesterSessionKey) && normalizeComparableText(task.childSessionKey) === normalizeComparableText(params.childSessionKey) && - normalizeComparableText(task.bindingTargetKind) === - normalizeComparableText(params.bindingTargetKind) && normalizeComparableText(task.label) === normalizeComparableText(params.label) && normalizeComparableText(task.task) === normalizeComparableText(params.task), ) @@ -223,43 +238,36 @@ function findExistingTaskForCreate(params: { return pickPreferredRunIdTask(siblingMatches); } -function sourceUpgradePriority(source: TaskSource): number { - return source === "sessions_spawn" ? 0 : source === "background_cli" ? 1 : 2; -} - function mergeExistingTaskForCreate( existing: TaskRecord, params: { - source: TaskSource; requesterOrigin?: TaskRecord["requesterOrigin"]; - bindingTargetKind?: TaskBindingTargetKind; + sourceId?: string; + parentTaskId?: string; + agentId?: string; label?: string; task: string; deliveryStatus?: TaskDeliveryStatus; notifyPolicy?: TaskNotifyPolicy; - streamLogPath?: string; }, ): TaskRecord { const patch: Partial = {}; - if (sourceUpgradePriority(params.source) < sourceUpgradePriority(existing.source)) { - patch.source = params.source; - } const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); if (requesterOrigin && !existing.requesterOrigin) { patch.requesterOrigin = requesterOrigin; } - if (params.bindingTargetKind && !existing.bindingTargetKind) { - patch.bindingTargetKind = params.bindingTargetKind; + if (params.sourceId?.trim() && !existing.sourceId?.trim()) { + patch.sourceId = params.sourceId.trim(); + } + if (params.parentTaskId?.trim() && !existing.parentTaskId?.trim()) { + patch.parentTaskId = params.parentTaskId.trim(); + } + if (params.agentId?.trim() && !existing.agentId?.trim()) { + patch.agentId = params.agentId.trim(); } if (params.label?.trim() && !existing.label?.trim()) { patch.label = params.label.trim(); } - if (params.streamLogPath?.trim() && !existing.streamLogPath?.trim()) { - patch.streamLogPath = params.streamLogPath.trim(); - } - if (params.source === "sessions_spawn" && existing.source !== "sessions_spawn") { - patch.task = params.task; - } if (params.deliveryStatus === "pending" && existing.deliveryStatus !== "delivered") { patch.deliveryStatus = "pending"; } @@ -278,7 +286,7 @@ function mergeExistingTaskForCreate( } function taskTerminalDeliveryIdempotencyKey(task: TaskRecord): string { - const outcome = task.status === "done" ? (task.terminalOutcome ?? "default") : "default"; + const outcome = task.status === "succeeded" ? (task.terminalOutcome ?? "default") : "default"; return `task-terminal:${task.taskId}:${task.status}:${outcome}`; } @@ -310,12 +318,26 @@ export function ensureTaskRegistryReady() { ensureListener(); } +function isTerminalTaskStatus(status: TaskStatus): boolean { + return ( + status === "succeeded" || + status === "failed" || + status === "timed_out" || + status === "cancelled" || + status === "lost" + ); +} + function updateTask(taskId: string, patch: Partial): TaskRecord | null { const current = tasks.get(taskId); if (!current) { return null; } const next = { ...current, ...patch }; + if (isTerminalTaskStatus(next.status) && typeof next.cleanupAfter !== "number") { + const terminalAt = next.endedAt ?? next.lastEventAt ?? Date.now(); + next.cleanupAfter = terminalAt + DEFAULT_TASK_RETENTION_MS; + } tasks.set(taskId, next); if (patch.runId && patch.runId !== current.runId) { rebuildRunIdIndex(); @@ -341,7 +363,7 @@ function formatTaskTerminalEvent(task: TaskRecord): string { : task.task.trim() || "Background task"); const runLabel = task.runId ? ` (run ${task.runId.slice(0, 8)})` : ""; const summary = task.terminalSummary?.trim(); - if (task.status === "done") { + if (task.status === "succeeded") { if (task.terminalOutcome === "blocked") { return summary ? `Background task blocked: ${title}${runLabel}. ${summary}` @@ -391,7 +413,7 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) { } function queueBlockedTaskFollowup(task: TaskRecord) { - if (task.status !== "done" || task.terminalOutcome !== "blocked") { + if (task.status !== "succeeded" || task.terminalOutcome !== "blocked") { return false; } const requesterSessionKey = task.requesterSessionKey.trim(); @@ -444,7 +466,7 @@ function shouldAutoDeliverTaskUpdate(task: TaskRecord): boolean { return false; } if ( - task.status !== "done" && + task.status !== "succeeded" && task.status !== "failed" && task.status !== "timed_out" && task.status !== "lost" && @@ -459,7 +481,7 @@ function shouldAutoDeliverTaskStateChange(task: TaskRecord): boolean { return ( task.notifyPolicy === "state_changes" && task.deliveryStatus === "pending" && - task.status !== "done" && + task.status !== "succeeded" && task.status !== "failed" && task.status !== "timed_out" && task.status !== "lost" && @@ -688,7 +710,7 @@ function ensureListener() { if (phase === "start") { patch.status = "running"; } else if (phase === "end") { - patch.status = evt.data?.aborted === true ? "timed_out" : "done"; + patch.status = evt.data?.aborted === true ? "timed_out" : "succeeded"; patch.endedAt = endedAt ?? now; } else if (phase === "error") { patch.status = "failed"; @@ -705,7 +727,7 @@ function ensureListener() { summary: patch.status === "failed" ? (patch.error ?? current.error) - : patch.status === "done" + : patch.status === "succeeded" ? current.terminalSummary : undefined, }); @@ -720,13 +742,14 @@ function ensureListener() { } export function createTaskRecord(params: { - source: TaskSource; runtime: TaskRuntime; + sourceId?: string; requesterSessionKey: string; requesterOrigin?: TaskRecord["requesterOrigin"]; childSessionKey?: string; + parentTaskId?: string; + agentId?: string; runId?: string; - bindingTargetKind?: TaskBindingTargetKind; label?: string; task: string; status?: TaskStatus; @@ -734,14 +757,10 @@ export function createTaskRecord(params: { notifyPolicy?: TaskNotifyPolicy; startedAt?: number; lastEventAt?: number; + cleanupAfter?: number; progressSummary?: string | null; terminalSummary?: string | null; terminalOutcome?: TaskTerminalOutcome | null; - transcriptPath?: string; - streamLogPath?: string; - backend?: string; - agentSessionId?: string; - backendSessionId?: string; }): TaskRecord { ensureTaskRegistryReady(); const existing = findExistingTaskForCreate(params); @@ -750,7 +769,7 @@ export function createTaskRecord(params: { } const now = Date.now(); const taskId = crypto.randomUUID(); - const status = params.status ?? "accepted"; + const status = normalizeTaskStatus(params.status); const deliveryStatus = params.deliveryStatus ?? ensureDeliveryStatus(params.requesterSessionKey); const notifyPolicy = ensureNotifyPolicy({ notifyPolicy: params.notifyPolicy, @@ -760,13 +779,14 @@ export function createTaskRecord(params: { const lastEventAt = params.lastEventAt ?? params.startedAt ?? now; const record: TaskRecord = { taskId, - source: params.source, runtime: params.runtime, + sourceId: params.sourceId?.trim() || undefined, requesterSessionKey: params.requesterSessionKey, requesterOrigin: normalizeDeliveryContext(params.requesterOrigin), childSessionKey: params.childSessionKey, + parentTaskId: params.parentTaskId?.trim() || undefined, + agentId: params.agentId?.trim() || undefined, runId: params.runId?.trim() || undefined, - bindingTargetKind: params.bindingTargetKind, label: params.label?.trim() || undefined, task: params.task, status, @@ -775,13 +795,16 @@ export function createTaskRecord(params: { createdAt: now, startedAt: params.startedAt, lastEventAt, + cleanupAfter: params.cleanupAfter, progressSummary: normalizeTaskSummary(params.progressSummary), terminalSummary: normalizeTaskSummary(params.terminalSummary), - terminalOutcome: normalizeTaskTerminalOutcome(params.terminalOutcome), + terminalOutcome: resolveTaskTerminalOutcome({ + status, + terminalOutcome: params.terminalOutcome, + }), recentEvents: appendTaskEvent( { taskId, - source: params.source, runtime: params.runtime, requesterSessionKey: params.requesterSessionKey, task: params.task, @@ -795,12 +818,11 @@ export function createTaskRecord(params: { kind: status, }, ), - transcriptPath: params.transcriptPath, - streamLogPath: params.streamLogPath, - backend: params.backend, - agentSessionId: params.agentSessionId, - backendSessionId: params.backendSessionId, }; + if (isTerminalTaskStatus(record.status) && typeof record.cleanupAfter !== "number") { + record.cleanupAfter = + (record.endedAt ?? record.lastEventAt ?? record.createdAt) + DEFAULT_TASK_RETENTION_MS; + } tasks.set(taskId, record); addRunIdIndex(taskId, record.runId); persistTaskRegistry(); @@ -808,6 +830,9 @@ export function createTaskRecord(params: { kind: "upserted", task: cloneTaskRecord(record), })); + if (isTerminalTaskStatus(record.status)) { + void maybeDeliverTaskTerminalUpdate(taskId); + } return cloneTaskRecord(record); } @@ -835,10 +860,10 @@ export function updateTaskStateByRunId(params: { continue; } const patch: Partial = {}; - const nextStatus = params.status ?? current.status; + const nextStatus = params.status ? normalizeTaskStatus(params.status) : current.status; const eventAt = params.lastEventAt ?? params.endedAt ?? Date.now(); if (params.status) { - patch.status = params.status; + patch.status = normalizeTaskStatus(params.status); } if (params.startedAt != null) { patch.startedAt = params.startedAt; @@ -859,13 +884,16 @@ export function updateTaskStateByRunId(params: { patch.terminalSummary = normalizeTaskSummary(params.terminalSummary); } if (params.terminalOutcome !== undefined) { - patch.terminalOutcome = normalizeTaskTerminalOutcome(params.terminalOutcome); + patch.terminalOutcome = resolveTaskTerminalOutcome({ + status: nextStatus, + terminalOutcome: params.terminalOutcome, + }); } const eventSummary = normalizeTaskSummary(params.eventSummary) ?? (nextStatus === "failed" ? normalizeTaskSummary(params.error ?? current.error) - : nextStatus === "done" + : nextStatus === "succeeded" ? normalizeTaskSummary(params.terminalSummary ?? current.terminalSummary) : undefined); const shouldAppendEvent = @@ -874,7 +902,10 @@ export function updateTaskStateByRunId(params: { if (shouldAppendEvent) { patch.recentEvents = appendTaskEvent(current, { at: eventAt, - kind: params.status && params.status !== current.status ? params.status : "progress", + kind: + params.status && normalizeTaskStatus(params.status) !== current.status + ? normalizeTaskStatus(params.status) + : "progress", summary: eventSummary, }); } @@ -921,7 +952,7 @@ export async function cancelTaskById(params: { return { found: false, cancelled: false, reason: "Task not found." }; } if ( - task.status === "done" || + task.status === "succeeded" || task.status === "failed" || task.status === "timed_out" || task.status === "lost" || diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts index e659728312e..b2b46ef97d8 100644 --- a/src/tasks/task-registry.types.ts +++ b/src/tasks/task-registry.types.ts @@ -1,11 +1,11 @@ import type { DeliveryContext } from "../utils/delivery-context.js"; -export type TaskRuntime = "subagent" | "acp" | "cli"; +export type TaskRuntime = "subagent" | "acp" | "cli" | "cron"; export type TaskStatus = - | "accepted" + | "queued" | "running" - | "done" + | "succeeded" | "failed" | "timed_out" | "cancelled" @@ -23,10 +23,6 @@ export type TaskNotifyPolicy = "done_only" | "state_changes" | "silent"; export type TaskTerminalOutcome = "succeeded" | "blocked"; -export type TaskBindingTargetKind = "subagent" | "session"; - -export type TaskSource = "sessions_spawn" | "background_cli" | "unknown"; - export type TaskEventKind = TaskStatus | "progress"; export type TaskEventRecord = { @@ -37,13 +33,14 @@ export type TaskEventRecord = { export type TaskRecord = { taskId: string; - source: TaskSource; runtime: TaskRuntime; + sourceId?: string; requesterSessionKey: string; requesterOrigin?: DeliveryContext; childSessionKey?: string; + parentTaskId?: string; + agentId?: string; runId?: string; - bindingTargetKind?: TaskBindingTargetKind; label?: string; task: string; status: TaskStatus; @@ -53,17 +50,13 @@ export type TaskRecord = { startedAt?: number; endedAt?: number; lastEventAt?: number; + cleanupAfter?: number; error?: string; progressSummary?: string; terminalSummary?: string; terminalOutcome?: TaskTerminalOutcome; recentEvents?: TaskEventRecord[]; lastNotifiedEventAt?: number; - transcriptPath?: string; - streamLogPath?: string; - backend?: string; - agentSessionId?: string; - backendSessionId?: string; }; export type TaskRegistrySnapshot = {