From 24322af4f75ad1076e43e9ea050a99a541d0e4ff Mon Sep 17 00:00:00 2001 From: Mariano Belinky Date: Tue, 21 Apr 2026 00:33:23 +0200 Subject: [PATCH] tasks: harden detached recovery seam --- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- src/tasks/detached-task-runtime-contract.ts | 23 +++++-- src/tasks/detached-task-runtime.test.ts | 67 ++++++++++++++++--- src/tasks/detached-task-runtime.ts | 35 +++++++--- ...k-registry.maintenance.issue-60299.test.ts | 15 ++++- src/tasks/task-registry.maintenance.ts | 7 +- 6 files changed, 118 insertions(+), 33 deletions(-) diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index e086e690a94..eafa67247ca 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -4ec700ac180b7eca81ca48885bc7f645dbf5016e2438e44678f4c206eed4b643 plugin-sdk-api-baseline.json -ff0d1541e7220c67d97444304568285303e423770bd6af6227afdf470bf233cc plugin-sdk-api-baseline.jsonl +c1d52cab340bc37d6750de609ba565598cbfc883291786092b0bfebec9d0c926 plugin-sdk-api-baseline.json +d03fa4539c6a2f0a138da3c658543315ad56648c3fd375591d09253d5db09298 plugin-sdk-api-baseline.jsonl diff --git a/src/tasks/detached-task-runtime-contract.ts b/src/tasks/detached-task-runtime-contract.ts index c989003c3c0..018f8115a24 100644 --- a/src/tasks/detached-task-runtime-contract.ts +++ b/src/tasks/detached-task-runtime-contract.ts @@ -97,6 +97,17 @@ export type DetachedTaskCancelResult = { task?: TaskRecord; }; +export type DetachedTaskRecoveryAttemptParams = { + taskId: string; + runtime: TaskRuntime; + task: TaskRecord; + now: number; +}; + +export type DetachedTaskRecoveryAttemptResult = { + recovered: boolean; +}; + export type DetachedTaskLifecycleRuntime = { createQueuedTaskRun: (params: DetachedTaskCreateParams) => TaskRecord; createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord; @@ -112,11 +123,13 @@ export type DetachedTaskLifecycleRuntime = { cancelDetachedTaskRunById: ( params: DetachedTaskCancelParams, ) => Promise; - onBeforeMarkLost?: (params: { - taskId: string; - runtime: TaskRuntime; - task: TaskRecord; - }) => { recovered: boolean } | Promise<{ recovered: boolean }>; + /** + * Give a registered detached runtime one last chance to recover a stale task + * before core marks it lost during maintenance. + */ + tryRecoverTaskBeforeMarkLost?: ( + params: DetachedTaskRecoveryAttemptParams, + ) => DetachedTaskRecoveryAttemptResult | Promise; }; export type DetachedTaskLifecycleRuntimeRegistration = { diff --git a/src/tasks/detached-task-runtime.test.ts b/src/tasks/detached-task-runtime.test.ts index 1645dfab504..01ae49d1376 100644 --- a/src/tasks/detached-task-runtime.test.ts +++ b/src/tasks/detached-task-runtime.test.ts @@ -7,13 +7,13 @@ import { failTaskRunByRunId, getDetachedTaskLifecycleRuntime, getDetachedTaskLifecycleRuntimeRegistration, - onBeforeMarkLost, registerDetachedTaskRuntime, recordTaskRunProgressByRunId, resetDetachedTaskLifecycleRuntimeForTests, setDetachedTaskLifecycleRuntime, setDetachedTaskDeliveryStatusByRunId, startTaskRunByRunId, + tryRecoverTaskBeforeMarkLost, } from "./detached-task-runtime.js"; import type { TaskRecord } from "./task-registry.types.js"; @@ -166,17 +166,18 @@ describe("detached-task-runtime", () => { expect(getDetachedTaskLifecycleRuntime()).toBe(runtime); }); - describe("onBeforeMarkLost", () => { + describe("tryRecoverTaskBeforeMarkLost", () => { it("returns recovered when hook returns recovered true", async () => { const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" }); setDetachedTaskLifecycleRuntime({ ...getDetachedTaskLifecycleRuntime(), - onBeforeMarkLost: vi.fn(() => ({ recovered: true })), + tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: true })), }); - const result = await onBeforeMarkLost({ + const result = await tryRecoverTaskBeforeMarkLost({ taskId: task.taskId, runtime: task.runtime, task, + now: 123, }); expect(result).toEqual({ recovered: true }); }); @@ -185,22 +186,24 @@ describe("detached-task-runtime", () => { const task = createFakeTaskRecord({ taskId: "task-no-recover", runtime: "cron" }); setDetachedTaskLifecycleRuntime({ ...getDetachedTaskLifecycleRuntime(), - onBeforeMarkLost: vi.fn(() => ({ recovered: false })), + tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: false })), }); - const result = await onBeforeMarkLost({ + const result = await tryRecoverTaskBeforeMarkLost({ taskId: task.taskId, runtime: task.runtime, task, + now: 456, }); expect(result).toEqual({ recovered: false }); }); it("returns not recovered when hook is not provided", async () => { const task = createFakeTaskRecord({ taskId: "task-no-hook", runtime: "cli" }); - const result = await onBeforeMarkLost({ + const result = await tryRecoverTaskBeforeMarkLost({ taskId: task.taskId, runtime: task.runtime, task, + now: 789, }); expect(result).toEqual({ recovered: false }); }); @@ -209,20 +212,62 @@ describe("detached-task-runtime", () => { const task = createFakeTaskRecord({ taskId: "task-throw", runtime: "acp" }); setDetachedTaskLifecycleRuntime({ ...getDetachedTaskLifecycleRuntime(), - onBeforeMarkLost: vi.fn(() => { + tryRecoverTaskBeforeMarkLost: vi.fn(() => { throw new Error("plugin crashed"); }), }); - const result = await onBeforeMarkLost({ + const result = await tryRecoverTaskBeforeMarkLost({ taskId: task.taskId, runtime: task.runtime, task, + now: 1_000, }); expect(result).toEqual({ recovered: false }); expect(mockLogWarn).toHaveBeenCalledWith( - "onBeforeMarkLost hook threw, proceeding with markTaskLost", - expect.objectContaining({ taskId: "task-throw", runtime: "acp" }), + "Detached task recovery hook threw, proceeding with markTaskLost", + expect.objectContaining({ taskId: "task-throw", runtime: "acp", elapsedMs: 0 }), ); }); + + it("returns not recovered and logs warning when hook returns invalid result", async () => { + const task = createFakeTaskRecord({ taskId: "task-invalid", runtime: "cron" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ nope: true }) as never), + }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 2_000, + }); + expect(result).toEqual({ recovered: false }); + expect(mockLogWarn).toHaveBeenCalledWith( + "Detached task recovery hook returned invalid result, proceeding with markTaskLost", + expect.objectContaining({ taskId: "task-invalid", runtime: "cron" }), + ); + }); + + it("logs when the recovery hook is slow", async () => { + const task = createFakeTaskRecord({ taskId: "task-slow", runtime: "subagent" }); + const dateNowSpy = vi.spyOn(Date, "now"); + dateNowSpy.mockReturnValueOnce(10_000).mockReturnValueOnce(16_000); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: vi.fn(async () => ({ recovered: true })), + }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 3_000, + }); + expect(result).toEqual({ recovered: true }); + expect(mockLogWarn).toHaveBeenCalledWith( + "Detached task recovery hook was slow", + expect.objectContaining({ taskId: "task-slow", runtime: "subagent", elapsedMs: 6_000 }), + ); + dateNowSpy.mockRestore(); + }); }); }); diff --git a/src/tasks/detached-task-runtime.ts b/src/tasks/detached-task-runtime.ts index d5fd8bf6c94..a62042177c8 100644 --- a/src/tasks/detached-task-runtime.ts +++ b/src/tasks/detached-task-runtime.ts @@ -1,5 +1,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import type { + DetachedTaskRecoveryAttemptParams, + DetachedTaskRecoveryAttemptResult, DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration, } from "./detached-task-runtime-contract.js"; @@ -19,9 +21,9 @@ import { setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor, startTaskRunByRunId as startTaskRunByRunIdFromExecutor, } from "./task-executor.js"; -import type { TaskRecord, TaskRuntime } from "./task-registry.types.js"; const log = createSubsystemLogger("tasks/detached-runtime"); +const DETACHED_TASK_RECOVERY_WARN_MS = 5_000; export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration }; @@ -109,25 +111,38 @@ export function cancelDetachedTaskRunById( return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args); } -export async function onBeforeMarkLost(params: { - taskId: string; - runtime: TaskRuntime; - task: TaskRecord; -}): Promise<{ recovered: boolean }> { - const hook = getDetachedTaskLifecycleRuntime().onBeforeMarkLost; +export async function tryRecoverTaskBeforeMarkLost( + params: DetachedTaskRecoveryAttemptParams, +): Promise { + const hook = getDetachedTaskLifecycleRuntime().tryRecoverTaskBeforeMarkLost; if (!hook) { return { recovered: false }; } + const startedAt = Date.now(); try { const result = await hook(params); + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= DETACHED_TASK_RECOVERY_WARN_MS) { + log.warn("Detached task recovery hook was slow", { + taskId: params.taskId, + runtime: params.runtime, + elapsedMs, + }); + } if (result && typeof result.recovered === "boolean") { return result; } - return { recovered: false }; - } catch (err) { - log.warn("onBeforeMarkLost hook threw, proceeding with markTaskLost", { + log.warn("Detached task recovery hook returned invalid result, proceeding with markTaskLost", { taskId: params.taskId, runtime: params.runtime, + result, + }); + return { recovered: false }; + } catch (err) { + log.warn("Detached task recovery hook threw, proceeding with markTaskLost", { + taskId: params.taskId, + runtime: params.runtime, + elapsedMs: Date.now() - startedAt, error: err, }); return { recovered: false }; diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 2c0e4e2da14..239e7f97733 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -8,6 +8,7 @@ import { getDetachedTaskLifecycleRuntime, } from "./detached-task-runtime.js"; import { + previewTaskRegistryMaintenance, resetTaskRegistryMaintenanceRuntimeForTests, runTaskRegistryMaintenance, setTaskRegistryMaintenanceRuntimeForTests, @@ -204,7 +205,7 @@ describe("task-registry maintenance issue #60299", () => { expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); }); - it("skips markTaskLost and counts recovered when onBeforeMarkLost hook recovers a stale task", async () => { + it("skips markTaskLost and counts recovered when recovery hook recovers a stale task", async () => { const task = makeStaleTask({ runtime: "cron", sourceId: "cron-job-recovered", @@ -215,13 +216,23 @@ describe("task-registry maintenance issue #60299", () => { tasks: [task], }); + const recoveryHook = vi.fn(() => ({ recovered: true })); setDetachedTaskLifecycleRuntime({ ...getDetachedTaskLifecycleRuntime(), - onBeforeMarkLost: vi.fn(() => ({ recovered: true })), + tryRecoverTaskBeforeMarkLost: recoveryHook, }); + expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 1, recovered: 0 }); const result = await runTaskRegistryMaintenance(); expect(result).toMatchObject({ reconciled: 0, recovered: 1 }); expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); + expect(recoveryHook).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: task.taskId, + runtime: "cron", + task: expect.objectContaining({ taskId: task.taskId }), + now: expect.any(Number), + }), + ); }); }); diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index f00af95e10b..eb1257f4ceb 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -5,7 +5,7 @@ import { getAgentRunContext } from "../infra/agent-events.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { deriveSessionChatType } from "../sessions/session-chat-type.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; -import { onBeforeMarkLost } from "./detached-task-runtime.js"; +import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js"; import { deleteTaskRecordById, ensureTaskRegistryReady, @@ -252,7 +252,7 @@ export function reconcileTaskLookupToken(token: string): TaskRecord | undefined return task ? reconcileTaskRecordForOperatorInspection(task) : undefined; } -// Preview is synchronous and cannot call the async onBeforeMarkLost hook, +// Preview is synchronous and cannot call the async detached-task recovery hook, // so recovered tasks are counted under reconciled here. The real sweep // in runTaskRegistryMaintenance splits them into reconciled vs recovered. export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary { @@ -312,10 +312,11 @@ export async function runTaskRegistryMaintenance(): Promise