From fa5827079f7236eaf2715f76c4d169784b2881bc Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 30 Mar 2026 13:03:05 +0900 Subject: [PATCH] refactor(tasks): split delivery state from task runs --- src/acp/control-plane/manager.core.ts | 56 +++- src/agents/acp-spawn-parent-stream.ts | 8 +- src/agents/subagent-registry-lifecycle.ts | 54 ++-- src/commands/tasks.test.ts | 8 - src/commands/tasks.ts | 8 - src/tasks/task-registry.store.sqlite.ts | 114 ++++++--- src/tasks/task-registry.store.test.ts | 18 +- src/tasks/task-registry.store.ts | 25 +- src/tasks/task-registry.test.ts | 27 +- src/tasks/task-registry.ts | 298 +++++++++++++++------- src/tasks/task-registry.types.ts | 10 +- 11 files changed, 419 insertions(+), 207 deletions(-) diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index 18c532919b3..a1f8173077e 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -3,7 +3,11 @@ import type { OpenClawConfig } from "../../config/config.js"; import { logVerbose } from "../../globals.js"; import { normalizeAgentId } from "../../routing/session-key.js"; import { isAcpSessionKey } from "../../sessions/session-key-utils.js"; -import { createTaskRecord, updateTaskStateByRunId } from "../../tasks/task-registry.js"; +import { + createTaskRecord, + markTaskRunningByRunId, + markTaskTerminalByRunId, +} from "../../tasks/task-registry.js"; import type { DeliveryContext } from "../../utils/delivery-context.js"; import { AcpRuntimeError, @@ -144,8 +148,6 @@ type BackgroundTaskContext = { task: string; }; -type BackgroundTaskStatePatch = Omit[0], "runId">; - export class AcpSessionManager { private readonly actorQueue = new SessionActorQueue(); private readonly actorTailBySession = this.actorQueue.getTailMapForTesting(); @@ -786,8 +788,7 @@ export class AcpSessionManager { ); } if (taskContext) { - this.updateBackgroundTaskState(taskContext.runId, { - status: "running", + this.markBackgroundTaskRunning(taskContext.runId, { lastEventAt: Date.now(), progressSummary: taskProgressSummary || null, }); @@ -832,7 +833,7 @@ export class AcpSessionManager { }); if (taskContext) { const terminalResult = resolveBackgroundTaskTerminalResult(taskProgressSummary); - this.updateBackgroundTaskState(taskContext.runId, { + this.markBackgroundTaskTerminal(taskContext.runId, { status: "succeeded", endedAt: Date.now(), lastEventAt: Date.now(), @@ -871,7 +872,7 @@ export class AcpSessionManager { errorCode: acpError.code, }); if (taskContext) { - this.updateBackgroundTaskState(taskContext.runId, { + this.markBackgroundTaskTerminal(taskContext.runId, { status: resolveBackgroundTaskFailureStatus(acpError), endedAt: Date.now(), lastEventAt: Date.now(), @@ -1898,11 +1899,46 @@ export class AcpSessionManager { } } - private updateBackgroundTaskState(runId: string, patch: BackgroundTaskStatePatch): void { + private markBackgroundTaskRunning( + runId: string, + params: { + lastEventAt?: number; + progressSummary?: string | null; + }, + ): void { try { - updateTaskStateByRunId({ - ...patch, + markTaskRunningByRunId({ runId, + lastEventAt: params.lastEventAt, + progressSummary: params.progressSummary, + }); + } catch (error) { + logVerbose(`acp-manager: failed updating background task for ${runId}: ${String(error)}`); + } + } + + private markBackgroundTaskTerminal( + runId: string, + params: { + status: "succeeded" | "failed" | "timed_out"; + endedAt: number; + lastEventAt?: number; + error?: string; + progressSummary?: string | null; + terminalSummary?: string | null; + terminalOutcome?: "succeeded" | "blocked" | null; + }, + ): void { + try { + markTaskTerminalByRunId({ + runId, + status: params.status, + endedAt: params.endedAt, + lastEventAt: params.lastEventAt, + error: params.error, + progressSummary: params.progressSummary, + terminalSummary: params.terminalSummary, + terminalOutcome: params.terminalOutcome, }); } catch (error) { logVerbose(`acp-manager: failed updating background task for ${runId}: ${String(error)}`); diff --git a/src/agents/acp-spawn-parent-stream.ts b/src/agents/acp-spawn-parent-stream.ts index ee4deeb1f28..55943c163cc 100644 --- a/src/agents/acp-spawn-parent-stream.ts +++ b/src/agents/acp-spawn-parent-stream.ts @@ -6,7 +6,7 @@ import { onAgentEvent } from "../infra/agent-events.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { scopedHeartbeatWakeOptions } from "../routing/session-key.js"; -import { updateTaskStateByRunId } from "../tasks/task-registry.js"; +import { recordTaskProgressByRunId } from "../tasks/task-registry.js"; const DEFAULT_STREAM_FLUSH_MS = 2_500; const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000; @@ -204,7 +204,7 @@ export function startAcpSpawnParentStreamRelay(params: { wake(); }; const emitStartNotice = () => { - updateTaskStateByRunId({ + recordTaskProgressByRunId({ runId, lastEventAt: Date.now(), eventSummary: "Started.", @@ -271,7 +271,7 @@ export function startAcpSpawnParentStreamRelay(params: { return; } stallNotified = true; - updateTaskStateByRunId({ + recordTaskProgressByRunId({ runId, lastEventAt: Date.now(), eventSummary: `No output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for input.`, @@ -317,7 +317,7 @@ export function startAcpSpawnParentStreamRelay(params: { if (stallNotified) { stallNotified = false; - updateTaskStateByRunId({ + recordTaskProgressByRunId({ runId, lastEventAt: Date.now(), eventSummary: "Resumed output.", diff --git a/src/agents/subagent-registry-lifecycle.ts b/src/agents/subagent-registry-lifecycle.ts index 437fde76c83..6080a3cb5ca 100644 --- a/src/agents/subagent-registry-lifecycle.ts +++ b/src/agents/subagent-registry-lifecycle.ts @@ -1,7 +1,10 @@ import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import { defaultRuntime } from "../runtime.js"; import { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; -import { updateTaskDeliveryByRunId, updateTaskStateByRunId } from "../tasks/task-registry.js"; +import { + markTaskTerminalByRunId, + setTaskRunDeliveryStatusByRunId, +} from "../tasks/task-registry.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; import { captureSubagentCompletionReply, @@ -154,7 +157,7 @@ export function createSubagentRegistryLifecycleController(params: { entry: SubagentRunRecord; reason: "retry-limit" | "expiry"; }) => { - updateTaskDeliveryByRunId({ + setTaskRunDeliveryStatusByRunId({ runId: giveUpParams.runId, deliveryStatus: "failed", }); @@ -270,7 +273,7 @@ export function createSubagentRegistryLifecycleController(params: { return; } if (didAnnounce) { - updateTaskDeliveryByRunId({ + setTaskRunDeliveryStatusByRunId({ runId, deliveryStatus: "delivered", }); @@ -326,7 +329,7 @@ export function createSubagentRegistryLifecycleController(params: { } if (deferredDecision.kind === "give-up") { - updateTaskDeliveryByRunId({ + setTaskRunDeliveryStatusByRunId({ runId, deliveryStatus: "failed", }); @@ -377,26 +380,27 @@ export function createSubagentRegistryLifecycleController(params: { }); }; - void params.runSubagentAnnounceFlow({ - childSessionKey: entry.childSessionKey, - childRunId: entry.runId, - requesterSessionKey: entry.requesterSessionKey, - requesterOrigin, - requesterDisplayKey: entry.requesterDisplayKey, - task: entry.task, - timeoutMs: params.subagentAnnounceTimeoutMs, - cleanup: entry.cleanup, - roundOneReply: entry.frozenResultText ?? undefined, - fallbackReply: entry.fallbackFrozenResultText ?? undefined, - waitForCompletion: false, - startedAt: entry.startedAt, - endedAt: entry.endedAt, - label: entry.label, - outcome: entry.outcome, - spawnMode: entry.spawnMode, - expectsCompletionMessage: entry.expectsCompletionMessage, - wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true, - }) + void params + .runSubagentAnnounceFlow({ + childSessionKey: entry.childSessionKey, + childRunId: entry.runId, + requesterSessionKey: entry.requesterSessionKey, + requesterOrigin, + requesterDisplayKey: entry.requesterDisplayKey, + task: entry.task, + timeoutMs: params.subagentAnnounceTimeoutMs, + cleanup: entry.cleanup, + roundOneReply: entry.frozenResultText ?? undefined, + fallbackReply: entry.fallbackFrozenResultText ?? undefined, + waitForCompletion: false, + startedAt: entry.startedAt, + endedAt: entry.endedAt, + label: entry.label, + outcome: entry.outcome, + spawnMode: entry.spawnMode, + expectsCompletionMessage: entry.expectsCompletionMessage, + wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true, + }) .then((didAnnounce) => { finalizeAnnounceCleanup(didAnnounce); }) @@ -458,7 +462,7 @@ export function createSubagentRegistryLifecycleController(params: { if (mutated) { params.persist(); } - updateTaskStateByRunId({ + markTaskTerminalByRunId({ runId: entry.runId, status: completeParams.outcome.status === "ok" diff --git a/src/commands/tasks.test.ts b/src/commands/tasks.test.ts index 0f154008e59..4020787ab43 100644 --- a/src/commands/tasks.test.ts +++ b/src/commands/tasks.test.ts @@ -72,13 +72,6 @@ const taskFixture = { createdAt: Date.parse("2026-03-29T10:00:00.000Z"), lastEventAt: Date.parse("2026-03-29T10:00:10.000Z"), progressSummary: "No output for 60s. It may be waiting for input.", - recentEvents: [ - { - at: Date.parse("2026-03-29T10:00:10.000Z"), - kind: "progress", - summary: "No output for 60s. It may be waiting for input.", - }, - ], } as const; beforeAll(async () => { @@ -180,7 +173,6 @@ describe("tasks commands", () => { expect(runtimeLogs.join("\n")).toContain( "progressSummary: No output for 60s. It may be waiting for input.", ); - expect(runtimeLogs.join("\n")).toContain("recentEvent[0]: 2026-03-29T10:00:10.000Z progress"); }); it("updates notify policy for an existing task", async () => { diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index 21d4634e90b..8d263b5e037 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -246,14 +246,6 @@ export async function tasksShowCommand( ...(task.error ? [`error: ${task.error}`] : []), ...(task.progressSummary ? [`progressSummary: ${task.progressSummary}`] : []), ...(task.terminalSummary ? [`terminalSummary: ${task.terminalSummary}`] : []), - ...(task.recentEvents?.length - ? task.recentEvents.map( - (event, index) => - `recentEvent[${index}]: ${new Date(event.at).toISOString()} ${event.kind}${ - event.summary ? ` ${event.summary}` : "" - }`, - ) - : []), ]; for (const line of lines) { runtime.log(line); diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index 42962655225..0def1fc78a8 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -3,14 +3,14 @@ import type { DatabaseSync, StatementSync } from "node:sqlite"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import type { DeliveryContext } from "../utils/delivery-context.js"; import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js"; -import type { TaskEventRecord, TaskRecord } from "./task-registry.types.js"; +import type { TaskRegistryStoreSnapshot } from "./task-registry.store.js"; +import type { TaskDeliveryState, TaskRecord } from "./task-registry.types.js"; type TaskRegistryRow = { task_id: string; runtime: TaskRecord["runtime"]; source_id: string | null; requester_session_key: string; - requester_origin_json: string | null; child_session_key: string | null; parent_task_id: string | null; agent_id: string | null; @@ -29,15 +29,23 @@ type TaskRegistryRow = { progress_summary: string | null; terminal_summary: string | null; terminal_outcome: TaskRecord["terminalOutcome"] | null; - recent_events_json: string | null; +}; + +type TaskDeliveryStateRow = { + task_id: string; + requester_origin_json: string | null; last_notified_event_at: number | bigint | null; }; type TaskRegistryStatements = { selectAll: StatementSync; + selectAllDeliveryStates: StatementSync; replaceRow: StatementSync; + replaceDeliveryState: StatementSync; deleteRow: StatementSync; + deleteDeliveryState: StatementSync; clearRows: StatementSync; + clearDeliveryStates: StatementSync; }; type TaskRegistryDatabase = { @@ -74,19 +82,15 @@ function parseJsonValue(raw: string | null): T | undefined { } function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { - const requesterOrigin = parseJsonValue(row.requester_origin_json); - const recentEvents = parseJsonValue(row.recent_events_json); const startedAt = normalizeNumber(row.started_at); const endedAt = normalizeNumber(row.ended_at); const lastEventAt = normalizeNumber(row.last_event_at); const cleanupAfter = normalizeNumber(row.cleanup_after); - const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at); return { taskId: row.task_id, runtime: row.runtime, ...(row.source_id ? { sourceId: row.source_id } : {}), requesterSessionKey: row.requester_session_key, - ...(requesterOrigin ? { requesterOrigin } : {}), ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), ...(row.agent_id ? { agentId: row.agent_id } : {}), @@ -105,7 +109,15 @@ function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { ...(row.progress_summary ? { progressSummary: row.progress_summary } : {}), ...(row.terminal_summary ? { terminalSummary: row.terminal_summary } : {}), ...(row.terminal_outcome ? { terminalOutcome: row.terminal_outcome } : {}), - ...(recentEvents?.length ? { recentEvents } : {}), + }; +} + +function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState { + const requesterOrigin = parseJsonValue(row.requester_origin_json); + const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at); + return { + taskId: row.task_id, + ...(requesterOrigin ? { requesterOrigin } : {}), ...(lastNotifiedEventAt != null ? { lastNotifiedEventAt } : {}), }; } @@ -116,7 +128,6 @@ function bindTaskRecord(record: TaskRecord) { runtime: record.runtime, source_id: record.sourceId ?? null, requester_session_key: record.requesterSessionKey, - requester_origin_json: serializeJson(record.requesterOrigin), child_session_key: record.childSessionKey ?? null, parent_task_id: record.parentTaskId ?? null, agent_id: record.agentId ?? null, @@ -135,8 +146,14 @@ function bindTaskRecord(record: TaskRecord) { progress_summary: record.progressSummary ?? null, terminal_summary: record.terminalSummary ?? null, terminal_outcome: record.terminalOutcome ?? null, - recent_events_json: serializeJson(record.recentEvents), - last_notified_event_at: record.lastNotifiedEventAt ?? null, + }; +} + +function bindTaskDeliveryState(state: TaskDeliveryState) { + return { + task_id: state.taskId, + requester_origin_json: serializeJson(state.requesterOrigin), + last_notified_event_at: state.lastNotifiedEventAt ?? null, }; } @@ -148,7 +165,6 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { runtime, source_id, requester_session_key, - requester_origin_json, child_session_key, parent_task_id, agent_id, @@ -166,19 +182,24 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { error, progress_summary, terminal_summary, - terminal_outcome, - recent_events_json, - last_notified_event_at + terminal_outcome FROM task_runs ORDER BY created_at ASC, task_id ASC `), + selectAllDeliveryStates: db.prepare(` + SELECT + task_id, + requester_origin_json, + last_notified_event_at + FROM task_delivery_state + ORDER BY task_id ASC + `), replaceRow: db.prepare(` INSERT OR REPLACE INTO task_runs ( task_id, runtime, source_id, requester_session_key, - requester_origin_json, child_session_key, parent_task_id, agent_id, @@ -196,15 +217,12 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { error, progress_summary, terminal_summary, - terminal_outcome, - recent_events_json, - last_notified_event_at + terminal_outcome ) VALUES ( @task_id, @runtime, @source_id, @requester_session_key, - @requester_origin_json, @child_session_key, @parent_task_id, @agent_id, @@ -222,13 +240,24 @@ function createStatements(db: DatabaseSync): TaskRegistryStatements { @error, @progress_summary, @terminal_summary, - @terminal_outcome, - @recent_events_json, + @terminal_outcome + ) + `), + replaceDeliveryState: db.prepare(` + INSERT OR REPLACE INTO task_delivery_state ( + task_id, + requester_origin_json, + last_notified_event_at + ) VALUES ( + @task_id, + @requester_origin_json, @last_notified_event_at ) `), deleteRow: db.prepare(`DELETE FROM task_runs WHERE task_id = ?`), + deleteDeliveryState: db.prepare(`DELETE FROM task_delivery_state WHERE task_id = ?`), clearRows: db.prepare(`DELETE FROM task_runs`), + clearDeliveryStates: db.prepare(`DELETE FROM task_delivery_state`), }; } @@ -239,7 +268,6 @@ function ensureSchema(db: DatabaseSync) { runtime TEXT NOT NULL, source_id TEXT, requester_session_key TEXT NOT NULL, - requester_origin_json TEXT, child_session_key TEXT, parent_task_id TEXT, agent_id TEXT, @@ -257,8 +285,13 @@ function ensureSchema(db: DatabaseSync) { error TEXT, progress_summary TEXT, terminal_summary TEXT, - terminal_outcome TEXT, - recent_events_json TEXT, + terminal_outcome TEXT + ); + `); + db.exec(` + CREATE TABLE IF NOT EXISTS task_delivery_state ( + task_id TEXT PRIMARY KEY, + requester_origin_json TEXT, last_notified_event_at INTEGER ); `); @@ -323,18 +356,26 @@ function withWriteTransaction(write: (statements: TaskRegistryStatements) => voi } } -export function loadTaskRegistrySnapshotFromSqlite(): Map { +export function loadTaskRegistryStateFromSqlite(): TaskRegistryStoreSnapshot { const { statements } = openTaskRegistryDatabase(); - const rows = statements.selectAll.all() as TaskRegistryRow[]; - return new Map(rows.map((row) => [row.task_id, rowToTaskRecord(row)])); + const taskRows = statements.selectAll.all() as TaskRegistryRow[]; + const deliveryRows = statements.selectAllDeliveryStates.all() as TaskDeliveryStateRow[]; + return { + tasks: new Map(taskRows.map((row) => [row.task_id, rowToTaskRecord(row)])), + deliveryStates: new Map(deliveryRows.map((row) => [row.task_id, rowToTaskDeliveryState(row)])), + }; } -export function saveTaskRegistrySnapshotToSqlite(tasks: ReadonlyMap) { +export function saveTaskRegistryStateToSqlite(snapshot: TaskRegistryStoreSnapshot) { withWriteTransaction((statements) => { + statements.clearDeliveryStates.run(); statements.clearRows.run(); - for (const task of tasks.values()) { + for (const task of snapshot.tasks.values()) { statements.replaceRow.run(bindTaskRecord(task)); } + for (const state of snapshot.deliveryStates.values()) { + statements.replaceDeliveryState.run(bindTaskDeliveryState(state)); + } }); } @@ -347,6 +388,19 @@ export function upsertTaskRegistryRecordToSqlite(task: TaskRecord) { export function deleteTaskRegistryRecordFromSqlite(taskId: string) { const store = openTaskRegistryDatabase(); store.statements.deleteRow.run(taskId); + store.statements.deleteDeliveryState.run(taskId); + ensureTaskRegistryPermissions(store.path); +} + +export function upsertTaskDeliveryStateToSqlite(state: TaskDeliveryState) { + const store = openTaskRegistryDatabase(); + store.statements.replaceDeliveryState.run(bindTaskDeliveryState(state)); + ensureTaskRegistryPermissions(store.path); +} + +export function deleteTaskDeliveryStateFromSqlite(taskId: string) { + const store = openTaskRegistryDatabase(); + store.statements.deleteDeliveryState.run(taskId); ensureTaskRegistryPermissions(store.path); } diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index 5414e5d40d6..b4b3b6aca3c 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -37,7 +37,10 @@ describe("task-registry store runtime", () => { it("uses the configured task store for restore and save", () => { const storedTask = createStoredTask(); - const loadSnapshot = vi.fn(() => new Map([[storedTask.taskId, storedTask]])); + const loadSnapshot = vi.fn(() => ({ + tasks: new Map([[storedTask.taskId, storedTask]]), + deliveryStates: new Map(), + })); const saveSnapshot = vi.fn(); configureTaskRegistryRuntime({ store: { @@ -63,16 +66,21 @@ describe("task-registry store runtime", () => { }); expect(saveSnapshot).toHaveBeenCalled(); - const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as ReadonlyMap; - expect(latestSnapshot.size).toBe(2); - expect(latestSnapshot.get("task-restored")?.task).toBe("Restored task"); + const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as { + tasks: ReadonlyMap; + }; + expect(latestSnapshot.tasks.size).toBe(2); + expect(latestSnapshot.tasks.get("task-restored")?.task).toBe("Restored task"); }); it("emits incremental hook events for restore, mutation, and delete", () => { const events: TaskRegistryHookEvent[] = []; configureTaskRegistryRuntime({ store: { - loadSnapshot: () => new Map([[createStoredTask().taskId, createStoredTask()]]), + loadSnapshot: () => ({ + tasks: new Map([[createStoredTask().taskId, createStoredTask()]]), + deliveryStates: new Map(), + }), saveSnapshot: () => {}, }, hooks: { diff --git a/src/tasks/task-registry.store.ts b/src/tasks/task-registry.store.ts index 5580231b62c..a075272a1b0 100644 --- a/src/tasks/task-registry.store.ts +++ b/src/tasks/task-registry.store.ts @@ -1,17 +1,26 @@ import { closeTaskRegistrySqliteStore, + deleteTaskDeliveryStateFromSqlite, deleteTaskRegistryRecordFromSqlite, - loadTaskRegistrySnapshotFromSqlite, - saveTaskRegistrySnapshotToSqlite, + loadTaskRegistryStateFromSqlite, + saveTaskRegistryStateToSqlite, + upsertTaskDeliveryStateToSqlite, upsertTaskRegistryRecordToSqlite, } from "./task-registry.store.sqlite.js"; -import type { TaskRecord } from "./task-registry.types.js"; +import type { TaskDeliveryState, TaskRecord } from "./task-registry.types.js"; + +export type TaskRegistryStoreSnapshot = { + tasks: Map; + deliveryStates: Map; +}; export type TaskRegistryStore = { - loadSnapshot: () => Map; - saveSnapshot: (tasks: ReadonlyMap) => void; + loadSnapshot: () => TaskRegistryStoreSnapshot; + saveSnapshot: (snapshot: TaskRegistryStoreSnapshot) => void; upsertTask?: (task: TaskRecord) => void; deleteTask?: (taskId: string) => void; + upsertDeliveryState?: (state: TaskDeliveryState) => void; + deleteDeliveryState?: (taskId: string) => void; close?: () => void; }; @@ -37,10 +46,12 @@ export type TaskRegistryHooks = { }; const defaultTaskRegistryStore: TaskRegistryStore = { - loadSnapshot: loadTaskRegistrySnapshotFromSqlite, - saveSnapshot: saveTaskRegistrySnapshotToSqlite, + loadSnapshot: loadTaskRegistryStateFromSqlite, + saveSnapshot: saveTaskRegistryStateToSqlite, upsertTask: upsertTaskRegistryRecordToSqlite, deleteTask: deleteTaskRegistryRecordFromSqlite, + upsertDeliveryState: upsertTaskDeliveryStateToSqlite, + deleteDeliveryState: deleteTaskDeliveryStateFromSqlite, close: closeTaskRegistrySqliteStore, }; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 0a540745ea4..d225e4596af 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -15,12 +15,13 @@ import { listTaskRecords, maybeDeliverTaskStateChangeUpdate, maybeDeliverTaskTerminalUpdate, + markTaskRunningByRunId, + recordTaskProgressByRunId, resetTaskRegistryForTests, resolveTaskForLookupToken, setTaskProgressById, setTaskTimingById, updateTaskNotifyPolicyById, - updateTaskStateByRunId, } from "./task-registry.js"; import { getInspectableTaskAuditSummary, @@ -890,8 +891,8 @@ describe("task-registry", () => { const now = Date.now(); configureTaskRegistryRuntime({ store: { - loadSnapshot: () => - new Map([ + loadSnapshot: () => ({ + tasks: new Map([ [ "task-missing-cleanup", { @@ -909,6 +910,8 @@ describe("task-registry", () => { }, ], ]), + deliveryStates: new Map(), + }), saveSnapshot: () => {}, }, }); @@ -935,8 +938,8 @@ describe("task-registry", () => { const now = Date.now(); configureTaskRegistryRuntime({ store: { - loadSnapshot: () => - new Map([ + loadSnapshot: () => ({ + tasks: new Map([ [ "task-audit-summary", { @@ -954,6 +957,8 @@ describe("task-registry", () => { }, ], ]), + deliveryStates: new Map(), + }), saveSnapshot: () => {}, }, }); @@ -998,9 +1003,8 @@ describe("task-registry", () => { notifyPolicy: "done_only", }); - updateTaskStateByRunId({ + markTaskRunningByRunId({ runId: "run-state-change", - status: "running", eventSummary: "Started.", }); await waitForAssertion(() => expect(hoisted.sendMessageMock).not.toHaveBeenCalled()); @@ -1009,7 +1013,7 @@ describe("task-registry", () => { taskId: task.taskId, notifyPolicy: "state_changes", }); - updateTaskStateByRunId({ + recordTaskProgressByRunId({ runId: "run-state-change", eventSummary: "No output for 60s. It may be waiting for input.", }); @@ -1024,13 +1028,6 @@ describe("task-registry", () => { ); expect(findTaskByRunId("run-state-change")).toMatchObject({ notifyPolicy: "state_changes", - lastNotifiedEventAt: expect.any(Number), - recentEvents: expect.arrayContaining([ - expect.objectContaining({ - kind: "progress", - summary: "No output for 60s. It may be waiting for input.", - }), - ]), }); await maybeDeliverTaskStateChangeUpdate(task.taskId); expect(hoisted.sendMessageMock).toHaveBeenCalledTimes(1); diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index a2aa43c97b2..010c87ec6b9 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -17,6 +17,7 @@ import { } from "./task-registry.store.js"; import { summarizeTaskRecords } from "./task-registry.summary.js"; import type { + TaskDeliveryState, TaskDeliveryStatus, TaskEventKind, TaskEventRecord, @@ -33,6 +34,7 @@ const log = createSubsystemLogger("tasks/registry"); const DEFAULT_TASK_RETENTION_MS = 7 * 24 * 60 * 60_000; const tasks = new Map(); +const taskDeliveryStates = new Map(); const taskIdsByRunId = new Map>(); const tasksWithPendingDelivery = new Set(); let listenerStarted = false; @@ -42,12 +44,13 @@ let deliveryRuntimePromise: Promise ({ ...event })) } - : {}), + ...state, + ...(state.requesterOrigin ? { requesterOrigin: { ...state.requesterOrigin } } : {}), }; } @@ -71,7 +74,10 @@ function emitTaskRegistryHookEvent(createEvent: () => TaskRegistryHookEvent): vo } function persistTaskRegistry() { - getTaskRegistryStore().saveSnapshot(tasks); + getTaskRegistryStore().saveSnapshot({ + tasks, + deliveryStates: taskDeliveryStates, + }); } function persistTaskUpsert(task: TaskRecord) { @@ -80,7 +86,10 @@ function persistTaskUpsert(task: TaskRecord) { store.upsertTask(task); return; } - store.saveSnapshot(tasks); + store.saveSnapshot({ + tasks, + deliveryStates: taskDeliveryStates, + }); } function persistTaskDelete(taskId: string) { @@ -89,7 +98,34 @@ function persistTaskDelete(taskId: string) { store.deleteTask(taskId); return; } - store.saveSnapshot(tasks); + store.saveSnapshot({ + tasks, + deliveryStates: taskDeliveryStates, + }); +} + +function persistTaskDeliveryStateUpsert(state: TaskDeliveryState) { + const store = getTaskRegistryStore(); + if (store.upsertDeliveryState) { + store.upsertDeliveryState(state); + return; + } + store.saveSnapshot({ + tasks, + deliveryStates: taskDeliveryStates, + }); +} + +function persistTaskDeliveryStateDelete(taskId: string) { + const store = getTaskRegistryStore(); + if (store.deleteDeliveryState) { + store.deleteDeliveryState(taskId); + return; + } + store.saveSnapshot({ + tasks, + deliveryStates: taskDeliveryStates, + }); } function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus { @@ -142,25 +178,17 @@ function resolveTaskTerminalOutcome(params: { return params.status === "succeeded" ? "succeeded" : undefined; } -const TASK_RECENT_EVENT_LIMIT = 12; - -function appendTaskEvent( - current: TaskRecord, - event: { - at: number; - kind: TaskEventKind; - summary?: string | null; - }, -): TaskEventRecord[] { +function appendTaskEvent(event: { + at: number; + kind: TaskEventKind; + summary?: string | null; +}): TaskEventRecord { const summary = normalizeTaskSummary(event.summary); - const nextEvent: TaskEventRecord = { + return { at: event.at, kind: event.kind, ...(summary ? { summary } : {}), }; - const previous = current.recentEvents ?? []; - const merged = [...previous, nextEvent]; - return merged.slice(-TASK_RECENT_EVENT_LIMIT); } function loadTaskRegistryDeliveryRuntime() { @@ -261,7 +289,7 @@ function findExistingTaskForCreate(params: { function mergeExistingTaskForCreate( existing: TaskRecord, params: { - requesterOrigin?: TaskRecord["requesterOrigin"]; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; sourceId?: string; parentTaskId?: string; agentId?: string; @@ -274,8 +302,13 @@ function mergeExistingTaskForCreate( ): TaskRecord { const patch: Partial = {}; const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin); - if (requesterOrigin && !existing.requesterOrigin) { - patch.requesterOrigin = requesterOrigin; + const currentDeliveryState = taskDeliveryStates.get(existing.taskId); + if (requesterOrigin && !currentDeliveryState?.requesterOrigin) { + upsertTaskDeliveryState({ + taskId: existing.taskId, + requesterOrigin, + lastNotifiedEventAt: currentDeliveryState?.lastNotifiedEventAt, + }); } if (params.sourceId?.trim() && !existing.sourceId?.trim()) { patch.sourceId = params.sourceId.trim(); @@ -327,12 +360,15 @@ function restoreTaskRegistryOnce() { restoreAttempted = true; try { const restored = getTaskRegistryStore().loadSnapshot(); - if (restored.size === 0) { + if (restored.tasks.size === 0 && restored.deliveryStates.size === 0) { return; } - for (const [taskId, task] of restored.entries()) { + for (const [taskId, task] of restored.tasks.entries()) { tasks.set(taskId, task); } + for (const [taskId, state] of restored.deliveryStates.entries()) { + taskDeliveryStates.set(taskId, state); + } rebuildRunIdIndex(); emitTaskRegistryHookEvent(() => ({ kind: "restored", @@ -381,6 +417,30 @@ function updateTask(taskId: string, patch: Partial): TaskRecord | nu return cloneTaskRecord(next); } +function upsertTaskDeliveryState(state: TaskDeliveryState): TaskDeliveryState { + const current = taskDeliveryStates.get(state.taskId); + const next: TaskDeliveryState = { + taskId: state.taskId, + ...(state.requesterOrigin + ? { requesterOrigin: normalizeDeliveryContext(state.requesterOrigin) } + : {}), + ...(state.lastNotifiedEventAt != null + ? { lastNotifiedEventAt: state.lastNotifiedEventAt } + : {}), + }; + if (!next.requesterOrigin && typeof next.lastNotifiedEventAt !== "number" && !current) { + return cloneTaskDeliveryState({ taskId: state.taskId }); + } + taskDeliveryStates.set(state.taskId, next); + persistTaskDeliveryStateUpsert(next); + return cloneTaskDeliveryState(next); +} + +function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined { + const state = taskDeliveryStates.get(taskId); + return state ? cloneTaskDeliveryState(state) : undefined; +} + function formatTaskTerminalEvent(task: TaskRecord): string { // User-facing task notifications stay intentionally terse. Detailed runtime chatter lives // in task metadata for inspection, not in the default channel ping. @@ -419,7 +479,7 @@ function formatTaskTerminalEvent(task: TaskRecord): string { } function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean { - const origin = normalizeDeliveryContext(task.requesterOrigin); + const origin = normalizeDeliveryContext(taskDeliveryStates.get(task.taskId)?.requesterOrigin); const channel = origin?.channel?.trim(); const to = origin?.to?.trim(); return Boolean(channel && to && isDeliverableMessageChannel(channel)); @@ -433,7 +493,7 @@ function queueTaskSystemEvent(task: TaskRecord, text: string) { enqueueSystemEvent(text, { sessionKey: requesterSessionKey, contextKey: `task:${task.taskId}`, - deliveryContext: task.requesterOrigin, + deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task", @@ -462,7 +522,7 @@ function queueBlockedTaskFollowup(task: TaskRecord) { enqueueSystemEvent(`Task needs follow-up: ${title}${runLabel}. ${summary}`, { sessionKey: requesterSessionKey, contextKey: `task:${task.taskId}:blocked-followup`, - deliveryContext: task.requesterOrigin, + deliveryContext: taskDeliveryStates.get(task.taskId)?.requesterOrigin, }); requestHeartbeatNow({ reason: "background-task-blocked", @@ -579,7 +639,7 @@ export async function maybeDeliverTaskTerminalUpdate(taskId: string): Promise { ensureTaskRegistryReady(); const current = tasks.get(taskId); if (!current || !shouldAutoDeliverTaskStateChange(current)) { return current ? cloneTaskRecord(current) : null; } - const latestEvent = current.recentEvents?.at(-1); - if (!latestEvent || (current.lastNotifiedEventAt ?? 0) >= latestEvent.at) { + const deliveryState = getTaskDeliveryState(taskId); + if (!latestEvent || (deliveryState?.lastNotifiedEventAt ?? 0) >= latestEvent.at) { return cloneTaskRecord(current); } const eventText = formatTaskStateChangeEvent(current, latestEvent); @@ -650,13 +711,17 @@ export async function maybeDeliverTaskStateChangeUpdate( try { if (!canDeliverTaskToRequesterOrigin(current)) { queueTaskSystemEvent(current, eventText); - return updateTask(taskId, { + upsertTaskDeliveryState({ + taskId, + requesterOrigin: deliveryState?.requesterOrigin, lastNotifiedEventAt: latestEvent.at, + }); + return updateTask(taskId, { lastEventAt: Date.now(), }); } const { sendMessage } = await loadTaskRegistryDeliveryRuntime(); - const origin = normalizeDeliveryContext(current.requesterOrigin); + const origin = normalizeDeliveryContext(deliveryState?.requesterOrigin); const requesterAgentId = parseAgentSessionKey(current.requesterSessionKey)?.agentId; await sendMessage({ channel: origin?.channel, @@ -672,8 +737,12 @@ export async function maybeDeliverTaskStateChangeUpdate( idempotencyKey: `task-event:${current.taskId}:${latestEvent.at}:${latestEvent.kind}`, }, }); - return updateTask(taskId, { + upsertTaskDeliveryState({ + taskId, + requesterOrigin: deliveryState?.requesterOrigin, lastNotifiedEventAt: latestEvent.at, + }); + return updateTask(taskId, { lastEventAt: Date.now(), }); } catch (error) { @@ -834,21 +903,22 @@ function ensureListener() { } else if (evt.stream === "error") { patch.error = typeof evt.data?.error === "string" ? evt.data.error : current.error; } - if (patch.status && patch.status !== current.status) { - patch.recentEvents = appendTaskEvent(current, { - at: now, - kind: patch.status, - summary: - patch.status === "failed" - ? (patch.error ?? current.error) - : patch.status === "succeeded" - ? current.terminalSummary - : undefined, - }); - } + const stateChangeEvent = + patch.status && patch.status !== current.status + ? appendTaskEvent({ + at: now, + kind: patch.status, + summary: + patch.status === "failed" + ? (patch.error ?? current.error) + : patch.status === "succeeded" + ? current.terminalSummary + : undefined, + }) + : undefined; const updated = updateTask(taskId, patch); if (updated) { - void maybeDeliverTaskStateChangeUpdate(taskId); + void maybeDeliverTaskStateChangeUpdate(taskId, stateChangeEvent); void maybeDeliverTaskTerminalUpdate(taskId); } } @@ -859,7 +929,7 @@ export function createTaskRecord(params: { runtime: TaskRuntime; sourceId?: string; requesterSessionKey: string; - requesterOrigin?: TaskRecord["requesterOrigin"]; + requesterOrigin?: TaskDeliveryState["requesterOrigin"]; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -897,7 +967,6 @@ export function createTaskRecord(params: { runtime: params.runtime, sourceId: params.sourceId?.trim() || undefined, requesterSessionKey: params.requesterSessionKey, - requesterOrigin: normalizeDeliveryContext(params.requesterOrigin), childSessionKey: params.childSessionKey, parentTaskId: params.parentTaskId?.trim() || undefined, agentId: params.agentId?.trim() || undefined, @@ -917,28 +986,16 @@ export function createTaskRecord(params: { status, terminalOutcome: params.terminalOutcome, }), - recentEvents: appendTaskEvent( - { - taskId, - runtime: params.runtime, - requesterSessionKey: params.requesterSessionKey, - task: params.task, - status, - deliveryStatus, - notifyPolicy, - createdAt: now, - } as TaskRecord, - { - at: lastEventAt, - kind: status, - }, - ), }; if (isTerminalTaskStatus(record.status) && typeof record.cleanupAfter !== "number") { record.cleanupAfter = (record.endedAt ?? record.lastEventAt ?? record.createdAt) + DEFAULT_TASK_RETENTION_MS; } tasks.set(taskId, record); + upsertTaskDeliveryState({ + taskId, + requesterOrigin: normalizeDeliveryContext(params.requesterOrigin), + }); addRunIdIndex(taskId, record.runId); persistTaskUpsert(record); emitTaskRegistryHookEvent(() => ({ @@ -951,7 +1008,7 @@ export function createTaskRecord(params: { return cloneTaskRecord(record); } -export function updateTaskStateByRunId(params: { +function updateTaskStateByRunId(params: { runId: string; status?: TaskStatus; startedAt?: number; @@ -1014,38 +1071,95 @@ export function updateTaskStateByRunId(params: { const shouldAppendEvent = (params.status && params.status !== current.status) || Boolean(normalizeTaskSummary(params.eventSummary)); - if (shouldAppendEvent) { - patch.recentEvents = appendTaskEvent(current, { - at: eventAt, - kind: - params.status && normalizeTaskStatus(params.status) !== current.status - ? normalizeTaskStatus(params.status) - : "progress", - summary: eventSummary, - }); - } + const nextEvent = shouldAppendEvent + ? appendTaskEvent({ + at: eventAt, + kind: + params.status && normalizeTaskStatus(params.status) !== current.status + ? normalizeTaskStatus(params.status) + : "progress", + summary: eventSummary, + }) + : undefined; const task = updateTask(taskId, patch); if (task) { updated.push(task); + void maybeDeliverTaskStateChangeUpdate(task.taskId, nextEvent); + void maybeDeliverTaskTerminalUpdate(task.taskId); } } - for (const task of updated) { - void maybeDeliverTaskStateChangeUpdate(task.taskId); - void maybeDeliverTaskTerminalUpdate(task.taskId); - } return updated; } -export function updateTaskDeliveryByRunId(params: { - runId: string; - deliveryStatus: TaskDeliveryStatus; -}) { +function updateTaskDeliveryByRunId(params: { runId: string; deliveryStatus: TaskDeliveryStatus }) { ensureTaskRegistryReady(); return updateTasksByRunId(params.runId, { deliveryStatus: params.deliveryStatus, }); } +export function markTaskRunningByRunId(params: { + runId: string; + startedAt?: number; + lastEventAt?: number; + progressSummary?: string | null; + eventSummary?: string | null; +}) { + return updateTaskStateByRunId({ + runId: params.runId, + status: "running", + startedAt: params.startedAt, + lastEventAt: params.lastEventAt, + progressSummary: params.progressSummary, + eventSummary: params.eventSummary, + }); +} + +export function recordTaskProgressByRunId(params: { + runId: string; + lastEventAt?: number; + progressSummary?: string | null; + eventSummary?: string | null; +}) { + return updateTaskStateByRunId({ + runId: params.runId, + lastEventAt: params.lastEventAt, + progressSummary: params.progressSummary, + eventSummary: params.eventSummary, + }); +} + +export function markTaskTerminalByRunId(params: { + runId: string; + status: Extract; + startedAt?: number; + endedAt: number; + lastEventAt?: number; + error?: string; + progressSummary?: string | null; + terminalSummary?: string | null; + terminalOutcome?: TaskTerminalOutcome | null; +}) { + return updateTaskStateByRunId({ + runId: params.runId, + status: params.status, + startedAt: params.startedAt, + endedAt: params.endedAt, + lastEventAt: params.lastEventAt, + error: params.error, + progressSummary: params.progressSummary, + terminalSummary: params.terminalSummary, + terminalOutcome: params.terminalOutcome, + }); +} + +export function setTaskRunDeliveryStatusByRunId(params: { + runId: string; + deliveryStatus: TaskDeliveryStatus; +}) { + return updateTaskDeliveryByRunId(params); +} + export function updateTaskNotifyPolicyById(params: { taskId: string; notifyPolicy: TaskNotifyPolicy; @@ -1122,11 +1236,6 @@ export async function cancelTaskById(params: { endedAt: Date.now(), lastEventAt: Date.now(), error: "Cancelled by operator.", - recentEvents: appendTaskEvent(task, { - at: Date.now(), - kind: "cancelled", - summary: "Cancelled by operator.", - }), }); if (updated) { void maybeDeliverTaskTerminalUpdate(updated.taskId); @@ -1161,6 +1270,7 @@ export function getTaskRegistrySummary(): TaskRegistrySummary { export function getTaskRegistrySnapshot(): TaskRegistrySnapshot { return { tasks: listTaskRecords(), + deliveryStates: [...taskDeliveryStates.values()].map((state) => cloneTaskDeliveryState(state)), }; } @@ -1201,8 +1311,10 @@ export function deleteTaskRecordById(taskId: string): boolean { return false; } tasks.delete(taskId); + taskDeliveryStates.delete(taskId); rebuildRunIdIndex(); persistTaskDelete(taskId); + persistTaskDeliveryStateDelete(taskId); emitTaskRegistryHookEvent(() => ({ kind: "deleted", taskId: current.taskId, @@ -1213,7 +1325,9 @@ export function deleteTaskRecordById(taskId: string): boolean { export function resetTaskRegistryForTests(opts?: { persist?: boolean }) { tasks.clear(); + taskDeliveryStates.clear(); taskIdsByRunId.clear(); + tasksWithPendingDelivery.clear(); restoreAttempted = false; resetTaskRegistryRuntimeForTests(); if (listenerStop) { diff --git a/src/tasks/task-registry.types.ts b/src/tasks/task-registry.types.ts index a3415821db1..2228f16d998 100644 --- a/src/tasks/task-registry.types.ts +++ b/src/tasks/task-registry.types.ts @@ -43,12 +43,17 @@ export type TaskEventRecord = { summary?: string; }; +export type TaskDeliveryState = { + taskId: string; + requesterOrigin?: DeliveryContext; + lastNotifiedEventAt?: number; +}; + export type TaskRecord = { taskId: string; runtime: TaskRuntime; sourceId?: string; requesterSessionKey: string; - requesterOrigin?: DeliveryContext; childSessionKey?: string; parentTaskId?: string; agentId?: string; @@ -67,10 +72,9 @@ export type TaskRecord = { progressSummary?: string; terminalSummary?: string; terminalOutcome?: TaskTerminalOutcome; - recentEvents?: TaskEventRecord[]; - lastNotifiedEventAt?: number; }; export type TaskRegistrySnapshot = { tasks: TaskRecord[]; + deliveryStates: TaskDeliveryState[]; };