tasks: add detached task recovery hook before markLost (#69313)

Merged via squash.

Prepared head SHA: 24322af4f7
Co-authored-by: garrytan <19957+garrytan@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Garry Tan
2026-04-21 06:58:20 +08:00
committed by GitHub
parent 871aa9d0b9
commit c8086b731a
8 changed files with 266 additions and 7 deletions

View File

@@ -1,2 +1,2 @@
4ec700ac180b7eca81ca48885bc7f645dbf5016e2438e44678f4c206eed4b643 plugin-sdk-api-baseline.json
ff0d1541e7220c67d97444304568285303e423770bd6af6227afdf470bf233cc plugin-sdk-api-baseline.jsonl
c1d52cab340bc37d6750de609ba565598cbfc883291786092b0bfebec9d0c926 plugin-sdk-api-baseline.json
d03fa4539c6a2f0a138da3c658543315ad56648c3fd375591d09253d5db09298 plugin-sdk-api-baseline.jsonl

View File

@@ -531,7 +531,7 @@ export async function tasksMaintenanceCommand(
runtime.log(
info(
`Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`,
`Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.recovered} recovered · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`,
),
);
runtime.log(

View File

@@ -97,6 +97,17 @@ export type DetachedTaskCancelResult = {
task?: TaskRecord;
};
export type DetachedTaskRecoveryAttemptParams = {
taskId: string;
runtime: TaskRuntime;
task: TaskRecord;
now: number;
};
export type DetachedTaskRecoveryAttemptResult = {
recovered: boolean;
};
export type DetachedTaskLifecycleRuntime = {
createQueuedTaskRun: (params: DetachedTaskCreateParams) => TaskRecord;
createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord;
@@ -112,6 +123,13 @@ export type DetachedTaskLifecycleRuntime = {
cancelDetachedTaskRunById: (
params: DetachedTaskCancelParams,
) => Promise<DetachedTaskCancelResult>;
/**
* Give a registered detached runtime one last chance to recover a stale task
* before core marks it lost during maintenance.
*/
tryRecoverTaskBeforeMarkLost?: (
params: DetachedTaskRecoveryAttemptParams,
) => DetachedTaskRecoveryAttemptResult | Promise<DetachedTaskRecoveryAttemptResult>;
};
export type DetachedTaskLifecycleRuntimeRegistration = {

View File

@@ -13,9 +13,28 @@ import {
setDetachedTaskLifecycleRuntime,
setDetachedTaskDeliveryStatusByRunId,
startTaskRunByRunId,
tryRecoverTaskBeforeMarkLost,
} from "./detached-task-runtime.js";
import type { TaskRecord } from "./task-registry.types.js";
const { mockLogWarn } = vi.hoisted(() => ({
mockLogWarn: vi.fn(),
}));
vi.mock("../logging/subsystem.js", () => ({
createSubsystemLogger: () => ({
subsystem: "tasks/detached-runtime",
isEnabled: () => true,
trace: vi.fn(),
debug: vi.fn(),
info: vi.fn(),
warn: mockLogWarn,
error: vi.fn(),
fatal: vi.fn(),
raw: vi.fn(),
child: vi.fn(),
}),
}));
function createFakeTaskRecord(overrides?: Partial<TaskRecord>): TaskRecord {
return {
taskId: "task-fake",
@@ -36,6 +55,7 @@ function createFakeTaskRecord(overrides?: Partial<TaskRecord>): TaskRecord {
describe("detached-task-runtime", () => {
afterEach(() => {
resetDetachedTaskLifecycleRuntimeForTests();
mockLogWarn.mockClear();
});
it("dispatches lifecycle operations through the installed runtime", async () => {
@@ -145,4 +165,109 @@ describe("detached-task-runtime", () => {
});
expect(getDetachedTaskLifecycleRuntime()).toBe(runtime);
});
describe("tryRecoverTaskBeforeMarkLost", () => {
it("returns recovered when hook returns recovered true", async () => {
const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: true })),
});
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 123,
});
expect(result).toEqual({ recovered: true });
});
it("returns not recovered when hook returns recovered false", async () => {
const task = createFakeTaskRecord({ taskId: "task-no-recover", runtime: "cron" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: false })),
});
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 456,
});
expect(result).toEqual({ recovered: false });
});
it("returns not recovered when hook is not provided", async () => {
const task = createFakeTaskRecord({ taskId: "task-no-hook", runtime: "cli" });
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 789,
});
expect(result).toEqual({ recovered: false });
});
it("returns not recovered and logs warning when hook throws", async () => {
const task = createFakeTaskRecord({ taskId: "task-throw", runtime: "acp" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: vi.fn(() => {
throw new Error("plugin crashed");
}),
});
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 1_000,
});
expect(result).toEqual({ recovered: false });
expect(mockLogWarn).toHaveBeenCalledWith(
"Detached task recovery hook threw, proceeding with markTaskLost",
expect.objectContaining({ taskId: "task-throw", runtime: "acp", elapsedMs: 0 }),
);
});
it("returns not recovered and logs warning when hook returns invalid result", async () => {
const task = createFakeTaskRecord({ taskId: "task-invalid", runtime: "cron" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ nope: true }) as never),
});
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 2_000,
});
expect(result).toEqual({ recovered: false });
expect(mockLogWarn).toHaveBeenCalledWith(
"Detached task recovery hook returned invalid result, proceeding with markTaskLost",
expect.objectContaining({ taskId: "task-invalid", runtime: "cron" }),
);
});
it("logs when the recovery hook is slow", async () => {
const task = createFakeTaskRecord({ taskId: "task-slow", runtime: "subagent" });
const dateNowSpy = vi.spyOn(Date, "now");
dateNowSpy.mockReturnValueOnce(10_000).mockReturnValueOnce(16_000);
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: vi.fn(async () => ({ recovered: true })),
});
const result = await tryRecoverTaskBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
now: 3_000,
});
expect(result).toEqual({ recovered: true });
expect(mockLogWarn).toHaveBeenCalledWith(
"Detached task recovery hook was slow",
expect.objectContaining({ taskId: "task-slow", runtime: "subagent", elapsedMs: 6_000 }),
);
dateNowSpy.mockRestore();
});
});
});

View File

@@ -1,4 +1,7 @@
import { createSubsystemLogger } from "../logging/subsystem.js";
import type {
DetachedTaskRecoveryAttemptParams,
DetachedTaskRecoveryAttemptResult,
DetachedTaskLifecycleRuntime,
DetachedTaskLifecycleRuntimeRegistration,
} from "./detached-task-runtime-contract.js";
@@ -19,6 +22,9 @@ import {
startTaskRunByRunId as startTaskRunByRunIdFromExecutor,
} from "./task-executor.js";
const log = createSubsystemLogger("tasks/detached-runtime");
const DETACHED_TASK_RECOVERY_WARN_MS = 5_000;
export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration };
const DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME: DetachedTaskLifecycleRuntime = {
@@ -104,3 +110,41 @@ export function cancelDetachedTaskRunById(
): ReturnType<DetachedTaskLifecycleRuntime["cancelDetachedTaskRunById"]> {
return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args);
}
export async function tryRecoverTaskBeforeMarkLost(
params: DetachedTaskRecoveryAttemptParams,
): Promise<DetachedTaskRecoveryAttemptResult> {
const hook = getDetachedTaskLifecycleRuntime().tryRecoverTaskBeforeMarkLost;
if (!hook) {
return { recovered: false };
}
const startedAt = Date.now();
try {
const result = await hook(params);
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= DETACHED_TASK_RECOVERY_WARN_MS) {
log.warn("Detached task recovery hook was slow", {
taskId: params.taskId,
runtime: params.runtime,
elapsedMs,
});
}
if (result && typeof result.recovered === "boolean") {
return result;
}
log.warn("Detached task recovery hook returned invalid result, proceeding with markTaskLost", {
taskId: params.taskId,
runtime: params.runtime,
result,
});
return { recovered: false };
} catch (err) {
log.warn("Detached task recovery hook threw, proceeding with markTaskLost", {
taskId: params.taskId,
runtime: params.runtime,
elapsedMs: Date.now() - startedAt,
error: err,
});
return { recovered: false };
}
}

View File

@@ -1,8 +1,14 @@
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js";
import type { SessionEntry } from "../config/sessions.js";
import type { ParsedAgentSessionKey } from "../routing/session-key.js";
import {
resetDetachedTaskLifecycleRuntimeForTests,
setDetachedTaskLifecycleRuntime,
getDetachedTaskLifecycleRuntime,
} from "./detached-task-runtime.js";
import {
previewTaskRegistryMaintenance,
resetTaskRegistryMaintenanceRuntimeForTests,
runTaskRegistryMaintenance,
setTaskRegistryMaintenanceRuntimeForTests,
@@ -38,6 +44,7 @@ type TaskRegistryMaintenanceRuntime = Parameters<
afterEach(() => {
stopTaskRegistryMaintenanceForTests();
resetTaskRegistryMaintenanceRuntimeForTests();
resetDetachedTaskLifecycleRuntimeForTests();
});
function createTaskRegistryMaintenanceHarness(params: {
@@ -197,4 +204,35 @@ describe("task-registry maintenance issue #60299", () => {
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
});
it("skips markTaskLost and counts recovered when recovery hook recovers a stale task", async () => {
const task = makeStaleTask({
runtime: "cron",
sourceId: "cron-job-recovered",
childSessionKey: undefined,
});
const { currentTasks } = createTaskRegistryMaintenanceHarness({
tasks: [task],
});
const recoveryHook = vi.fn(() => ({ recovered: true }));
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
tryRecoverTaskBeforeMarkLost: recoveryHook,
});
expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 1, recovered: 0 });
const result = await runTaskRegistryMaintenance();
expect(result).toMatchObject({ reconciled: 0, recovered: 1 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
expect(recoveryHook).toHaveBeenCalledWith(
expect.objectContaining({
taskId: task.taskId,
runtime: "cron",
task: expect.objectContaining({ taskId: task.taskId }),
now: expect.any(Number),
}),
);
});
});

View File

@@ -5,6 +5,7 @@ import { getAgentRunContext } from "../infra/agent-events.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { deriveSessionChatType } from "../sessions/session-chat-type.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js";
import {
deleteTaskRecordById,
ensureTaskRegistryReady,
@@ -77,6 +78,7 @@ let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime =
export type TaskRegistryMaintenanceSummary = {
reconciled: number;
recovered: number;
cleanupStamped: number;
pruned: number;
};
@@ -250,6 +252,9 @@ export function reconcileTaskLookupToken(token: string): TaskRecord | undefined
return task ? reconcileTaskRecordForOperatorInspection(task) : undefined;
}
// Preview is synchronous and cannot call the async detached-task recovery hook,
// so recovered tasks are counted under reconciled here. The real sweep
// in runTaskRegistryMaintenance splits them into reconciled vs recovered.
export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary {
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
const now = Date.now();
@@ -269,7 +274,7 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary
cleanupStamped += 1;
}
}
return { reconciled, cleanupStamped, pruned };
return { reconciled, recovered: 0, cleanupStamped, pruned };
}
/**
@@ -296,6 +301,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
const now = Date.now();
let reconciled = 0;
let recovered = 0;
let cleanupStamped = 0;
let pruned = 0;
const tasks = taskRegistryMaintenanceRuntime.listTaskRecords();
@@ -306,7 +312,29 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
continue;
}
if (shouldMarkLost(current, now)) {
const next = markTaskLost(current, now);
const recovery = await tryRecoverTaskBeforeMarkLost({
taskId: current.taskId,
runtime: current.runtime,
task: current,
now,
});
const freshAfterHook = taskRegistryMaintenanceRuntime.getTaskById(current.taskId);
if (!freshAfterHook || !shouldMarkLost(freshAfterHook, now)) {
processed += 1;
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
await yieldToEventLoop();
}
continue;
}
if (recovery.recovered) {
recovered += 1;
processed += 1;
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
await yieldToEventLoop();
}
continue;
}
const next = markTaskLost(freshAfterHook, now);
if (next.status === "lost") {
reconciled += 1;
}
@@ -341,7 +369,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
await yieldToEventLoop();
}
}
return { reconciled, cleanupStamped, pruned };
return { reconciled, recovered, cleanupStamped, pruned };
}
export async function sweepTaskRegistry(): Promise<TaskRegistryMaintenanceSummary> {

View File

@@ -1328,6 +1328,7 @@ describe("task-registry", () => {
expect(await runTaskRegistryMaintenance()).toEqual({
reconciled: 1,
recovered: 0,
cleanupStamped: 0,
pruned: 0,
});
@@ -1363,6 +1364,7 @@ describe("task-registry", () => {
expect(await sweepTaskRegistry()).toEqual({
reconciled: 0,
recovered: 0,
cleanupStamped: 0,
pruned: 1,
});
@@ -1406,12 +1408,14 @@ describe("task-registry", () => {
expect(previewTaskRegistryMaintenance()).toEqual({
reconciled: 0,
recovered: 0,
cleanupStamped: 1,
pruned: 0,
});
expect(await runTaskRegistryMaintenance()).toEqual({
reconciled: 0,
recovered: 0,
cleanupStamped: 1,
pruned: 0,
});
@@ -1530,6 +1534,7 @@ describe("task-registry", () => {
expect(await runTaskRegistryMaintenance()).toEqual({
reconciled: 0,
recovered: 0,
cleanupStamped: 0,
pruned: 0,
});
@@ -1570,6 +1575,7 @@ describe("task-registry", () => {
expect(await sweepTaskRegistry()).toEqual({
reconciled: 0,
recovered: 0,
cleanupStamped: 0,
pruned: 0,
});