tasks: add onBeforeMarkLost recovery hook to detached runtime seam

Let a registered DetachedTaskLifecycleRuntime prevent the maintenance sweep
from marking a recoverable task as lost. When the optional onBeforeMarkLost
hook returns { recovered: true }, the sweep skips markTaskLost and increments
a new `recovered` counter in TaskRegistryMaintenanceSummary.

The hook receives the full TaskRecord and is wrapped in try/catch: if it
throws, the sweep logs a warning and proceeds with markTaskLost (safe
default). After the async hook returns, the sweep re-reads the task to
guard against concurrent completion.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Garry Tan
2026-04-20 05:28:03 +08:00
committed by Mariano Belinky
parent 871aa9d0b9
commit 839f1e6131
6 changed files with 137 additions and 4 deletions

View File

@@ -531,7 +531,7 @@ export async function tasksMaintenanceCommand(
runtime.log(
info(
`Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`,
`Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.recovered} recovered · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`,
),
);
runtime.log(

View File

@@ -112,6 +112,11 @@ export type DetachedTaskLifecycleRuntime = {
cancelDetachedTaskRunById: (
params: DetachedTaskCancelParams,
) => Promise<DetachedTaskCancelResult>;
onBeforeMarkLost?: (params: {
taskId: string;
runtime: TaskRuntime;
task: TaskRecord;
}) => { recovered: boolean } | Promise<{ recovered: boolean }>;
};
export type DetachedTaskLifecycleRuntimeRegistration = {

View File

@@ -7,6 +7,7 @@ import {
failTaskRunByRunId,
getDetachedTaskLifecycleRuntime,
getDetachedTaskLifecycleRuntimeRegistration,
onBeforeMarkLost,
registerDetachedTaskRuntime,
recordTaskRunProgressByRunId,
resetDetachedTaskLifecycleRuntimeForTests,
@@ -145,4 +146,60 @@ describe("detached-task-runtime", () => {
});
expect(getDetachedTaskLifecycleRuntime()).toBe(runtime);
});
describe("onBeforeMarkLost", () => {
it("returns recovered when hook returns recovered true", async () => {
const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => ({ recovered: true })),
});
const result = await onBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
});
expect(result).toEqual({ recovered: true });
});
it("returns not recovered when hook returns recovered false", async () => {
const task = createFakeTaskRecord({ taskId: "task-no-recover", runtime: "cron" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => ({ recovered: false })),
});
const result = await onBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
});
expect(result).toEqual({ recovered: false });
});
it("returns not recovered when hook is not provided", async () => {
const task = createFakeTaskRecord({ taskId: "task-no-hook", runtime: "cli" });
const result = await onBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
});
expect(result).toEqual({ recovered: false });
});
it("returns not recovered and logs warning when hook throws", async () => {
const task = createFakeTaskRecord({ taskId: "task-throw", runtime: "acp" });
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => {
throw new Error("plugin crashed");
}),
});
const result = await onBeforeMarkLost({
taskId: task.taskId,
runtime: task.runtime,
task,
});
expect(result).toEqual({ recovered: false });
});
});
});

View File

@@ -1,3 +1,4 @@
import { createSubsystemLogger } from "../logging/subsystem.js";
import type {
DetachedTaskLifecycleRuntime,
DetachedTaskLifecycleRuntimeRegistration,
@@ -18,6 +19,9 @@ import {
setDetachedTaskDeliveryStatusByRunId as setDetachedTaskDeliveryStatusByRunIdFromExecutor,
startTaskRunByRunId as startTaskRunByRunIdFromExecutor,
} from "./task-executor.js";
import type { TaskRecord, TaskRuntime } from "./task-registry.types.js";
const log = createSubsystemLogger("tasks/detached-runtime");
export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration };
@@ -104,3 +108,24 @@ export function cancelDetachedTaskRunById(
): ReturnType<DetachedTaskLifecycleRuntime["cancelDetachedTaskRunById"]> {
return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args);
}
export async function onBeforeMarkLost(params: {
taskId: string;
runtime: TaskRuntime;
task: TaskRecord;
}): Promise<{ recovered: boolean }> {
const hook = getDetachedTaskLifecycleRuntime().onBeforeMarkLost;
if (!hook) {
return { recovered: false };
}
try {
return await hook(params);
} catch (err) {
log.warn("onBeforeMarkLost hook threw, proceeding with markTaskLost", {
taskId: params.taskId,
runtime: params.runtime,
error: err,
});
return { recovered: false };
}
}

View File

@@ -1,7 +1,12 @@
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js";
import type { SessionEntry } from "../config/sessions.js";
import type { ParsedAgentSessionKey } from "../routing/session-key.js";
import {
resetDetachedTaskLifecycleRuntimeForTests,
setDetachedTaskLifecycleRuntime,
getDetachedTaskLifecycleRuntime,
} from "./detached-task-runtime.js";
import {
resetTaskRegistryMaintenanceRuntimeForTests,
runTaskRegistryMaintenance,
@@ -38,6 +43,7 @@ type TaskRegistryMaintenanceRuntime = Parameters<
afterEach(() => {
stopTaskRegistryMaintenanceForTests();
resetTaskRegistryMaintenanceRuntimeForTests();
resetDetachedTaskLifecycleRuntimeForTests();
});
function createTaskRegistryMaintenanceHarness(params: {
@@ -197,4 +203,25 @@ describe("task-registry maintenance issue #60299", () => {
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
});
it("skips markTaskLost and counts recovered when onBeforeMarkLost hook recovers a stale task", async () => {
const task = makeStaleTask({
runtime: "cron",
sourceId: "cron-job-recovered",
childSessionKey: undefined,
});
const { currentTasks } = createTaskRegistryMaintenanceHarness({
tasks: [task],
});
setDetachedTaskLifecycleRuntime({
...getDetachedTaskLifecycleRuntime(),
onBeforeMarkLost: vi.fn(() => ({ recovered: true })),
});
const result = await runTaskRegistryMaintenance();
expect(result).toMatchObject({ reconciled: 0, recovered: 1 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
});
});

View File

@@ -5,6 +5,7 @@ import { getAgentRunContext } from "../infra/agent-events.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { deriveSessionChatType } from "../sessions/session-chat-type.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { onBeforeMarkLost } from "./detached-task-runtime.js";
import {
deleteTaskRecordById,
ensureTaskRegistryReady,
@@ -77,6 +78,7 @@ let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime =
export type TaskRegistryMaintenanceSummary = {
reconciled: number;
recovered: number;
cleanupStamped: number;
pruned: number;
};
@@ -269,7 +271,7 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary
cleanupStamped += 1;
}
}
return { reconciled, cleanupStamped, pruned };
return { reconciled, recovered: 0, cleanupStamped, pruned };
}
/**
@@ -296,6 +298,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
taskRegistryMaintenanceRuntime.ensureTaskRegistryReady();
const now = Date.now();
let reconciled = 0;
let recovered = 0;
let cleanupStamped = 0;
let pruned = 0;
const tasks = taskRegistryMaintenanceRuntime.listTaskRecords();
@@ -306,6 +309,22 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
continue;
}
if (shouldMarkLost(current, now)) {
const recovery = await onBeforeMarkLost({
taskId: current.taskId,
runtime: current.runtime,
task: current,
});
if (recovery.recovered) {
const fresh = taskRegistryMaintenanceRuntime.getTaskById(current.taskId);
if (fresh && isActiveTask(fresh)) {
recovered += 1;
}
processed += 1;
if (processed % SWEEP_YIELD_BATCH_SIZE === 0) {
await yieldToEventLoop();
}
continue;
}
const next = markTaskLost(current, now);
if (next.status === "lost") {
reconciled += 1;
@@ -341,7 +360,7 @@ export async function runTaskRegistryMaintenance(): Promise<TaskRegistryMaintena
await yieldToEventLoop();
}
}
return { reconciled, cleanupStamped, pruned };
return { reconciled, recovered, cleanupStamped, pruned };
}
export async function sweepTaskRegistry(): Promise<TaskRegistryMaintenanceSummary> {