diff --git a/CHANGELOG.md b/CHANGELOG.md index 3532c5f51dc..909285547e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Subagents: stop stale unended runs from counting as active or pending forever, while preserving restart-aborted recovery for recoverable child sessions. Fixes #71252. Thanks @hclsys. - Reply media: allow sandboxed replies to deliver OpenClaw-managed `media/outbound` and `media/tool-*` attachments without treating them as sandbox escapes, while keeping alias-escape checks on the managed media root. Fixes #71138. Thanks @mayor686, @truffle-dev, and @neeravmakwana. - CLI/agent: keep `openclaw agent --json` stdout reserved for the JSON response by routing gateway, plugin, and embedded-fallback diagnostics to stderr before execution starts. Fixes #71319. - Agents/Gemini: retry reasoning-only, empty, and planning-only Gemini turns instead of letting sessions silently stall. Fixes #71074. (#71362) Thanks @neeravmakwana. diff --git a/docs/tools/subagents.md b/docs/tools/subagents.md index 52b4dfa5728..236488356fc 100644 --- a/docs/tools/subagents.md +++ b/docs/tools/subagents.md @@ -343,6 +343,19 @@ Sub-agents use a dedicated in-process queue lane: - Lane name: `subagent` - Concurrency: `agents.defaults.subagents.maxConcurrent` (default `8`) +## Liveness and recovery + +OpenClaw does not treat `endedAt` absence as permanent proof that a sub-agent +is still alive. Unended runs older than the stale-run window (2 hours, or the +run's `runTimeoutSeconds` plus a one-minute grace period when that is longer) +stop counting as active/pending in `/subagents list`, status summaries, +descendant completion gating, and per-session concurrency checks. + +After a gateway restart, stale unended restored runs are pruned unless their +child session is marked `abortedLastRun: true`. Those restart-aborted child +sessions remain recoverable through the sub-agent orphan recovery flow, which +sends a synthetic resume message before clearing the aborted marker. 
+ ## Stopping - Sending `/stop` in the requester chat aborts the requester session and stops any active sub-agent runs spawned from it, cascading to nested children. diff --git a/extensions/acpx/src/service.test.ts b/extensions/acpx/src/service.test.ts index 7cbd5c74945..9b5f0390842 100644 --- a/extensions/acpx/src/service.test.ts +++ b/extensions/acpx/src/service.test.ts @@ -276,6 +276,32 @@ describe("createAcpxRuntimeService", () => { await service.stop?.(ctx); }); + it("formats non-string doctor details without losing object payloads", async () => { + const workspaceDir = await makeTempDir(); + const ctx = createServiceContext(workspaceDir); + const runtime = createMockRuntime({ + doctor: async () => ({ + ok: false, + message: "probe failed", + details: [{ code: "ACP_CLOSED", agent: "codex" }, new Error("stdin closed")], + }), + isHealthy: () => false, + }); + const service = createAcpxRuntimeService({ + runtimeFactory: () => runtime as never, + }); + + await service.start(ctx); + + await vi.waitFor(() => { + expect(ctx.logger.warn).toHaveBeenCalledWith( + 'embedded acpx runtime backend probe failed: probe failed ({"code":"ACP_CLOSED","agent":"codex"}; stdin closed)', + ); + }); + + await service.stop?.(ctx); + }); + it("can skip the embedded runtime backend via env", async () => { process.env.OPENCLAW_SKIP_ACPX_RUNTIME = "1"; const workspaceDir = await makeTempDir(); diff --git a/extensions/acpx/src/service.ts b/extensions/acpx/src/service.ts index c7065a174fe..df0537497fa 100644 --- a/extensions/acpx/src/service.ts +++ b/extensions/acpx/src/service.ts @@ -1,4 +1,5 @@ import fs from "node:fs/promises"; +import { inspect } from "node:util"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import type { AcpRuntime, @@ -79,8 +80,36 @@ function warnOnIgnoredLegacyCompatibilityConfig(params: { ); } -function formatDoctorFailureMessage(report: { message: string; details?: string[] }): string { - const detailText = 
report.details?.filter(Boolean).join("; ").trim();
+/** Formats one doctor detail; nullish or blank yields null, other falsy values are kept. */
+function formatDoctorDetail(detail: unknown): string | null {
+  if (detail === null || detail === undefined) {
+    return null;
+  }
+  if (detail instanceof Error) {
+    return formatErrorMessage(detail);
+  }
+  switch (typeof detail) {
+    case "string":
+      return detail.trim() || null;
+    case "object":
+      try {
+        return JSON.stringify(detail) ?? inspect(detail, { breakLength: Infinity, depth: 3 });
+      } catch {
+        return inspect(detail, { breakLength: Infinity, depth: 3 });
+      }
+    case "number":
+    case "boolean":
+    case "bigint":
+    case "symbol":
+      return detail.toString();
+    default:
+      return inspect(detail, { breakLength: Infinity, depth: 3 });
+  }
+}
+
+/** Appends the non-empty formatted details to the report message, in parentheses. */
+function formatDoctorFailureMessage(report: { message: string; details?: unknown[] }): string {
+  const detailText = report.details?.map(formatDoctorDetail).filter(Boolean).join("; ").trim();
   return detailText ? `${report.message} (${detailText})` : report.message;
 }
 
diff --git a/src/agents/subagent-list.test.ts b/src/agents/subagent-list.test.ts index fd401ef9971..6a2816540d2 100644 --- a/src/agents/subagent-list.test.ts +++ b/src/agents/subagent-list.test.ts @@ -10,6 +10,7 @@ import { resetSubagentRegistryForTests, } from "./subagent-registry.test-helpers.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; +import { STALE_UNENDED_SUBAGENT_RUN_MS } from "./subagent-run-liveness.js"; let testWorkspaceDir = os.tmpdir(); @@ -156,4 +157,76 @@ describe("buildSubagentList", () => { expect(list.active[0]?.line).toContain("prompt/cache 197k"); expect(list.active[0]?.line).not.toContain("1k io"); }); + + it("keeps stale unended runs out of active and recent list output", () => { + const now = Date.now(); + const staleRun = { + runId: "run-stale-list", + childSessionKey: "agent:main:subagent:stale-list", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "stale hidden work", + 
cleanup: "keep", + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + } satisfies SubagentRunRecord; + addSubagentRunForTests(staleRun); + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + + const list = buildSubagentList({ + cfg, + runs: [staleRun], + recentMinutes: 30, + taskMaxChars: 110, + }); + + expect(list.total).toBe(1); + expect(list.active).toEqual([]); + expect(list.recent).toEqual([]); + expect(list.text).toContain("active subagents:\n(none)"); + }); + + it("does not let a stale unended child keep an ended parent listed active", () => { + const now = Date.now(); + const parentRun = { + runId: "run-parent-ended-stale-child", + childSessionKey: "agent:main:subagent:parent-ended-stale-child", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "parent ended", + cleanup: "keep", + createdAt: now - 120_000, + startedAt: now - 120_000, + endedAt: now - 60_000, + outcome: { status: "ok" }, + } satisfies SubagentRunRecord; + addSubagentRunForTests(parentRun); + addSubagentRunForTests({ + runId: "run-stale-child", + childSessionKey: `${parentRun.childSessionKey}:subagent:stale-child`, + requesterSessionKey: parentRun.childSessionKey, + requesterDisplayKey: "subagent:parent-ended-stale-child", + task: "stale child", + cleanup: "keep", + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }); + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + + const list = buildSubagentList({ + cfg, + runs: [parentRun], + recentMinutes: 30, + taskMaxChars: 110, + }); + + expect(list.active).toEqual([]); + expect(list.recent[0]?.status).toBe("done"); + }); }); diff --git a/src/agents/subagent-list.ts b/src/agents/subagent-list.ts index 0543b1115a2..06dae768ff9 100644 --- a/src/agents/subagent-list.ts +++ 
b/src/agents/subagent-list.ts @@ -20,6 +20,7 @@ import { } from "./subagent-registry-read.js"; import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; +import { hasSubagentRunEnded, isLiveUnendedSubagentRun } from "./subagent-run-liveness.js"; export type SubagentListItem = { index: number; @@ -133,7 +134,7 @@ export function isActiveSubagentRun( entry: SubagentRunRecord, pendingDescendantCount: (sessionKey: string) => number, ) { - return !entry.endedAt || pendingDescendantCount(entry.childSessionKey) > 0; + return isLiveUnendedSubagentRun(entry) || pendingDescendantCount(entry.childSessionKey) > 0; } function resolveRunStatus(entry: SubagentRunRecord, options?: { pendingDescendants?: number }) { @@ -142,7 +143,7 @@ function resolveRunStatus(entry: SubagentRunRecord, options?: { pendingDescendan const childLabel = pendingDescendants === 1 ? "child" : "children"; return `active (waiting on ${pendingDescendants} ${childLabel})`; } - if (!entry.endedAt) { + if (!hasSubagentRunEnded(entry)) { return "running"; } const status = entry.outcome?.status ?? 
"done"; diff --git a/src/agents/subagent-registry-helpers.ts b/src/agents/subagent-registry-helpers.ts index 71aa8484749..7dc643be756 100644 --- a/src/agents/subagent-registry-helpers.ts +++ b/src/agents/subagent-registry-helpers.ts @@ -15,6 +15,7 @@ import { withSubagentOutcomeTiming } from "./subagent-announce-output.js"; import { SUBAGENT_ENDED_REASON_ERROR } from "./subagent-lifecycle-events.js"; import { shouldUpdateRunOutcome } from "./subagent-registry-completion.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; +import { isStaleUnendedSubagentRun } from "./subagent-run-liveness.js"; import { getSubagentSessionRuntimeMs, getSubagentSessionStartedAt, @@ -35,7 +36,10 @@ export const ANNOUNCE_COMPLETION_HARD_EXPIRY_MS = 30 * 60_000; const FROZEN_RESULT_TEXT_MAX_BYTES = 100 * 1024; -export type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id"; +export type SubagentRunOrphanReason = + | "missing-session-entry" + | "missing-session-id" + | "stale-unended-run"; export function capFrozenResultText(resultText: string): string { const trimmed = resultText.trim(); @@ -140,6 +144,8 @@ export async function persistSubagentSessionTiming(entry: SubagentRunRecord) { export function resolveSubagentRunOrphanReason(params: { entry: SubagentRunRecord; storeCache?: Map>; + includeStaleUnended?: boolean; + now?: number; }): SubagentRunOrphanReason | null { const childSessionKey = params.entry.childSessionKey?.trim(); if (!childSessionKey) { @@ -161,6 +167,13 @@ export function resolveSubagentRunOrphanReason(params: { if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) { return "missing-session-id"; } + if ( + params.includeStaleUnended === true && + sessionEntry.abortedLastRun !== true && + isStaleUnendedSubagentRun(params.entry, params.now) + ) { + return "stale-unended-run"; + } return null; } catch { // Best-effort guard: avoid false orphan pruning on transient read/config failures. 
@@ -266,11 +279,14 @@ export function reconcileOrphanedRestoredRuns(params: { resumedRuns: Set; }) { const storeCache = new Map>(); + const now = Date.now(); let changed = false; for (const [runId, entry] of params.runs.entries()) { const orphanReason = resolveSubagentRunOrphanReason({ entry, storeCache, + includeStaleUnended: true, + now, }); if (!orphanReason) { continue; diff --git a/src/agents/subagent-registry-queries.test.ts b/src/agents/subagent-registry-queries.test.ts index fd7f992a746..41d465a3d82 100644 --- a/src/agents/subagent-registry-queries.test.ts +++ b/src/agents/subagent-registry-queries.test.ts @@ -3,11 +3,14 @@ import { countActiveRunsForSessionFromRuns, countPendingDescendantRunsExcludingRunFromRuns, countPendingDescendantRunsFromRuns, + getSubagentRunByChildSessionKeyFromRuns, + isSubagentSessionRunActiveFromRuns, listRunsForRequesterFromRuns, resolveRequesterForChildSessionFromRuns, shouldIgnorePostCompletionAnnounceForSessionFromRuns, } from "./subagent-registry-queries.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; +import { STALE_UNENDED_SUBAGENT_RUN_MS } from "./subagent-run-liveness.js"; function makeRun(overrides: Partial): SubagentRunRecord { const runId = overrides.runId ?? 
"run-default"; @@ -30,6 +33,142 @@ function toRunMap(runs: SubagentRunRecord[]): Map { } describe("subagent registry query regressions", () => { + it("does not treat stale unended rows as active child-session liveness", () => { + const now = Date.now(); + const childSessionKey = "agent:main:subagent:stale-live-check"; + const runs = toRunMap([ + makeRun({ + runId: "run-stale", + childSessionKey, + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }), + ]); + + expect(isSubagentSessionRunActiveFromRuns(runs, childSessionKey)).toBe(false); + + runs.set( + "run-fresh", + makeRun({ + runId: "run-fresh", + childSessionKey, + createdAt: now - 60_000, + startedAt: now - 60_000, + }), + ); + + expect(isSubagentSessionRunActiveFromRuns(runs, childSessionKey)).toBe(true); + }); + + it("does not count stale unended direct children as active concurrency", () => { + const now = Date.now(); + const runs = toRunMap([ + makeRun({ + runId: "run-stale-child", + childSessionKey: "agent:main:subagent:stale-child", + requesterSessionKey: "agent:main:main", + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }), + makeRun({ + runId: "run-fresh-child", + childSessionKey: "agent:main:subagent:fresh-child", + requesterSessionKey: "agent:main:main", + createdAt: now - 60_000, + startedAt: now - 60_000, + }), + ]); + + expect(countActiveRunsForSessionFromRuns(runs, "agent:main:main")).toBe(1); + }); + + it("does not count stale unended descendants as pending work", () => { + const now = Date.now(); + const parentSessionKey = "agent:main:subagent:parent-stale-desc"; + const runs = toRunMap([ + makeRun({ + runId: "run-stale-descendant", + childSessionKey: `${parentSessionKey}:subagent:stale`, + requesterSessionKey: parentSessionKey, + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }), + makeRun({ + runId: 
"run-ended-cleanup-pending", + childSessionKey: `${parentSessionKey}:subagent:cleanup`, + requesterSessionKey: parentSessionKey, + createdAt: now - 10_000, + startedAt: now - 9_000, + endedAt: now - 1_000, + cleanupCompletedAt: undefined, + }), + ]); + + expect(countPendingDescendantRunsFromRuns(runs, parentSessionKey)).toBe(1); + }); + + it("keeps a stale unended orchestrator active only when live descendants remain", () => { + const now = Date.now(); + const parentSessionKey = "agent:main:subagent:stale-orchestrator"; + const liveChildSessionKey = `${parentSessionKey}:subagent:live-child`; + const runs = toRunMap([ + makeRun({ + runId: "run-parent-stale", + childSessionKey: parentSessionKey, + requesterSessionKey: "agent:main:main", + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }), + makeRun({ + runId: "run-live-child", + childSessionKey: liveChildSessionKey, + requesterSessionKey: parentSessionKey, + createdAt: now - 60_000, + startedAt: now - 60_000, + }), + ]); + + expect(countActiveRunsForSessionFromRuns(runs, "agent:main:main")).toBe(1); + + runs.set( + "run-live-child", + makeRun({ + runId: "run-live-child", + childSessionKey: liveChildSessionKey, + requesterSessionKey: parentSessionKey, + createdAt: now - 60_000, + startedAt: now - 60_000, + endedAt: now - 1_000, + cleanupCompletedAt: now, + }), + ); + + expect(countActiveRunsForSessionFromRuns(runs, "agent:main:main")).toBe(0); + }); + + it("prefers the newest ended child row over an older stale unended row", () => { + const now = Date.now(); + const childSessionKey = "agent:main:subagent:stale-prefer-ended"; + const runs = toRunMap([ + makeRun({ + runId: "run-stale", + childSessionKey, + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }), + makeRun({ + runId: "run-ended", + childSessionKey, + createdAt: now - 60_000, + startedAt: now - 59_000, + endedAt: now - 1_000, + 
cleanupCompletedAt: now, + }), + ]); + + expect(getSubagentRunByChildSessionKeyFromRuns(runs, childSessionKey)?.runId).toBe("run-ended"); + }); + it("regression descendant count gating, pending descendants block announce until cleanup completion is recorded", () => { // Regression guard: parent announce must defer while any descendant cleanup is still pending. const parentSessionKey = "agent:main:subagent:parent"; diff --git a/src/agents/subagent-registry-queries.ts b/src/agents/subagent-registry-queries.ts index 3e7b8ee373c..e70cf91882e 100644 --- a/src/agents/subagent-registry-queries.ts +++ b/src/agents/subagent-registry-queries.ts @@ -1,5 +1,6 @@ import type { DeliveryContext } from "../utils/delivery-context.types.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; +import { hasSubagentRunEnded, isLiveUnendedSubagentRun } from "./subagent-run-liveness.js"; function resolveControllerSessionKey(entry: SubagentRunRecord): string { return entry.controllerSessionKey?.trim() || entry.requesterSessionKey; @@ -91,7 +92,7 @@ export function isSubagentSessionRunActiveFromRuns( childSessionKey: string, ): boolean { const latest = findLatestRunForChildSession(runs, childSessionKey); - return Boolean(latest && typeof latest.endedAt !== "number"); + return Boolean(latest && isLiveUnendedSubagentRun(latest)); } export function getSubagentRunByChildSessionKeyFromRuns( @@ -109,7 +110,7 @@ export function getSubagentRunByChildSessionKeyFromRuns( if (entry.childSessionKey !== key) { continue; } - if (typeof entry.endedAt !== "number") { + if (isLiveUnendedSubagentRun(entry)) { if (!latestActive || entry.createdAt > latestActive.createdAt) { latestActive = entry; } @@ -186,7 +187,7 @@ export function countActiveRunsForSessionFromRuns( let count = 0; for (const entry of latestByChildSessionKey.values()) { - if (typeof entry.endedAt !== "number") { + if (isLiveUnendedSubagentRun(entry)) { count += 1; continue; } @@ -252,7 +253,7 @@ export function 
countActiveDescendantRunsFromRuns( let count = 0; if ( !forEachDescendantRun(runs, rootSessionKey, (_runId, entry) => { - if (typeof entry.endedAt !== "number") { + if (isLiveUnendedSubagentRun(entry)) { count += 1; } }) @@ -271,9 +272,10 @@ function countPendingDescendantRunsInternal( let count = 0; if ( !forEachDescendantRun(runs, rootSessionKey, (runId, entry) => { - const runEnded = typeof entry.endedAt === "number"; + const runEnded = hasSubagentRunEnded(entry); const cleanupCompleted = typeof entry.cleanupCompletedAt === "number"; - if ((!runEnded || !cleanupCompleted) && runId !== excludedRunId) { + const runPending = runEnded ? !cleanupCompleted : isLiveUnendedSubagentRun(entry); + if (runPending && runId !== excludedRunId) { count += 1; } }) diff --git a/src/agents/subagent-registry.persistence.resume.test.ts b/src/agents/subagent-registry.persistence.resume.test.ts index f5a3b89d0bd..d46e70313a2 100644 --- a/src/agents/subagent-registry.persistence.resume.test.ts +++ b/src/agents/subagent-registry.persistence.resume.test.ts @@ -79,6 +79,7 @@ describe("subagent registry persistence resume", () => { sessionKey: string; sessionId?: string; updatedAt?: number; + abortedLastRun?: boolean; }) => { if (!tempStateDir) { throw new Error("tempStateDir not initialized"); @@ -89,6 +90,7 @@ describe("subagent registry persistence resume", () => { sessionKey: params.sessionKey, sessionId: params.sessionId, updatedAt: params.updatedAt, + abortedLastRun: params.abortedLastRun, defaultSessionId: `sess-${Date.now()}`, }); }; diff --git a/src/agents/subagent-registry.persistence.test-support.ts b/src/agents/subagent-registry.persistence.test-support.ts index 035d4f4e7cc..ec66b1125e1 100644 --- a/src/agents/subagent-registry.persistence.test-support.ts +++ b/src/agents/subagent-registry.persistence.test-support.ts @@ -26,6 +26,7 @@ export async function writeSubagentSessionEntry(params: { sessionKey: string; sessionId?: string; updatedAt?: number; + abortedLastRun?: boolean; 
agentId: string; defaultSessionId: string; }): Promise { @@ -35,6 +36,9 @@ export async function writeSubagentSessionEntry(params: { ...store[params.sessionKey], sessionId: params.sessionId ?? params.defaultSessionId, updatedAt: params.updatedAt ?? Date.now(), + ...(typeof params.abortedLastRun === "boolean" + ? { abortedLastRun: params.abortedLastRun } + : {}), }; await fs.mkdir(path.dirname(storePath), { recursive: true }); await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8"); diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index 68d46c00f2f..eabd2493c8e 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -59,6 +59,7 @@ describe("subagent registry persistence", () => { sessionKey: string; sessionId?: string; updatedAt?: number; + abortedLastRun?: boolean; }) => { if (!tempStateDir) { throw new Error("tempStateDir not initialized"); @@ -70,6 +71,7 @@ describe("subagent registry persistence", () => { sessionKey: params.sessionKey, sessionId: params.sessionId, updatedAt: params.updatedAt, + abortedLastRun: params.abortedLastRun, defaultSessionId: `sess-${agentId}-${Date.now()}`, }); }; @@ -552,6 +554,82 @@ describe("subagent registry persistence", () => { expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); }); + it("reconciles stale unended restored runs that are not restart-recoverable", async () => { + const now = Date.now(); + const runId = "run-stale-unended-restore"; + const childSessionKey = "agent:main:subagent:stale-unended-restore"; + const registryPath = await writePersistedRegistry({ + version: 2, + runs: { + [runId]: { + runId, + childSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "stale unended restored work", + cleanup: "keep", + createdAt: now - 3 * 60 * 60 * 1_000, + startedAt: now - 3 * 60 * 60 * 1_000, + }, + }, + }); + + 
restartRegistry(); + await waitForRegistryWork(async () => { + const after = JSON.parse(await fs.readFile(registryPath, "utf8")) as { + runs?: Record; + }; + return after.runs?.[runId] === undefined; + }); + + expect(callGateway).not.toHaveBeenCalled(); + expect(announceSpy).not.toHaveBeenCalled(); + expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); + }); + + it("keeps stale unended restored runs with abortedLastRun for restart recovery", async () => { + const now = Date.now(); + const runId = "run-stale-aborted-restore"; + const childSessionKey = "agent:main:subagent:stale-aborted-restore"; + await writePersistedRegistry( + { + version: 2, + runs: { + [runId]: { + runId, + childSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "stale restart-recoverable work", + cleanup: "keep", + createdAt: now - 3 * 60 * 60 * 1_000, + startedAt: now - 3 * 60 * 60 * 1_000, + }, + }, + }, + { seedChildSessions: false }, + ); + await writeChildSessionEntry({ + sessionKey: childSessionKey, + sessionId: "sess-stale-aborted-restore", + updatedAt: now, + abortedLastRun: true, + }); + + restartRegistry(); + await waitForRegistryWork(() => vi.mocked(callGateway).mock.calls.length > 0); + + expect(callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "agent.wait", + params: expect.objectContaining({ runId }), + }), + ); + expect( + listSubagentRunsForRequester("agent:main:main").some((entry) => entry.runId === runId), + ).toBe(true); + }); + it("removes attachments when pruning orphaned restored runs", async () => { const persisted = createPersistedEndedRun({ runId: "run-orphan-attachments", diff --git a/src/agents/subagent-run-liveness.test.ts b/src/agents/subagent-run-liveness.test.ts new file mode 100644 index 00000000000..78376d14584 --- /dev/null +++ b/src/agents/subagent-run-liveness.test.ts @@ -0,0 +1,75 @@ +import { describe, expect, it, vi } from "vitest"; +import { + isLiveUnendedSubagentRun, + 
isStaleUnendedSubagentRun, + STALE_UNENDED_SUBAGENT_RUN_MS, +} from "./subagent-run-liveness.js"; + +describe("subagent run liveness", () => { + const now = Date.parse("2026-04-25T12:00:00Z"); + + it("keeps fresh unended runs live", () => { + const entry = { + createdAt: now - 60_000, + }; + expect(isLiveUnendedSubagentRun(entry, now)).toBe(true); + expect(isStaleUnendedSubagentRun(entry, now)).toBe(false); + }); + + it("marks old unended runs stale when no explicit timeout extends the window", () => { + const entry = { + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }; + expect(isStaleUnendedSubagentRun(entry, now)).toBe(true); + expect(isLiveUnendedSubagentRun(entry, now)).toBe(false); + }); + + it("does not mark ended runs stale", () => { + const entry = { + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + endedAt: now - 1, + }; + expect(isStaleUnendedSubagentRun(entry, now)).toBe(false); + expect(isLiveUnendedSubagentRun(entry, now)).toBe(false); + }); + + it("uses sessionStartedAt ahead of createdAt", () => { + const entry = { + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + sessionStartedAt: now - 60_000, + }; + expect(isStaleUnendedSubagentRun(entry, now)).toBe(false); + expect(isLiveUnendedSubagentRun(entry, now)).toBe(true); + }); + + it("extends stale cutoff for explicit long run timeouts", () => { + const entry = { + createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + runTimeoutSeconds: 6 * 60 * 60, + }; + expect(isStaleUnendedSubagentRun(entry, now)).toBe(false); + expect(isLiveUnendedSubagentRun(entry, now)).toBe(true); + }); + + it("ignores non-real fixture timestamps as unknown instead of stale", () => { + const entry = { + createdAt: 100, + }; + expect(isStaleUnendedSubagentRun(entry, now)).toBe(false); + expect(isLiveUnendedSubagentRun(entry, now)).toBe(true); + }); + + it("defaults to current time when now is omitted", () => { + vi.useFakeTimers(); + vi.setSystemTime(now); + try { + expect( + isStaleUnendedSubagentRun({ + 
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1, + }), + ).toBe(true); + } finally { + vi.useRealTimers(); + } + }); +});
diff --git a/src/agents/subagent-run-liveness.ts b/src/agents/subagent-run-liveness.ts
new file mode 100644
index 00000000000..70a1de5fb41
--- /dev/null
+++ b/src/agents/subagent-run-liveness.ts
@@ -0,0 +1,78 @@
+import type { SubagentRunRecord } from "./subagent-registry.types.js";
+import { getSubagentSessionStartedAt } from "./subagent-session-metrics.js";
+
+/** Default age (2 hours) after which a run without a recorded end is considered stale. */
+export const STALE_UNENDED_SUBAGENT_RUN_MS = 2 * 60 * 60 * 1_000;
+/** Extra slack (1 minute) granted on top of an explicit run timeout. */
+const EXPLICIT_TIMEOUT_STALE_GRACE_MS = 60_000;
+/** Start times before 2020-01-01 UTC are treated as unknown, never as stale. */
+const MIN_REALISTIC_RUN_TIMESTAMP_MS = Date.UTC(2020, 0, 1);
+
+/** Run-record fields consulted by the stale/live checks. */
+type SubagentRunLivenessFields = Pick<
+  SubagentRunRecord,
+  "createdAt" | "startedAt" | "sessionStartedAt" | "endedAt" | "runTimeoutSeconds"
+>;
+
+/** Returns true when `endedAt` is a finite number, i.e. the run has a recorded end. */
+export function hasSubagentRunEnded(entry: Pick<SubagentRunRecord, "endedAt">): boolean {
+  return typeof entry.endedAt === "number" && Number.isFinite(entry.endedAt);
+}
+
+/**
+ * Resolves how long an unended run may go before it is considered stale.
+ *
+ * A positive, finite `runTimeoutSeconds` can only lengthen the window (timeout plus
+ * grace); it never shortens it below {@link STALE_UNENDED_SUBAGENT_RUN_MS}.
+ */
+function resolveStaleCutoffMs(entry: Pick<SubagentRunRecord, "runTimeoutSeconds">): number {
+  const timeoutSeconds = entry.runTimeoutSeconds;
+  if (typeof timeoutSeconds === "number" && Number.isFinite(timeoutSeconds) && timeoutSeconds > 0) {
+    return Math.max(
+      STALE_UNENDED_SUBAGENT_RUN_MS,
+      Math.floor(timeoutSeconds) * 1_000 + EXPLICIT_TIMEOUT_STALE_GRACE_MS,
+    );
+  }
+  return STALE_UNENDED_SUBAGENT_RUN_MS;
+}
+
+/**
+ * Returns true when the run has no recorded end and started longer ago than its
+ * stale cutoff.
+ *
+ * @param entry - Run record (or the liveness-relevant subset of it).
+ * @param now - Reference time in epoch milliseconds; defaults to `Date.now()`.
+ * @returns `false` for ended runs and for runs whose start time is missing,
+ *   non-finite, or earlier than 2020-01-01 UTC.
+ */
+export function isStaleUnendedSubagentRun(
+  entry: SubagentRunLivenessFields,
+  now = Date.now(),
+): boolean {
+  if (hasSubagentRunEnded(entry)) {
+    return false;
+  }
+  const startedAt = getSubagentSessionStartedAt(entry);
+  if (
+    typeof startedAt !== "number" ||
+    !Number.isFinite(startedAt) ||
+    startedAt < MIN_REALISTIC_RUN_TIMESTAMP_MS
+  ) {
+    // Unknown or implausible start time: keep the run rather than guess that it is stale.
+    return false;
+  }
+  return now - startedAt > resolveStaleCutoffMs(entry);
+}
+
+/**
+ * Returns true when the run has no recorded end and is not stale.
+ *
+ * @param entry - Run record (or the liveness-relevant subset of it).
+ * @param now - Reference time in epoch milliseconds; defaults to `Date.now()`.
+ */
+export function isLiveUnendedSubagentRun(
+  entry: SubagentRunLivenessFields,
+  now = Date.now(),
+): boolean {
+  return !hasSubagentRunEnded(entry) && !isStaleUnendedSubagentRun(entry, now);
+}