diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index e086e690a94..eafa67247ca 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -4ec700ac180b7eca81ca48885bc7f645dbf5016e2438e44678f4c206eed4b643 plugin-sdk-api-baseline.json -ff0d1541e7220c67d97444304568285303e423770bd6af6227afdf470bf233cc plugin-sdk-api-baseline.jsonl +c1d52cab340bc37d6750de609ba565598cbfc883291786092b0bfebec9d0c926 plugin-sdk-api-baseline.json +d03fa4539c6a2f0a138da3c658543315ad56648c3fd375591d09253d5db09298 plugin-sdk-api-baseline.jsonl diff --git a/src/commands/tasks.ts b/src/commands/tasks.ts index cb111ed647c..65dc46778d7 100644 --- a/src/commands/tasks.ts +++ b/src/commands/tasks.ts @@ -531,7 +531,7 @@ export async function tasksMaintenanceCommand( runtime.log( info( - `Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`, + `Tasks maintenance (${opts.apply ? "applied" : "preview"}): tasks ${taskMaintenance.reconciled} reconcile · ${taskMaintenance.recovered} recovered · ${taskMaintenance.cleanupStamped} cleanup stamp · ${taskMaintenance.pruned} prune; task-flows ${flowMaintenance.reconciled} reconcile · ${flowMaintenance.pruned} prune`, ), ); runtime.log( diff --git a/src/tasks/detached-task-runtime-contract.ts b/src/tasks/detached-task-runtime-contract.ts index e1e82d9f09f..018f8115a24 100644 --- a/src/tasks/detached-task-runtime-contract.ts +++ b/src/tasks/detached-task-runtime-contract.ts @@ -97,6 +97,17 @@ export type DetachedTaskCancelResult = { task?: TaskRecord; }; +export type DetachedTaskRecoveryAttemptParams = { + taskId: string; + runtime: TaskRuntime; + task: TaskRecord; + now: number; +}; + +export type DetachedTaskRecoveryAttemptResult = { + recovered: boolean; +}; + export type DetachedTaskLifecycleRuntime = { createQueuedTaskRun: (params: DetachedTaskCreateParams) => TaskRecord; createRunningTaskRun: (params: DetachedRunningTaskCreateParams) => TaskRecord; @@ -112,6 +123,13 @@ export type DetachedTaskLifecycleRuntime = { cancelDetachedTaskRunById: ( params: DetachedTaskCancelParams, ) => Promise; + /** + * Give a registered detached runtime one last chance to recover a stale task + * before core marks it lost during maintenance. + */ + tryRecoverTaskBeforeMarkLost?: ( + params: DetachedTaskRecoveryAttemptParams, + ) => DetachedTaskRecoveryAttemptResult | Promise; }; export type DetachedTaskLifecycleRuntimeRegistration = { diff --git a/src/tasks/detached-task-runtime.test.ts b/src/tasks/detached-task-runtime.test.ts index 26eaea3280f..01ae49d1376 100644 --- a/src/tasks/detached-task-runtime.test.ts +++ b/src/tasks/detached-task-runtime.test.ts @@ -13,9 +13,28 @@ import { setDetachedTaskLifecycleRuntime, setDetachedTaskDeliveryStatusByRunId, startTaskRunByRunId, + tryRecoverTaskBeforeMarkLost, } from "./detached-task-runtime.js"; import type { TaskRecord } from "./task-registry.types.js"; +const { mockLogWarn } = vi.hoisted(() => ({ + mockLogWarn: vi.fn(), +})); +vi.mock("../logging/subsystem.js", () => ({ + createSubsystemLogger: () => ({ + subsystem: "tasks/detached-runtime", + isEnabled: () => true, + trace: vi.fn(), + debug: vi.fn(), + info: vi.fn(), + warn: mockLogWarn, + error: vi.fn(), + fatal: vi.fn(), + raw: vi.fn(), + child: vi.fn(), + }), +})); + function createFakeTaskRecord(overrides?: Partial): TaskRecord { return { taskId: "task-fake", @@ -36,6 +55,7 @@ function createFakeTaskRecord(overrides?: Partial): TaskRecord { describe("detached-task-runtime", () => { afterEach(() => { resetDetachedTaskLifecycleRuntimeForTests(); + mockLogWarn.mockClear(); }); it("dispatches lifecycle operations through the installed runtime", async () => { @@ -145,4 +165,109 @@ describe("detached-task-runtime", () => { }); expect(getDetachedTaskLifecycleRuntime()).toBe(runtime); }); + + describe("tryRecoverTaskBeforeMarkLost", () => { + it("returns recovered when hook returns recovered true", async () => { + const task = createFakeTaskRecord({ taskId: "task-recover", runtime: "subagent" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: true })), + }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 123, + }); + expect(result).toEqual({ recovered: true }); + }); + + it("returns not recovered when hook returns recovered false", async () => { + const task = createFakeTaskRecord({ taskId: "task-no-recover", runtime: "cron" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ recovered: false })), + }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 456, + }); + expect(result).toEqual({ recovered: false }); + }); + + it("returns not recovered when hook is not provided", async () => { + const task = createFakeTaskRecord({ taskId: "task-no-hook", runtime: "cli" }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 789, + }); + expect(result).toEqual({ recovered: false }); + }); + + it("returns not recovered and logs warning when hook throws", async () => { + const task = createFakeTaskRecord({ taskId: "task-throw", runtime: "acp" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: vi.fn(() => { + throw new Error("plugin crashed"); + }), + }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 1_000, + }); + expect(result).toEqual({ recovered: false }); + expect(mockLogWarn).toHaveBeenCalledWith( + "Detached task recovery hook threw, proceeding with markTaskLost", + expect.objectContaining({ taskId: "task-throw", runtime: "acp", elapsedMs: 0 }), + ); + }); + + it("returns not recovered and logs warning when hook returns invalid result", async () => { + const task = createFakeTaskRecord({ taskId: "task-invalid", runtime: "cron" }); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: vi.fn(() => ({ nope: true }) as never), + }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 2_000, + }); + expect(result).toEqual({ recovered: false }); + expect(mockLogWarn).toHaveBeenCalledWith( + "Detached task recovery hook returned invalid result, proceeding with markTaskLost", + expect.objectContaining({ taskId: "task-invalid", runtime: "cron" }), + ); + }); + + it("logs when the recovery hook is slow", async () => { + const task = createFakeTaskRecord({ taskId: "task-slow", runtime: "subagent" }); + const dateNowSpy = vi.spyOn(Date, "now"); + dateNowSpy.mockReturnValueOnce(10_000).mockReturnValueOnce(16_000); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: vi.fn(async () => ({ recovered: true })), + }); + const result = await tryRecoverTaskBeforeMarkLost({ + taskId: task.taskId, + runtime: task.runtime, + task, + now: 3_000, + }); + expect(result).toEqual({ recovered: true }); + expect(mockLogWarn).toHaveBeenCalledWith( + "Detached task recovery hook was slow", + expect.objectContaining({ taskId: "task-slow", runtime: "subagent", elapsedMs: 6_000 }), + ); + dateNowSpy.mockRestore(); + }); + }); }); diff --git a/src/tasks/detached-task-runtime.ts b/src/tasks/detached-task-runtime.ts index 643f15af13f..a62042177c8 100644 --- a/src/tasks/detached-task-runtime.ts +++ b/src/tasks/detached-task-runtime.ts @@ -1,4 +1,7 @@ +import { createSubsystemLogger } from "../logging/subsystem.js"; import type { + DetachedTaskRecoveryAttemptParams, + DetachedTaskRecoveryAttemptResult, DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration, } from "./detached-task-runtime-contract.js"; @@ -19,6 +22,9 @@ import { startTaskRunByRunId as startTaskRunByRunIdFromExecutor, } from "./task-executor.js"; +const log = createSubsystemLogger("tasks/detached-runtime"); +const DETACHED_TASK_RECOVERY_WARN_MS = 5_000; + export type { DetachedTaskLifecycleRuntime, DetachedTaskLifecycleRuntimeRegistration }; const DEFAULT_DETACHED_TASK_LIFECYCLE_RUNTIME: DetachedTaskLifecycleRuntime = { @@ -104,3 +110,41 @@ export function cancelDetachedTaskRunById( ): ReturnType { return getDetachedTaskLifecycleRuntime().cancelDetachedTaskRunById(...args); } + +export async function tryRecoverTaskBeforeMarkLost( + params: DetachedTaskRecoveryAttemptParams, +): Promise { + const hook = getDetachedTaskLifecycleRuntime().tryRecoverTaskBeforeMarkLost; + if (!hook) { + return { recovered: false }; + } + const startedAt = Date.now(); + try { + const result = await hook(params); + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= DETACHED_TASK_RECOVERY_WARN_MS) { + log.warn("Detached task recovery hook was slow", { + taskId: params.taskId, + runtime: params.runtime, + elapsedMs, + }); + } + if (result && typeof result.recovered === "boolean") { + return result; + } + log.warn("Detached task recovery hook returned invalid result, proceeding with markTaskLost", { + taskId: params.taskId, + runtime: params.runtime, + result, + }); + return { recovered: false }; + } catch (err) { + log.warn("Detached task recovery hook threw, proceeding with markTaskLost", { + taskId: params.taskId, + runtime: params.runtime, + elapsedMs: Date.now() - startedAt, + error: err, + }); + return { recovered: false }; + } +} diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 59af8e61658..239e7f97733 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -1,8 +1,14 @@ -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js"; import type { SessionEntry } from "../config/sessions.js"; import type { ParsedAgentSessionKey } from "../routing/session-key.js"; import { + resetDetachedTaskLifecycleRuntimeForTests, + setDetachedTaskLifecycleRuntime, + getDetachedTaskLifecycleRuntime, +} from "./detached-task-runtime.js"; +import { + previewTaskRegistryMaintenance, resetTaskRegistryMaintenanceRuntimeForTests, runTaskRegistryMaintenance, setTaskRegistryMaintenanceRuntimeForTests, @@ -38,6 +44,7 @@ type TaskRegistryMaintenanceRuntime = Parameters< afterEach(() => { stopTaskRegistryMaintenanceForTests(); resetTaskRegistryMaintenanceRuntimeForTests(); + resetDetachedTaskLifecycleRuntimeForTests(); }); function createTaskRegistryMaintenanceHarness(params: { @@ -197,4 +204,35 @@ describe("task-registry maintenance issue #60299", () => { expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 }); expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); }); + + it("skips markTaskLost and counts recovered when recovery hook recovers a stale task", async () => { + const task = makeStaleTask({ + runtime: "cron", + sourceId: "cron-job-recovered", + childSessionKey: undefined, + }); + + const { currentTasks } = createTaskRegistryMaintenanceHarness({ + tasks: [task], + }); + + const recoveryHook = vi.fn(() => ({ recovered: true })); + setDetachedTaskLifecycleRuntime({ + ...getDetachedTaskLifecycleRuntime(), + tryRecoverTaskBeforeMarkLost: recoveryHook, + }); + + expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 1, recovered: 0 }); + const result = await runTaskRegistryMaintenance(); + expect(result).toMatchObject({ reconciled: 0, recovered: 1 }); + expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" }); + expect(recoveryHook).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: task.taskId, + runtime: "cron", + task: expect.objectContaining({ taskId: task.taskId }), + now: expect.any(Number), + }), + ); + }); }); diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index afb47b075aa..eb1257f4ceb 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -5,6 +5,7 @@ import { getAgentRunContext } from "../infra/agent-events.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { deriveSessionChatType } from "../sessions/session-chat-type.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; +import { tryRecoverTaskBeforeMarkLost } from "./detached-task-runtime.js"; import { deleteTaskRecordById, ensureTaskRegistryReady, @@ -77,6 +78,7 @@ let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = export type TaskRegistryMaintenanceSummary = { reconciled: number; + recovered: number; cleanupStamped: number; pruned: number; }; @@ -250,6 +252,9 @@ export function reconcileTaskLookupToken(token: string): TaskRecord | undefined return task ? reconcileTaskRecordForOperatorInspection(task) : undefined; } +// Preview is synchronous and cannot call the async detached-task recovery hook, +// so recovered tasks are counted under reconciled here. The real sweep +// in runTaskRegistryMaintenance splits them into reconciled vs recovered. export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary { taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); const now = Date.now(); @@ -269,7 +274,7 @@ export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary cleanupStamped += 1; } } - return { reconciled, cleanupStamped, pruned }; + return { reconciled, recovered: 0, cleanupStamped, pruned }; } /** @@ -296,6 +301,7 @@ export async function runTaskRegistryMaintenance(): Promise { diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index c575b00668e..28c44ff2907 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -1328,6 +1328,7 @@ describe("task-registry", () => { expect(await runTaskRegistryMaintenance()).toEqual({ reconciled: 1, + recovered: 0, cleanupStamped: 0, pruned: 0, }); @@ -1363,6 +1364,7 @@ describe("task-registry", () => { expect(await sweepTaskRegistry()).toEqual({ reconciled: 0, + recovered: 0, cleanupStamped: 0, pruned: 1, }); @@ -1406,12 +1408,14 @@ describe("task-registry", () => { expect(previewTaskRegistryMaintenance()).toEqual({ reconciled: 0, + recovered: 0, cleanupStamped: 1, pruned: 0, }); expect(await runTaskRegistryMaintenance()).toEqual({ reconciled: 0, + recovered: 0, cleanupStamped: 1, pruned: 0, }); @@ -1530,6 +1534,7 @@ describe("task-registry", () => { expect(await runTaskRegistryMaintenance()).toEqual({ reconciled: 0, + recovered: 0, cleanupStamped: 0, pruned: 0, }); @@ -1570,6 +1575,7 @@ describe("task-registry", () => { expect(await sweepTaskRegistry()).toEqual({ reconciled: 0, + recovered: 0, cleanupStamped: 0, pruned: 0, });