diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index cb111ed647c..65dc46778d7 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -531,7 +531,7 @@ export async function tasksMaintenanceCommand( runtime.log( info( - `Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`, + `Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.recovered} recovered · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`, ), ); runtime.log( diff --git a/src/tasks/detached-task-runtime-contract.ts b/src/tasks/detached-task-runtime-contract.ts index e1e82d9f09f..c989003c3c0 100644 --- a/src/tasks/detached-task-runtime-contract.ts +++ b/src/tasks/detached-task-runtime-contract.ts @@ -112,6 +112,11 @@ export type DetachedTaskLifecycleRuntime = { cancelDetachedTaskRunById: ( params: DetachedTaskCancelParams, ) => Promise; + onBeforeMarkLost?: (params: { + taskId: string; + runtime: TaskRuntime; + task: TaskRecord; + }) => { recovered: boolean } | Promise<{ recovered: boolean }>; }; export type DetachedTaskLifecycleRuntimeRegistration = { diff --git a/src/tasks/detached-task-runtime.test.ts b/src/tasks/detached-task-runtime.test.ts index 26eaea3280f..e3f0f072297 100644 --- a/src/tasks/detached-task-runtime.test.ts +++ b/src/tasks/detached-task-runtime.test.ts @@ -7,6 +7,7 @@ import { failTaskRunByRunId, getDetachedTaskLifecycleRuntime, getDetachedTaskLifecycleRuntimeRegistration, + onBeforeMarkLost, registerDetachedTaskRuntime, recordTaskRunProgressByRunId, resetDetachedTaskLifecycleRuntimeForTests, @@ -145,4 +146,60 @@ describe("detached-task-runtime", () => { }); expect(getDetachedTaskLifecycleRuntime()).toBe(runtime); }); + + describe("onBeforeMarkLost", () => { + it("returns recovered when hook returns recovered true", async () => { + const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + onBeforeMarkLost: vi.fn(() => ({ recovered: true })), + }); + const result = await onBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + }); + expect(result).toEqual({ recovered: true }); + }); + + it("returns not recovered when hook returns recovered false", async () => { + const task = createFakeTaskRecord({ taskId: "task-no-recover", runtime: "cron" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + onBeforeMarkLost: vi.fn(() => ({ recovered: false })), + }); + const result = await onBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + }); + expect(result).toEqual({ recovered: false }); + }); + + it("returns not recovered when hook is not provided", async () => { + const task = createFakeTaskRecord({ taskId: "task-no-hook", runtime: "cli" }); + const result = await onBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + }); + expect(result).toEqual({ recovered: false }); + }); + + it("returns not recovered and logs warning when hook throws", async () => { + const task = createFakeTaskRecord({ taskId: "task-throw", runtime: "acp" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + onBeforeMarkLost: vi.fn(() => { + throw new Error("plugin crashed"); + }), + }); + const result = await onBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + }); + expect(result).toEqual({ recovered: false }); + }); + }); }); diff --git a/src/tasks/detached-task-runtime.ts b/src/tasks/detached-task-runtime.ts index 643f15af13f..0cfc32debe1 100644 --- a/src/tasks/detached-task-runtime.ts +++ b/src/tasks/detached-task-runtime.ts @@ -1,3 +1,4 @@ +import { createSubsystemLogger } from "../logging/subsystem.js"; import type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration, @@ -18,6 +19,9 @@ import { setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor, startTaskRunByRunId as startTaskRunByRunIdFromExecutor, } from "./task-executor.js"; +import type { TaskRecord, TaskRuntime } from "./task-registry.types.js"; + +const log = createSubsystemLogger("tasks/detached-runtime"); export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration }; @@ -104,3 +108,24 @@ export function cancelDetachedTaskRunById( ): ReturnType { return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args); } + +export async function onBeforeMarkLost(params: { + taskId: string; + runtime: TaskRuntime; + task: TaskRecord; +}): Promise<{ recovered: boolean }> { + const hook = getDetachedTaskLifecycleRuntime().onBeforeMarkLost; + if (!hook) { + return { recovered: false }; + } + try { + return await hook(params); + } catch (err) { + log.warn("onBeforeMarkLost hook threw, proceeding with markTaskLost", { + taskId: params.taskId, + runtime: params.runtime, + error: err, + }); + return { recovered: false }; + } +} diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 59af8e61658..2c0e4e2da14 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -1,7 +1,12 @@ -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js"; import type { SessionEntry } from "../config/sessions.js"; import type { ParsedAgentSessionKey } from "../routing/session-key.js"; +import { + resetDetachedTaskLifecycleRuntimeForTests, + setDetachedTaskLifecycleRuntime, + getDetachedTaskLifecycleRuntime, +} from "./detached-task-runtime.js"; import { resetTaskRegistryMaintenanceRuntimeForTests, runTaskRegistryMaintenance, @@ -38,6 +43,7 @@ type TaskRegistryMaintenanceRuntime = Parameters< afterEach(() => { stopTaskRegistryMaintenanceForTests(); resetTaskRegistryMaintenanceRuntimeForTests(); + resetDetachedTaskLifecycleRuntimeForTests(); }); function createTaskRegistryMaintenanceHarness(params: { @@ -197,4 +203,25 @@ describe("task-registry maintenance issue #60299", () => { expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 }); expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); }); + + it("skips markTaskLost and counts recovered when onBeforeMarkLost hook recovers a stale task", async () => { + const task = makeStaleTask({ + runtime: "cron", + sourceId: "cron-job-recovered", + childSessionKey: undefined, + }); + + const { currentTasks } = createTaskRegistryMaintenanceHarness({ + tasks: [task], + }); + + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + onBeforeMarkLost: vi.fn(() => ({ recovered: true })), + }); + + const result = await runTaskRegistryMaintenance(); + expect(result).toMatchObject({ reconciled: 0, recovered: 1 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); + }); }); diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index afb47b075aa..7b657cbcd6a 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -5,6 +5,7 @@ import { getAgentRunContext } from "../infra/agent-events.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { deriveSessionChatType } from "../sessions/session-chat-type.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; +import { onBeforeMarkLost } from "./detached-task-runtime.js"; import { deleteTaskRecordById, ensureTaskRegistryReady, @@ -77,6 +78,7 @@ let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = export type TaskRegistryMaintenanceSummary = { reconciled: number; + recovered: number; cleanupStamped: number; pruned: number; }; @@ -269,7 +271,7 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary cleanupStamped += 1; } } - return { reconciled, cleanupStamped, pruned }; + return { reconciled, recovered: 0, cleanupStamped, pruned }; } /** @@ -296,6 +298,7 @@ export async function runTaskRegistryMaintenance(): Promise {