mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 14:10:51 +00:00
fix(subagents): retire stale unended runs
Co-authored-by: HCL <chenglunhu@gmail.com>
This commit is contained in:
@@ -69,6 +69,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Subagents: stop stale unended runs from counting as active or pending forever, while preserving restart-aborted recovery for recoverable child sessions. Fixes #71252. Thanks @hclsys.
|
||||
- Reply media: allow sandboxed replies to deliver OpenClaw-managed `media/outbound` and `media/tool-*` attachments without treating them as sandbox escapes, while keeping alias-escape checks on the managed media root. Fixes #71138. Thanks @mayor686, @truffle-dev, and @neeravmakwana.
|
||||
- CLI/agent: keep `openclaw agent --json` stdout reserved for the JSON response by routing gateway, plugin, and embedded-fallback diagnostics to stderr before execution starts. Fixes #71319.
|
||||
- Agents/Gemini: retry reasoning-only, empty, and planning-only Gemini turns instead of letting sessions silently stall. Fixes #71074. (#71362) Thanks @neeravmakwana.
|
||||
|
||||
@@ -343,6 +343,18 @@ Sub-agents use a dedicated in-process queue lane:
|
||||
- Lane name: `subagent`
|
||||
- Concurrency: `agents.defaults.subagents.maxConcurrent` (default `8`)
|
||||
|
||||
## Liveness and recovery
|
||||
|
||||
OpenClaw does not treat `endedAt` absence as permanent proof that a sub-agent
|
||||
is still alive. Unended runs older than the stale-run window stop counting as
|
||||
active/pending in `/subagents list`, status summaries, descendant completion
|
||||
gating, and per-session concurrency checks.
|
||||
|
||||
After a gateway restart, stale unended restored runs are pruned unless their
|
||||
child session is marked `abortedLastRun: true`. Those restart-aborted child
|
||||
sessions remain recoverable through the sub-agent orphan recovery flow, which
|
||||
sends a synthetic resume message before clearing the aborted marker.
|
||||
|
||||
## Stopping
|
||||
|
||||
- Sending `/stop` in the requester chat aborts the requester session and stops any active sub-agent runs spawned from it, cascading to nested children.
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
resetSubagentRegistryForTests,
|
||||
} from "./subagent-registry.test-helpers.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
import { STALE_UNENDED_SUBAGENT_RUN_MS } from "./subagent-run-liveness.js";
|
||||
|
||||
let testWorkspaceDir = os.tmpdir();
|
||||
|
||||
@@ -156,4 +157,76 @@ describe("buildSubagentList", () => {
|
||||
expect(list.active[0]?.line).toContain("prompt/cache 197k");
|
||||
expect(list.active[0]?.line).not.toContain("1k io");
|
||||
});
|
||||
|
||||
it("keeps stale unended runs out of active and recent list output", () => {
|
||||
const now = Date.now();
|
||||
const staleRun = {
|
||||
runId: "run-stale-list",
|
||||
childSessionKey: "agent:main:subagent:stale-list",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "stale hidden work",
|
||||
cleanup: "keep",
|
||||
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
} satisfies SubagentRunRecord;
|
||||
addSubagentRunForTests(staleRun);
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
} as OpenClawConfig;
|
||||
|
||||
const list = buildSubagentList({
|
||||
cfg,
|
||||
runs: [staleRun],
|
||||
recentMinutes: 30,
|
||||
taskMaxChars: 110,
|
||||
});
|
||||
|
||||
expect(list.total).toBe(1);
|
||||
expect(list.active).toEqual([]);
|
||||
expect(list.recent).toEqual([]);
|
||||
expect(list.text).toContain("active subagents:\n(none)");
|
||||
});
|
||||
|
||||
it("does not let a stale unended child keep an ended parent listed active", () => {
|
||||
const now = Date.now();
|
||||
const parentRun = {
|
||||
runId: "run-parent-ended-stale-child",
|
||||
childSessionKey: "agent:main:subagent:parent-ended-stale-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "parent ended",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 120_000,
|
||||
startedAt: now - 120_000,
|
||||
endedAt: now - 60_000,
|
||||
outcome: { status: "ok" },
|
||||
} satisfies SubagentRunRecord;
|
||||
addSubagentRunForTests(parentRun);
|
||||
addSubagentRunForTests({
|
||||
runId: "run-stale-child",
|
||||
childSessionKey: `${parentRun.childSessionKey}:subagent:stale-child`,
|
||||
requesterSessionKey: parentRun.childSessionKey,
|
||||
requesterDisplayKey: "subagent:parent-ended-stale-child",
|
||||
task: "stale child",
|
||||
cleanup: "keep",
|
||||
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
});
|
||||
const cfg = {
|
||||
commands: { text: true },
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
} as OpenClawConfig;
|
||||
|
||||
const list = buildSubagentList({
|
||||
cfg,
|
||||
runs: [parentRun],
|
||||
recentMinutes: 30,
|
||||
taskMaxChars: 110,
|
||||
});
|
||||
|
||||
expect(list.active).toEqual([]);
|
||||
expect(list.recent[0]?.status).toBe("done");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -20,6 +20,7 @@ import {
|
||||
} from "./subagent-registry-read.js";
|
||||
import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
import { hasSubagentRunEnded, isLiveUnendedSubagentRun } from "./subagent-run-liveness.js";
|
||||
|
||||
export type SubagentListItem = {
|
||||
index: number;
|
||||
@@ -133,7 +134,7 @@ export function isActiveSubagentRun(
|
||||
entry: SubagentRunRecord,
|
||||
pendingDescendantCount: (sessionKey: string) => number,
|
||||
) {
|
||||
return !entry.endedAt || pendingDescendantCount(entry.childSessionKey) > 0;
|
||||
return isLiveUnendedSubagentRun(entry) || pendingDescendantCount(entry.childSessionKey) > 0;
|
||||
}
|
||||
|
||||
function resolveRunStatus(entry: SubagentRunRecord, options?: { pendingDescendants?: number }) {
|
||||
@@ -142,7 +143,7 @@ function resolveRunStatus(entry: SubagentRunRecord, options?: { pendingDescendan
|
||||
const childLabel = pendingDescendants === 1 ? "child" : "children";
|
||||
return `active (waiting on ${pendingDescendants} ${childLabel})`;
|
||||
}
|
||||
if (!entry.endedAt) {
|
||||
if (!hasSubagentRunEnded(entry)) {
|
||||
return "running";
|
||||
}
|
||||
const status = entry.outcome?.status ?? "done";
|
||||
|
||||
@@ -15,6 +15,7 @@ import { withSubagentOutcomeTiming } from "./subagent-announce-output.js";
|
||||
import { SUBAGENT_ENDED_REASON_ERROR } from "./subagent-lifecycle-events.js";
|
||||
import { shouldUpdateRunOutcome } from "./subagent-registry-completion.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
import { isStaleUnendedSubagentRun } from "./subagent-run-liveness.js";
|
||||
import {
|
||||
getSubagentSessionRuntimeMs,
|
||||
getSubagentSessionStartedAt,
|
||||
@@ -35,7 +36,10 @@ export const ANNOUNCE_COMPLETION_HARD_EXPIRY_MS = 30 * 60_000;
|
||||
|
||||
const FROZEN_RESULT_TEXT_MAX_BYTES = 100 * 1024;
|
||||
|
||||
export type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id";
|
||||
export type SubagentRunOrphanReason =
|
||||
| "missing-session-entry"
|
||||
| "missing-session-id"
|
||||
| "stale-unended-run";
|
||||
|
||||
export function capFrozenResultText(resultText: string): string {
|
||||
const trimmed = resultText.trim();
|
||||
@@ -140,6 +144,8 @@ export async function persistSubagentSessionTiming(entry: SubagentRunRecord) {
|
||||
export function resolveSubagentRunOrphanReason(params: {
|
||||
entry: SubagentRunRecord;
|
||||
storeCache?: Map<string, Record<string, SessionEntry>>;
|
||||
includeStaleUnended?: boolean;
|
||||
now?: number;
|
||||
}): SubagentRunOrphanReason | null {
|
||||
const childSessionKey = params.entry.childSessionKey?.trim();
|
||||
if (!childSessionKey) {
|
||||
@@ -161,6 +167,13 @@ export function resolveSubagentRunOrphanReason(params: {
|
||||
if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) {
|
||||
return "missing-session-id";
|
||||
}
|
||||
if (
|
||||
params.includeStaleUnended === true &&
|
||||
sessionEntry.abortedLastRun !== true &&
|
||||
isStaleUnendedSubagentRun(params.entry, params.now)
|
||||
) {
|
||||
return "stale-unended-run";
|
||||
}
|
||||
return null;
|
||||
} catch {
|
||||
// Best-effort guard: avoid false orphan pruning on transient read/config failures.
|
||||
@@ -266,11 +279,14 @@ export function reconcileOrphanedRestoredRuns(params: {
|
||||
resumedRuns: Set<string>;
|
||||
}) {
|
||||
const storeCache = new Map<string, Record<string, SessionEntry>>();
|
||||
const now = Date.now();
|
||||
let changed = false;
|
||||
for (const [runId, entry] of params.runs.entries()) {
|
||||
const orphanReason = resolveSubagentRunOrphanReason({
|
||||
entry,
|
||||
storeCache,
|
||||
includeStaleUnended: true,
|
||||
now,
|
||||
});
|
||||
if (!orphanReason) {
|
||||
continue;
|
||||
|
||||
@@ -3,11 +3,14 @@ import {
|
||||
countActiveRunsForSessionFromRuns,
|
||||
countPendingDescendantRunsExcludingRunFromRuns,
|
||||
countPendingDescendantRunsFromRuns,
|
||||
getSubagentRunByChildSessionKeyFromRuns,
|
||||
isSubagentSessionRunActiveFromRuns,
|
||||
listRunsForRequesterFromRuns,
|
||||
resolveRequesterForChildSessionFromRuns,
|
||||
shouldIgnorePostCompletionAnnounceForSessionFromRuns,
|
||||
} from "./subagent-registry-queries.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
import { STALE_UNENDED_SUBAGENT_RUN_MS } from "./subagent-run-liveness.js";
|
||||
|
||||
function makeRun(overrides: Partial<SubagentRunRecord>): SubagentRunRecord {
|
||||
const runId = overrides.runId ?? "run-default";
|
||||
@@ -30,6 +33,142 @@ function toRunMap(runs: SubagentRunRecord[]): Map<string, SubagentRunRecord> {
|
||||
}
|
||||
|
||||
describe("subagent registry query regressions", () => {
|
||||
it("does not treat stale unended rows as active child-session liveness", () => {
|
||||
const now = Date.now();
|
||||
const childSessionKey = "agent:main:subagent:stale-live-check";
|
||||
const runs = toRunMap([
|
||||
makeRun({
|
||||
runId: "run-stale",
|
||||
childSessionKey,
|
||||
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(isSubagentSessionRunActiveFromRuns(runs, childSessionKey)).toBe(false);
|
||||
|
||||
runs.set(
|
||||
"run-fresh",
|
||||
makeRun({
|
||||
runId: "run-fresh",
|
||||
childSessionKey,
|
||||
createdAt: now - 60_000,
|
||||
startedAt: now - 60_000,
|
||||
}),
|
||||
);
|
||||
|
||||
expect(isSubagentSessionRunActiveFromRuns(runs, childSessionKey)).toBe(true);
|
||||
});
|
||||
|
||||
it("does not count stale unended direct children as active concurrency", () => {
|
||||
const now = Date.now();
|
||||
const runs = toRunMap([
|
||||
makeRun({
|
||||
runId: "run-stale-child",
|
||||
childSessionKey: "agent:main:subagent:stale-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
}),
|
||||
makeRun({
|
||||
runId: "run-fresh-child",
|
||||
childSessionKey: "agent:main:subagent:fresh-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
createdAt: now - 60_000,
|
||||
startedAt: now - 60_000,
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(countActiveRunsForSessionFromRuns(runs, "agent:main:main")).toBe(1);
|
||||
});
|
||||
|
||||
it("does not count stale unended descendants as pending work", () => {
|
||||
const now = Date.now();
|
||||
const parentSessionKey = "agent:main:subagent:parent-stale-desc";
|
||||
const runs = toRunMap([
|
||||
makeRun({
|
||||
runId: "run-stale-descendant",
|
||||
childSessionKey: `${parentSessionKey}:subagent:stale`,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
}),
|
||||
makeRun({
|
||||
runId: "run-ended-cleanup-pending",
|
||||
childSessionKey: `${parentSessionKey}:subagent:cleanup`,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
createdAt: now - 10_000,
|
||||
startedAt: now - 9_000,
|
||||
endedAt: now - 1_000,
|
||||
cleanupCompletedAt: undefined,
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(countPendingDescendantRunsFromRuns(runs, parentSessionKey)).toBe(1);
|
||||
});
|
||||
|
||||
it("keeps a stale unended orchestrator active only when live descendants remain", () => {
|
||||
const now = Date.now();
|
||||
const parentSessionKey = "agent:main:subagent:stale-orchestrator";
|
||||
const liveChildSessionKey = `${parentSessionKey}:subagent:live-child`;
|
||||
const runs = toRunMap([
|
||||
makeRun({
|
||||
runId: "run-parent-stale",
|
||||
childSessionKey: parentSessionKey,
|
||||
requesterSessionKey: "agent:main:main",
|
||||
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
}),
|
||||
makeRun({
|
||||
runId: "run-live-child",
|
||||
childSessionKey: liveChildSessionKey,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
createdAt: now - 60_000,
|
||||
startedAt: now - 60_000,
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(countActiveRunsForSessionFromRuns(runs, "agent:main:main")).toBe(1);
|
||||
|
||||
runs.set(
|
||||
"run-live-child",
|
||||
makeRun({
|
||||
runId: "run-live-child",
|
||||
childSessionKey: liveChildSessionKey,
|
||||
requesterSessionKey: parentSessionKey,
|
||||
createdAt: now - 60_000,
|
||||
startedAt: now - 60_000,
|
||||
endedAt: now - 1_000,
|
||||
cleanupCompletedAt: now,
|
||||
}),
|
||||
);
|
||||
|
||||
expect(countActiveRunsForSessionFromRuns(runs, "agent:main:main")).toBe(0);
|
||||
});
|
||||
|
||||
it("prefers the newest ended child row over an older stale unended row", () => {
|
||||
const now = Date.now();
|
||||
const childSessionKey = "agent:main:subagent:stale-prefer-ended";
|
||||
const runs = toRunMap([
|
||||
makeRun({
|
||||
runId: "run-stale",
|
||||
childSessionKey,
|
||||
createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
startedAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
|
||||
}),
|
||||
makeRun({
|
||||
runId: "run-ended",
|
||||
childSessionKey,
|
||||
createdAt: now - 60_000,
|
||||
startedAt: now - 59_000,
|
||||
endedAt: now - 1_000,
|
||||
cleanupCompletedAt: now,
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(getSubagentRunByChildSessionKeyFromRuns(runs, childSessionKey)?.runId).toBe("run-ended");
|
||||
});
|
||||
|
||||
it("regression descendant count gating, pending descendants block announce until cleanup completion is recorded", () => {
|
||||
// Regression guard: parent announce must defer while any descendant cleanup is still pending.
|
||||
const parentSessionKey = "agent:main:subagent:parent";
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { DeliveryContext } from "../utils/delivery-context.types.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
import { hasSubagentRunEnded, isLiveUnendedSubagentRun } from "./subagent-run-liveness.js";
|
||||
|
||||
function resolveControllerSessionKey(entry: SubagentRunRecord): string {
|
||||
return entry.controllerSessionKey?.trim() || entry.requesterSessionKey;
|
||||
@@ -91,7 +92,7 @@ export function isSubagentSessionRunActiveFromRuns(
|
||||
childSessionKey: string,
|
||||
): boolean {
|
||||
const latest = findLatestRunForChildSession(runs, childSessionKey);
|
||||
return Boolean(latest && typeof latest.endedAt !== "number");
|
||||
return Boolean(latest && isLiveUnendedSubagentRun(latest));
|
||||
}
|
||||
|
||||
export function getSubagentRunByChildSessionKeyFromRuns(
|
||||
@@ -109,7 +110,7 @@ export function getSubagentRunByChildSessionKeyFromRuns(
|
||||
if (entry.childSessionKey !== key) {
|
||||
continue;
|
||||
}
|
||||
if (typeof entry.endedAt !== "number") {
|
||||
if (isLiveUnendedSubagentRun(entry)) {
|
||||
if (!latestActive || entry.createdAt > latestActive.createdAt) {
|
||||
latestActive = entry;
|
||||
}
|
||||
@@ -186,7 +187,7 @@ export function countActiveRunsForSessionFromRuns(
|
||||
|
||||
let count = 0;
|
||||
for (const entry of latestByChildSessionKey.values()) {
|
||||
if (typeof entry.endedAt !== "number") {
|
||||
if (isLiveUnendedSubagentRun(entry)) {
|
||||
count += 1;
|
||||
continue;
|
||||
}
|
||||
@@ -252,7 +253,7 @@ export function countActiveDescendantRunsFromRuns(
|
||||
let count = 0;
|
||||
if (
|
||||
!forEachDescendantRun(runs, rootSessionKey, (_runId, entry) => {
|
||||
if (typeof entry.endedAt !== "number") {
|
||||
if (isLiveUnendedSubagentRun(entry)) {
|
||||
count += 1;
|
||||
}
|
||||
})
|
||||
@@ -271,9 +272,10 @@ function countPendingDescendantRunsInternal(
|
||||
let count = 0;
|
||||
if (
|
||||
!forEachDescendantRun(runs, rootSessionKey, (runId, entry) => {
|
||||
const runEnded = typeof entry.endedAt === "number";
|
||||
const runEnded = hasSubagentRunEnded(entry);
|
||||
const cleanupCompleted = typeof entry.cleanupCompletedAt === "number";
|
||||
if ((!runEnded || !cleanupCompleted) && runId !== excludedRunId) {
|
||||
const runPending = runEnded ? !cleanupCompleted : isLiveUnendedSubagentRun(entry);
|
||||
if (runPending && runId !== excludedRunId) {
|
||||
count += 1;
|
||||
}
|
||||
})
|
||||
|
||||
@@ -79,6 +79,7 @@ describe("subagent registry persistence resume", () => {
|
||||
sessionKey: string;
|
||||
sessionId?: string;
|
||||
updatedAt?: number;
|
||||
abortedLastRun?: boolean;
|
||||
}) => {
|
||||
if (!tempStateDir) {
|
||||
throw new Error("tempStateDir not initialized");
|
||||
@@ -89,6 +90,7 @@ describe("subagent registry persistence resume", () => {
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: params.updatedAt,
|
||||
abortedLastRun: params.abortedLastRun,
|
||||
defaultSessionId: `sess-${Date.now()}`,
|
||||
});
|
||||
};
|
||||
|
||||
@@ -26,6 +26,7 @@ export async function writeSubagentSessionEntry(params: {
|
||||
sessionKey: string;
|
||||
sessionId?: string;
|
||||
updatedAt?: number;
|
||||
abortedLastRun?: boolean;
|
||||
agentId: string;
|
||||
defaultSessionId: string;
|
||||
}): Promise<string> {
|
||||
@@ -35,6 +36,9 @@ export async function writeSubagentSessionEntry(params: {
|
||||
...store[params.sessionKey],
|
||||
sessionId: params.sessionId ?? params.defaultSessionId,
|
||||
updatedAt: params.updatedAt ?? Date.now(),
|
||||
...(typeof params.abortedLastRun === "boolean"
|
||||
? { abortedLastRun: params.abortedLastRun }
|
||||
: {}),
|
||||
};
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8");
|
||||
|
||||
@@ -59,6 +59,7 @@ describe("subagent registry persistence", () => {
|
||||
sessionKey: string;
|
||||
sessionId?: string;
|
||||
updatedAt?: number;
|
||||
abortedLastRun?: boolean;
|
||||
}) => {
|
||||
if (!tempStateDir) {
|
||||
throw new Error("tempStateDir not initialized");
|
||||
@@ -70,6 +71,7 @@ describe("subagent registry persistence", () => {
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: params.updatedAt,
|
||||
abortedLastRun: params.abortedLastRun,
|
||||
defaultSessionId: `sess-${agentId}-${Date.now()}`,
|
||||
});
|
||||
};
|
||||
@@ -552,6 +554,82 @@ describe("subagent registry persistence", () => {
|
||||
expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("reconciles stale unended restored runs that are not restart-recoverable", async () => {
|
||||
const now = Date.now();
|
||||
const runId = "run-stale-unended-restore";
|
||||
const childSessionKey = "agent:main:subagent:stale-unended-restore";
|
||||
const registryPath = await writePersistedRegistry({
|
||||
version: 2,
|
||||
runs: {
|
||||
[runId]: {
|
||||
runId,
|
||||
childSessionKey,
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "stale unended restored work",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 3 * 60 * 60 * 1_000,
|
||||
startedAt: now - 3 * 60 * 60 * 1_000,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
restartRegistry();
|
||||
await waitForRegistryWork(async () => {
|
||||
const after = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
|
||||
runs?: Record<string, unknown>;
|
||||
};
|
||||
return after.runs?.[runId] === undefined;
|
||||
});
|
||||
|
||||
expect(callGateway).not.toHaveBeenCalled();
|
||||
expect(announceSpy).not.toHaveBeenCalled();
|
||||
expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("keeps stale unended restored runs with abortedLastRun for restart recovery", async () => {
|
||||
const now = Date.now();
|
||||
const runId = "run-stale-aborted-restore";
|
||||
const childSessionKey = "agent:main:subagent:stale-aborted-restore";
|
||||
await writePersistedRegistry(
|
||||
{
|
||||
version: 2,
|
||||
runs: {
|
||||
[runId]: {
|
||||
runId,
|
||||
childSessionKey,
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "stale restart-recoverable work",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 3 * 60 * 60 * 1_000,
|
||||
startedAt: now - 3 * 60 * 60 * 1_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
{ seedChildSessions: false },
|
||||
);
|
||||
await writeChildSessionEntry({
|
||||
sessionKey: childSessionKey,
|
||||
sessionId: "sess-stale-aborted-restore",
|
||||
updatedAt: now,
|
||||
abortedLastRun: true,
|
||||
});
|
||||
|
||||
restartRegistry();
|
||||
await waitForRegistryWork(() => vi.mocked(callGateway).mock.calls.length > 0);
|
||||
|
||||
expect(callGateway).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
method: "agent.wait",
|
||||
params: expect.objectContaining({ runId }),
|
||||
}),
|
||||
);
|
||||
expect(
|
||||
listSubagentRunsForRequester("agent:main:main").some((entry) => entry.runId === runId),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("removes attachments when pruning orphaned restored runs", async () => {
|
||||
const persisted = createPersistedEndedRun({
|
||||
runId: "run-orphan-attachments",
|
||||
|
||||
75
src/agents/subagent-run-liveness.test.ts
Normal file
75
src/agents/subagent-run-liveness.test.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
isLiveUnendedSubagentRun,
|
||||
isStaleUnendedSubagentRun,
|
||||
STALE_UNENDED_SUBAGENT_RUN_MS,
|
||||
} from "./subagent-run-liveness.js";
|
||||
|
||||
// Unit tests for classifying sub-agent runs that have no recorded end as
// either still live or stale.
describe("subagent run liveness", () => {
  // Fixed reference clock; every case except the last passes it explicitly as `now`.
  const now = Date.parse("2026-04-25T12:00:00Z");

  it("keeps fresh unended runs live", () => {
    const entry = {
      createdAt: now - 60_000,
    };
    expect(isLiveUnendedSubagentRun(entry, now)).toBe(true);
    expect(isStaleUnendedSubagentRun(entry, now)).toBe(false);
  });

  it("marks old unended runs stale when no explicit timeout extends the window", () => {
    const entry = {
      // One millisecond past the default stale window.
      createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
    };
    expect(isStaleUnendedSubagentRun(entry, now)).toBe(true);
    expect(isLiveUnendedSubagentRun(entry, now)).toBe(false);
  });

  it("does not mark ended runs stale", () => {
    const entry = {
      createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
      endedAt: now - 1,
    };
    // An ended run is neither stale nor live.
    expect(isStaleUnendedSubagentRun(entry, now)).toBe(false);
    expect(isLiveUnendedSubagentRun(entry, now)).toBe(false);
  });

  it("uses sessionStartedAt ahead of createdAt", () => {
    const entry = {
      createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
      sessionStartedAt: now - 60_000,
    };
    expect(isStaleUnendedSubagentRun(entry, now)).toBe(false);
    expect(isLiveUnendedSubagentRun(entry, now)).toBe(true);
  });

  it("extends stale cutoff for explicit long run timeouts", () => {
    const entry = {
      createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
      // Six hours, in seconds.
      runTimeoutSeconds: 6 * 60 * 60,
    };
    expect(isStaleUnendedSubagentRun(entry, now)).toBe(false);
    expect(isLiveUnendedSubagentRun(entry, now)).toBe(true);
  });

  it("ignores non-real fixture timestamps as unknown instead of stale", () => {
    const entry = {
      // Tiny epoch value, as used by synthetic fixtures.
      createdAt: 100,
    };
    expect(isStaleUnendedSubagentRun(entry, now)).toBe(false);
    expect(isLiveUnendedSubagentRun(entry, now)).toBe(true);
  });

  it("defaults to current time when now is omitted", () => {
    vi.useFakeTimers();
    vi.setSystemTime(now);
    try {
      expect(
        isStaleUnendedSubagentRun({
          createdAt: now - STALE_UNENDED_SUBAGENT_RUN_MS - 1,
        }),
      ).toBe(true);
    } finally {
      // Always restore real timers so later tests are unaffected.
      vi.useRealTimers();
    }
  });
});
|
||||
52
src/agents/subagent-run-liveness.ts
Normal file
52
src/agents/subagent-run-liveness.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
import { getSubagentSessionStartedAt } from "./subagent-session-metrics.js";
|
||||
|
||||
/**
 * Default age, in milliseconds, after which a run with no recorded end is
 * treated as stale: two hours.
 */
export const STALE_UNENDED_SUBAGENT_RUN_MS = 2 * 60 * 60 * 1_000;

/**
 * One minute, in milliseconds. Added on top of an explicit run timeout when
 * the stale cutoff is derived from `runTimeoutSeconds`.
 */
const EXPLICIT_TIMEOUT_STALE_GRACE_MS = 60_000;

/**
 * 2020-01-01T00:00:00Z as epoch milliseconds. Start timestamps earlier than
 * this are not used to classify a run as stale.
 */
const MIN_REALISTIC_RUN_TIMESTAMP_MS = Date.UTC(2020, 0, 1);
|
||||
|
||||
/**
 * Reports whether a run has a recorded end time.
 *
 * @param entry - Run record; only `endedAt` is read.
 * @returns `true` only when `endedAt` is a finite number. A missing value,
 *   `NaN`, or an infinite value all count as "not ended".
 */
export function hasSubagentRunEnded(entry: Pick<SubagentRunRecord, "endedAt">): boolean {
  return typeof entry.endedAt === "number" && Number.isFinite(entry.endedAt);
}
|
||||
|
||||
/**
 * Computes how old (in milliseconds) an unended run may get before it is
 * considered stale.
 *
 * @param entry - Run record; only `runTimeoutSeconds` is read.
 * @returns `STALE_UNENDED_SUBAGENT_RUN_MS` when no usable timeout is set.
 *   Otherwise the larger of that default and the timeout (whole seconds,
 *   converted to ms) plus `EXPLICIT_TIMEOUT_STALE_GRACE_MS`.
 */
function resolveStaleCutoffMs(entry: Pick<SubagentRunRecord, "runTimeoutSeconds">): number {
  const timeoutSeconds = entry.runTimeoutSeconds;
  // Only a positive, finite timeout can extend the window; anything else falls back to the default.
  if (typeof timeoutSeconds === "number" && Number.isFinite(timeoutSeconds) && timeoutSeconds > 0) {
    return Math.max(
      STALE_UNENDED_SUBAGENT_RUN_MS,
      Math.floor(timeoutSeconds) * 1_000 + EXPLICIT_TIMEOUT_STALE_GRACE_MS,
    );
  }
  return STALE_UNENDED_SUBAGENT_RUN_MS;
}
|
||||
|
||||
/**
 * Reports whether a run with no recorded end has outlived its stale cutoff.
 *
 * @param entry - Run record; timing fields, `endedAt`, and `runTimeoutSeconds` are read.
 * @param now - Reference time in epoch milliseconds; defaults to `Date.now()`.
 * @returns `false` for ended runs and for runs whose start time is missing,
 *   non-finite, or earlier than `MIN_REALISTIC_RUN_TIMESTAMP_MS`. Otherwise
 *   `true` when the elapsed time since start strictly exceeds the cutoff
 *   from `resolveStaleCutoffMs`.
 */
export function isStaleUnendedSubagentRun(
  entry: Pick<
    SubagentRunRecord,
    "createdAt" | "startedAt" | "sessionStartedAt" | "endedAt" | "runTimeoutSeconds"
  >,
  now = Date.now(),
): boolean {
  // A run that has ended is never "stale unended".
  if (hasSubagentRunEnded(entry)) {
    return false;
  }
  const startedAt = getSubagentSessionStartedAt(entry);
  // Without a plausible start timestamp the age is unknown, so do not call the run stale.
  if (
    typeof startedAt !== "number" ||
    !Number.isFinite(startedAt) ||
    startedAt < MIN_REALISTIC_RUN_TIMESTAMP_MS
  ) {
    return false;
  }
  return now - startedAt > resolveStaleCutoffMs(entry);
}
|
||||
|
||||
/**
 * Reports whether a run should still be treated as live: it has no recorded
 * end and has not been classified as stale.
 *
 * @param entry - Run record; timing fields, `endedAt`, and `runTimeoutSeconds` are read.
 * @param now - Reference time in epoch milliseconds; defaults to `Date.now()`.
 * @returns `true` when the run is unended and not stale at `now`.
 */
export function isLiveUnendedSubagentRun(
  entry: Pick<
    SubagentRunRecord,
    "createdAt" | "startedAt" | "sessionStartedAt" | "endedAt" | "runTimeoutSeconds"
  >,
  now = Date.now(),
): boolean {
  return !hasSubagentRunEnded(entry) && !isStaleUnendedSubagentRun(entry, now);
}
|
||||
Reference in New Issue
Block a user