tasks: harden detached recovery seam

This commit is contained in:
Mariano Belinky
2026-04-21 00:33:23 +02:00
parent 1462ce50b2
commit 24322af4f7
6 changed files with 118 additions and 33 deletions

View File

@@ -1,2 +1,2 @@
4ec700ac180b7eca81ca48885bc7f645dbf5016e2438e44678f4c206eed4b643 plugin-sdk-api-baseline.json
ff0d1541e7220c67d97444304568285303e423770bd6af6227afdf470bf233cc plugin-sdk-api-baseline.jsonl
c1d52cab340bc37d6750de609ba565598cbfc883291786092b0bfebec9d0c926 plugin-sdk-api-baseline.json
d03fa4539c6a2f0a138da3c658543315ad56648c3fd375591d09253d5db09298 plugin-sdk-api-baseline.jsonl

View File

@@ -97,6 +97,17 @@ export type DetachedTaskCancelResult = {
task?: TaskRecord;
};
export type DetachedTaskRecoveryAttemptParams = {
taskId: string;
runtime: TaskRuntime;
task: TaskRecord;
now: number;
};
export type DetachedTaskRecoveryAttemptResult = {
recovered: boolean;
};
export type DetachedTaskLifecycleRuntime = {
createQueuedTaskRun: (params: DetachedTaskCreateParams) => TaskRecord;
createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord;
@@ -112,11 +123,13 @@ export type DetachedTaskLifecycleRuntime = {
cancelDetachedTaskRunById: (
params: DetachedTaskCancelParams,
) => Promise<DetachedTaskCancelResult>;
onBeforeMarkLost?: (params: {
taskId: string;
runtime: TaskRuntime;
task: TaskRecord;
}) => { recovered: boolean } | Promise<{ recovered: boolean }>;
/**
* Give a registered detached runtime one last chance to recover a stale task
* before core marks it lost during maintenance.
*/
tryRecoverTaskBeforeMarkLost?: (
params: DetachedTaskRecoveryAttemptParams,
) => DetachedTaskRecoveryAttemptResult | Promise<DetachedTaskRecoveryAttemptResult>;
};
export type DetachedTaskLifecycleRuntimeRegistration = {

View File

@@ -7,13 +7,13 @@ import {
failTaskRunByRunId,
getDetachedTaskLifecycleRuntime,
getDetachedTaskLifecycleRuntimeRegistration,
onBeforeMarkLost,
registerDetachedTaskRuntime,
recordTaskRunProgressByRunId,
resetDetachedTaskLifecycleRuntimeForTests,
setDetachedTaskLifecycleRuntime,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
tryRecoverTaskBeforeMarkLost,
} from "./detached-task-runtime.js";
import type { TaskRecord } from "./task-registry.types.js";
@@ -166,17 +166,18 @@ describe("detached-task-runtime", () => {
expect(getDetachedTaskLifecycleRuntime()).toBe(runtime);
});
describe("onBeforeMarkLost", () => {
describe("tryRecoverTaskBeforeMarkLost", () => {
it("returns recovered when hook returns recovered true", async () => {
const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => ({ recovered: true })),
tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: true })),
});
const result = await onBeforeMarkLost({
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 123,
});
expect(result).toEqual({ recovered: true });
});
@@ -185,22 +186,24 @@ describe("detached-task-runtime", () => {
const task = createFakeTaskRecord({ taskId: "task-no-recover", runtime: "cron" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => ({ recovered: false })),
tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: false })),
});
const result = await onBeforeMarkLost({
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 456,
});
expect(result).toEqual({ recovered: false });
});
it("returns not recovered when hook is not provided", async () => {
const task = createFakeTaskRecord({ taskId: "task-no-hook", runtime: "cli" });
const result = await onBeforeMarkLost({
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 789,
});
expect(result).toEqual({ recovered: false });
});
@@ -209,20 +212,62 @@ describe("detached-task-runtime", () => {
const task = createFakeTaskRecord({ taskId: "task-throw", runtime: "acp" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => {
tryRecoverTaskBeforeMarkLost: vi.fn(() => {
throw new Error("plugin crashed");
}),
});
const result = await onBeforeMarkLost({
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 1_000,
});
expect(result).toEqual({ recovered: false });
expect(mockLogWarn).toHaveBeenCalledWith(
"onBeforeMarkLost hook threw, proceeding with markTaskLost",
expect.objectContaining({ taskId: "task-throw", runtime: "acp" }),
"Detached task recovery hook threw, proceeding with markTaskLost",
expect.objectContaining({ taskId: "task-throw", runtime: "acp", elapsedMs: 0 }),
);
});
it("returns not recovered and logs warning when hook returns invalid result", async () => {
const task = createFakeTaskRecord({ taskId: "task-invalid", runtime: "cron" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ nope: true }) as never),
});
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 2_000,
});
expect(result).toEqual({ recovered: false });
expect(mockLogWarn).toHaveBeenCalledWith(
"Detached task recovery hook returned invalid result, proceeding with markTaskLost",
expect.objectContaining({ taskId: "task-invalid", runtime: "cron" }),
);
});
it("logs when the recovery hook is slow", async () => {
const task = createFakeTaskRecord({ taskId: "task-slow", runtime: "subagent" });
const dateNowSpy = vi.spyOn(Date, "now");
dateNowSpy.mockReturnValueOnce(10_000).mockReturnValueOnce(16_000);
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: vi.fn(async () => ({ recovered: true })),
});
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 3_000,
});
expect(result).toEqual({ recovered: true });
expect(mockLogWarn).toHaveBeenCalledWith(
"Detached task recovery hook was slow",
expect.objectContaining({ taskId: "task-slow", runtime: "subagent", elapsedMs: 6_000 }),
);
dateNowSpy.mockRestore();
});
});
});

View File

@@ -1,5 +1,7 @@
import { createSubsystemLogger } from "../logging/subsystem.js";
import type {
DetachedTaskRecoveryAttemptParams,
DetachedTaskRecoveryAttemptResult,
DetachedTaskLifecycleRuntime,
DetachedTaskLifecycleRuntimeRegistration,
} from "./detached-task-runtime-contract.js";
@@ -19,9 +21,9 @@ import {
setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor,
startTaskRunByRunId as startTaskRunByRunIdFromExecutor,
} from "./task-executor.js";
import type { TaskRecord, TaskRuntime } from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/detached-runtime");
const DETACHED_TASK_RECOVERY_WARN_MS = 5_000;
export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration };
@@ -109,25 +111,38 @@ export function cancelDetachedTaskRunById(
return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args);
}
export async function onBeforeMarkLost(params: {
taskId: string;
runtime: TaskRuntime;
task: TaskRecord;
}): Promise<{ recovered: boolean }> {
const hook = getDetachedTaskLifecycleRuntime().onBeforeMarkLost;
export async function tryRecoverTaskBeforeMarkLost(
params: DetachedTaskRecoveryAttemptParams,
): Promise<DetachedTaskRecoveryAttemptResult> {
const hook = getDetachedTaskLifecycleRuntime().tryRecoverTaskBeforeMarkLost;
if (!hook) {
return { recovered: false };
}
const startedAt = Date.now();
try {
const result = await hook(params);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= DETACHED_TASK_RECOVERY_WARN_MS) {
log.warn("Detached task recovery hook was slow", {
taskId: params.taskId,
runtime: params.runtime,
elapsedMs,
});
}
if (result && typeof result.recovered === "boolean") {
return result;
}
return { recovered: false };
} catch (err) {
log.warn("onBeforeMarkLost hook threw, proceeding with markTaskLost", {
log.warn("Detached task recovery hook returned invalid result, proceeding with markTaskLost", {
taskId: params.taskId,
runtime: params.runtime,
result,
});
return { recovered: false };
} catch (err) {
log.warn("Detached task recovery hook threw, proceeding with markTaskLost", {
taskId: params.taskId,
runtime: params.runtime,
elapsedMs: Date.now() - startedAt,
error: err,
});
return { recovered: false };

View File

@@ -8,6 +8,7 @@ import {
getDetachedTaskLifecycleRuntime,
} from "./detached-task-runtime.js";
import {
previewTaskRegistryMaintenance,
resetTaskRegistryMaintenanceRuntimeForTests,
runTaskRegistryMaintenance,
setTaskRegistryMaintenanceRuntimeForTests,
@@ -204,7 +205,7 @@ describe("task-registry maintenance issue #60299", () => {
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
});
it("skips markTaskLost and counts recovered when onBeforeMarkLost hook recovers a stale task", async () => {
it("skips markTaskLost and counts recovered when recovery hook recovers a stale task", async () => {
const task = makeStaleTask({
runtime: "cron",
sourceId: "cron-job-recovered",
@@ -215,13 +216,23 @@ describe("task-registry maintenance issue #60299", () => {
tasks: [task],
});
const recoveryHook = vi.fn(() => ({ recovered: true }));
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => ({ recovered: true })),
tryRecoverTaskBeforeMarkLost: recoveryHook,
});
expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 1, recovered: 0 });
const result = await runTaskRegistryMaintenance();
expect(result).toMatchObject({ reconciled: 0, recovered: 1 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
expect(recoveryHook).toHaveBeenCalledWith(
expect.objectContaining({
taskId: task.taskId,
runtime: "cron",
task: expect.objectContaining({ taskId: task.taskId }),
now: expect.any(Number),
}),
);
});
});

View File

@@ -5,7 +5,7 @@ import { getAgentRunContext } from "../infra/agent-events.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { deriveSessionChatType } from "../sessions/session-chat-type.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { onBeforeMarkLost } from "./detached-task-runtime.js";
import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js";
import {
deleteTaskRecordById,
ensureTaskRegistryReady,
@@ -252,7 +252,7 @@ export function reconcileTaskLookupToken(token: string): TaskRecord | undefined
return task ? reconcileTaskRecordForOperatorInspection(task) : undefined;
}
// Preview is synchronous and cannot call the async onBeforeMarkLost hook,
// Preview is synchronous and cannot call the async detached-task recovery hook,
// so recovered tasks are counted under reconciled here. The real sweep
// in runTaskRegistryMaintenance splits them into reconciled vs recovered.
export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary {
@@ -312,10 +312,11 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
continue;
}
if (shouldMarkLost(current, now)) {
const recovery = await onBeforeMarkLost({
const recovery = await tryRecoverTaskBeforeMarkLost({
taskId: current.taskId,
runtime: current.runtime,
task: current,
now,
});
const freshAfterHook = taskRegistryMaintenanceRuntime.getTaskById(current.taskId);
if (!freshAfterHook || !shouldMarkLost(freshAfterHook, now)) {