mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 12:10:42 +00:00
fix(agents): bound subagent orphan recovery
This commit is contained in:
@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Agents/subagents: bound automatic orphan recovery with persisted recovery attempts and a wedged-session tombstone, and teach task maintenance/doctor to reconcile those sessions so restart loops no longer require manual `sessions.json` surgery. Fixes #74864. Thanks @solosage1.
|
||||
- CLI/progress: suppress nested progress spinners and line clears while TUI input owns raw stdin, so Crestodian `/status` no longer disturbs the active input row. (#75003) Thanks @velvet-shark.
|
||||
- Telegram: use durable message edits for streaming previews instead of native draft state, so generated replies no longer flicker through draft-to-message transitions that look like duplicates. (#75073) Thanks @obviyus.
|
||||
|
||||
|
||||
@@ -247,6 +247,7 @@ openclaw tasks notify <lookup> state_changes
|
||||
Reconciliation is runtime-aware:
|
||||
|
||||
- ACP/subagent tasks check their backing child session.
|
||||
- Subagent tasks whose child session has a restart-recovery tombstone are marked lost instead of being treated as recoverable backing sessions.
|
||||
- Cron tasks check whether the cron runtime still owns the job, then recover terminal status from persisted cron run logs/job state before falling back to `lost`. Only the Gateway process is authoritative for the in-memory cron active-job set; offline CLI audit uses durable history but does not mark a cron task lost solely because that local Set is empty.
|
||||
- Chat-backed CLI tasks check the owning live run context, not just the chat session row.
|
||||
|
||||
|
||||
@@ -93,6 +93,7 @@ cat ~/.openclaw/openclaw.json
|
||||
<Accordion title="State and integrity">
|
||||
- Session lock file inspection and stale lock cleanup.
|
||||
- Session transcript repair for duplicated prompt-rewrite branches created by affected 2026.4.24 builds.
|
||||
- Wedged subagent restart-recovery tombstone detection, with `--fix` support for clearing stale aborted recovery flags so startup does not keep treating the child as restart-aborted.
|
||||
- State integrity and permissions checks (sessions, transcripts, state dir).
|
||||
- Config file permission checks (chmod 600) when running locally.
|
||||
- Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states.
|
||||
|
||||
@@ -512,6 +512,14 @@ restart-aborted child sessions remain recoverable through the sub-agent
|
||||
orphan recovery flow, which sends a synthetic resume message before
|
||||
clearing the aborted marker.
|
||||
|
||||
Automatic restart recovery is bounded per child session. If the same
|
||||
sub-agent child is accepted for orphan recovery repeatedly inside the
|
||||
rapid re-wedge window, OpenClaw persists a recovery tombstone on that
|
||||
session and stops auto-resuming it on later restarts. Run
|
||||
`openclaw tasks maintenance --apply` to reconcile the task record, or
|
||||
`openclaw doctor --fix` to clear stale aborted recovery flags on
|
||||
tombstoned sessions.
|
||||
|
||||
<Note>
|
||||
If a sub-agent spawn fails with Gateway `PAIRING_REQUIRED` /
|
||||
`scope-upgrade`, check the RPC caller before editing pairing state.
|
||||
|
||||
@@ -342,6 +342,110 @@ describe("subagent-orphan-recovery", () => {
|
||||
expect(mockStore["agent:main:subagent:test-session-1"]?.abortedLastRun).toBe(false);
|
||||
});
|
||||
|
||||
it("persists accepted recovery attempts after successful resume", async () => {
  vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "resumed-run" } as never);
  mockSingleAbortedSession();

  await recoverOrphanedSubagentSessions({
    getActiveRuns: () => createActiveRuns(createTestRunRecord()),
  });

  // Replay the captured store updater against a fresh store to inspect what it persists.
  const [, updater] = vi.mocked(sessions.updateSessionStore).mock.calls[0];
  const mockStore: ReturnType<typeof sessions.loadSessionStore> = {
    "agent:main:subagent:test-session-1": {
      sessionId: "session-abc",
      updatedAt: 0,
      abortedLastRun: true,
    },
  };
  await updater(mockStore);
  expect(mockStore["agent:main:subagent:test-session-1"]).toMatchObject({
    abortedLastRun: false,
    subagentRecovery: {
      automaticAttempts: 1,
      lastRunId: "run-1",
      lastAttemptAt: expect.any(Number),
    },
  });
});
|
||||
|
||||
it("tombstones rapid repeated accepted recovery before resuming again", async () => {
  const now = Date.now();
  // Two accepted attempts 30s ago: inside the rapid re-wedge window and at the limit.
  mockSingleAbortedSession({
    subagentRecovery: {
      automaticAttempts: 2,
      lastAttemptAt: now - 30_000,
      lastRunId: "previous-run",
    },
  });

  const result = await recoverOrphanedSubagentSessions({
    getActiveRuns: () => createActiveRuns(createTestRunRecord()),
  });

  expect(result).toMatchObject({
    recovered: 0,
    failed: 0,
    skipped: 1,
    failedRuns: [
      expect.objectContaining({
        runId: "run-1",
        childSessionKey: "agent:main:subagent:test-session-1",
        error: expect.stringContaining("recovery blocked after 2 rapid accepted resume attempts"),
      }),
    ],
  });
  expect(gateway.callGateway).not.toHaveBeenCalled();
  expect(sessions.updateSessionStore).toHaveBeenCalledOnce();

  // Replay the captured store updater to check the tombstone it writes.
  const [, updater] = vi.mocked(sessions.updateSessionStore).mock.calls[0];
  const mockStore: ReturnType<typeof sessions.loadSessionStore> = {
    "agent:main:subagent:test-session-1": {
      sessionId: "session-abc",
      updatedAt: 0,
      abortedLastRun: true,
      subagentRecovery: {
        automaticAttempts: 2,
        lastAttemptAt: now - 30_000,
        lastRunId: "previous-run",
      },
    },
  };
  await updater(mockStore);
  expect(mockStore["agent:main:subagent:test-session-1"]).toMatchObject({
    abortedLastRun: false,
    subagentRecovery: {
      automaticAttempts: 2,
      lastRunId: "run-1",
      wedgedAt: expect.any(Number),
      wedgedReason: expect.stringContaining("recovery blocked"),
    },
  });
});
|
||||
|
||||
it("skips already tombstoned wedged sessions without rewriting them", async () => {
  mockSingleAbortedSession({
    subagentRecovery: {
      automaticAttempts: 2,
      lastAttemptAt: Date.now() - 20_000,
      lastRunId: "previous-run",
      wedgedAt: Date.now() - 10_000,
      wedgedReason: "subagent orphan recovery blocked after 2 rapid accepted resume attempts",
    },
  });

  const result = await recoverOrphanedSubagentSessions({
    getActiveRuns: () => createActiveRuns(createTestRunRecord()),
  });

  expect(result.recovered).toBe(0);
  expect(result.failed).toBe(0);
  expect(result.skipped).toBe(1);
  expect(result.failedRuns).toHaveLength(1);
  // An existing tombstone means no resume call and no store rewrite.
  expect(gateway.callGateway).not.toHaveBeenCalled();
  expect(sessions.updateSessionStore).not.toHaveBeenCalled();
});
|
||||
|
||||
it("truncates long task descriptions in resume message", async () => {
|
||||
mockSingleAbortedSession();
|
||||
|
||||
|
||||
@@ -29,6 +29,11 @@ import {
|
||||
loadRequesterSessionEntry,
|
||||
} from "./subagent-announce-delivery.js";
|
||||
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
|
||||
import {
|
||||
evaluateSubagentRecoveryGate,
|
||||
markSubagentRecoveryAttempt,
|
||||
markSubagentRecoveryWedged,
|
||||
} from "./subagent-recovery-state.js";
|
||||
import {
|
||||
finalizeInterruptedSubagentRun,
|
||||
replaceSubagentRunAfterSteer,
|
||||
@@ -266,6 +271,7 @@ export async function recoverOrphanedSubagentSessions(params: {
|
||||
if (!childSessionKey) {
|
||||
continue;
|
||||
}
|
||||
const now = Date.now();
|
||||
if (resumedSessionKeys.has(childSessionKey)) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
@@ -304,6 +310,44 @@ export async function recoverOrphanedSubagentSessions(params: {
|
||||
continue;
|
||||
}
|
||||
|
||||
const recoveryGate = evaluateSubagentRecoveryGate(entry, now);
|
||||
if (!recoveryGate.allowed) {
|
||||
if (recoveryGate.shouldMarkWedged) {
|
||||
try {
|
||||
await updateSessionStore(storePath, (currentStore) => {
|
||||
const current = currentStore[childSessionKey];
|
||||
if (current) {
|
||||
markSubagentRecoveryWedged({
|
||||
entry: current,
|
||||
now,
|
||||
runId,
|
||||
reason: recoveryGate.reason,
|
||||
});
|
||||
currentStore[childSessionKey] = current;
|
||||
}
|
||||
});
|
||||
markSubagentRecoveryWedged({
|
||||
entry,
|
||||
now,
|
||||
runId,
|
||||
reason: recoveryGate.reason,
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(
|
||||
`failed to persist wedged subagent recovery marker for ${childSessionKey}: ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
log.warn(`skipping orphan recovery for ${childSessionKey}: ${recoveryGate.reason}`);
|
||||
result.skipped++;
|
||||
result.failedRuns.push({
|
||||
runId,
|
||||
childSessionKey,
|
||||
error: recoveryGate.reason,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);
|
||||
|
||||
const messages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile);
|
||||
@@ -352,6 +396,12 @@ export async function recoverOrphanedSubagentSessions(params: {
|
||||
const current = currentStore[childSessionKey];
|
||||
if (current) {
|
||||
current.abortedLastRun = false;
|
||||
markSubagentRecoveryAttempt({
|
||||
entry: current,
|
||||
now: Date.now(),
|
||||
runId,
|
||||
attempt: recoveryGate.nextAttempt,
|
||||
});
|
||||
current.updatedAt = Date.now();
|
||||
currentStore[childSessionKey] = current;
|
||||
}
|
||||
|
||||
117
src/agents/subagent-recovery-state.ts
Normal file
117
src/agents/subagent-recovery-state.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
import type { SessionEntry } from "../config/sessions.js";
|
||||
|
||||
/**
 * Number of accepted automatic orphan-recovery resumes, counted inside the
 * rapid re-wedge window, at which further automatic recovery is refused.
 */
export const SUBAGENT_RECOVERY_MAX_AUTOMATIC_ATTEMPTS = 2;

/**
 * Width (ms) of the rapid re-wedge window: an earlier attempt only counts
 * toward the limit when it happened at most this long ago (2 minutes).
 */
export const SUBAGENT_RECOVERY_REWEDGE_WINDOW_MS = 2 * 60_000;
|
||||
|
||||
/**
 * Outcome of checking whether a child session may be auto-resumed.
 *
 * - `allowed: true` — recovery may proceed; `nextAttempt` is the attempt
 *   number to record for it.
 * - `allowed: false` — recovery is refused for `reason`; `shouldMarkWedged`
 *   tells the caller whether a tombstone still has to be persisted (it is
 *   `false` when the entry is already tombstoned).
 */
export type SubagentRecoveryGate =
  | {
      allowed: true;
      nextAttempt: number;
    }
  | {
      allowed: false;
      reason: string;
      shouldMarkWedged: boolean;
    };
|
||||
|
||||
/**
 * Reports whether the entry's last recorded recovery attempt falls inside the
 * rapid re-wedge window that ends at `now`.
 *
 * A missing, non-numeric, or non-finite `lastAttemptAt` is never recent. A
 * timestamp later than `now` gives a negative elapsed time and therefore
 * still counts as recent.
 */
function isRecentRecoveryAttempt(entry: SessionEntry, now: number): boolean {
  const lastAttemptAt = entry.subagentRecovery?.lastAttemptAt;
  if (typeof lastAttemptAt !== "number" || !Number.isFinite(lastAttemptAt)) {
    return false;
  }
  return now - lastAttemptAt <= SUBAGENT_RECOVERY_REWEDGE_WINDOW_MS;
}
|
||||
|
||||
/**
 * Reports whether a session-store value carries a recovery tombstone, i.e. a
 * `subagentRecovery.wedgedAt` that is a finite number greater than zero.
 *
 * Accepts `unknown` so callers can pass raw store values; anything that is
 * not an object (including `null`/`undefined`) is not wedged.
 */
export function isSubagentRecoveryWedgedEntry(entry: unknown): boolean {
  if (!entry || typeof entry !== "object") {
    return false;
  }
  // Read only the one field needed instead of asserting a full SessionEntry
  // on a value whose shape has not been validated.
  const recovery = (entry as { subagentRecovery?: { wedgedAt?: unknown } | null })
    .subagentRecovery;
  const wedgedAt = recovery?.wedgedAt;
  return typeof wedgedAt === "number" && Number.isFinite(wedgedAt) && wedgedAt > 0;
}
|
||||
|
||||
/**
 * Returns the human-readable reason a session's automatic recovery was
 * tombstoned: the trimmed `subagentRecovery.wedgedReason`, or a generic
 * message when that field is missing, blank, or not a string.
 */
export function formatSubagentRecoveryWedgedReason(entry: SessionEntry): string {
  // The store is read from disk, so do not assume the field is a string:
  // calling .trim() on any other value would throw.
  const rawReason: unknown = entry.subagentRecovery?.wedgedReason;
  const reason = typeof rawReason === "string" ? rawReason.trim() : "";
  return reason || "subagent orphan recovery is tombstoned for this session";
}
|
||||
|
||||
/**
 * Decides whether a child session may be automatically resumed at `now`.
 *
 * - An entry that already carries a tombstone is refused with its stored
 *   reason and `shouldMarkWedged: false`.
 * - Otherwise the recorded attempt count is used only when the last attempt
 *   is inside the rapid re-wedge window; an older attempt resets it to 0.
 * - At or above the attempt limit the session is refused with
 *   `shouldMarkWedged: true` so the caller persists a tombstone.
 * - Below the limit recovery is allowed and `nextAttempt` is the count to record.
 */
export function evaluateSubagentRecoveryGate(
  entry: SessionEntry,
  now: number,
): SubagentRecoveryGate {
  if (isSubagentRecoveryWedgedEntry(entry)) {
    return {
      allowed: false,
      reason: formatSubagentRecoveryWedgedReason(entry),
      shouldMarkWedged: false,
    };
  }

  let previousAttempts = 0;
  if (isRecentRecoveryAttempt(entry, now)) {
    const recorded = Math.max(0, entry.subagentRecovery?.automaticAttempts ?? 0);
    // Math.max propagates NaN; a NaN count would compare false against the
    // limit and yield a NaN nextAttempt, so the bound would never trigger.
    previousAttempts = Number.isNaN(recorded) ? 0 : recorded;
  }

  if (previousAttempts >= SUBAGENT_RECOVERY_MAX_AUTOMATIC_ATTEMPTS) {
    return {
      allowed: false,
      reason:
        `subagent orphan recovery blocked after ${previousAttempts} rapid accepted resume attempts; ` +
        `run "openclaw tasks maintenance --apply" or "openclaw doctor --fix" to reconcile it`,
      shouldMarkWedged: true,
    };
  }

  return {
    allowed: true,
    nextAttempt: previousAttempts + 1,
  };
}
|
||||
|
||||
/**
 * Records an accepted automatic recovery attempt on `params.entry`.
 *
 * Replaces `subagentRecovery` wholesale with the attempt count (at least 1),
 * the attempt timestamp, and the triggering run id; any previously stored
 * fields on it are dropped. Mutates the entry in place.
 */
export function markSubagentRecoveryAttempt(params: {
  entry: SessionEntry;
  now: number;
  runId: string;
  attempt: number;
}): void {
  const attempts = Math.max(1, params.attempt);
  params.entry.subagentRecovery = {
    // Math.max propagates NaN; never store a count that is not a number.
    automaticAttempts: Number.isNaN(attempts) ? 1 : attempts,
    lastAttemptAt: params.now,
    lastRunId: params.runId,
  };
}
|
||||
|
||||
/**
 * Writes a recovery tombstone onto `params.entry`, mutating it in place.
 *
 * Clears `abortedLastRun`, keeps the existing recovery fields, raises
 * `automaticAttempts` to at least the attempt limit, keeps an existing
 * `lastAttemptAt` (falling back to `now`), overrides `lastRunId` only when a
 * run id is supplied, stamps `wedgedAt`/`wedgedReason`, and sets `updatedAt`
 * to `now`.
 */
export function markSubagentRecoveryWedged(params: {
  entry: SessionEntry;
  now: number;
  runId?: string;
  reason: string;
}): void {
  const { entry, now, runId, reason } = params;
  const previous = entry.subagentRecovery;
  const attempts = Math.max(
    previous?.automaticAttempts ?? 0,
    SUBAGENT_RECOVERY_MAX_AUTOMATIC_ATTEMPTS,
  );

  entry.abortedLastRun = false;
  entry.subagentRecovery = {
    ...previous,
    // Math.max propagates NaN; fall back to the limit rather than storing it.
    automaticAttempts: Number.isNaN(attempts)
      ? SUBAGENT_RECOVERY_MAX_AUTOMATIC_ATTEMPTS
      : attempts,
    lastAttemptAt: previous?.lastAttemptAt ?? now,
    ...(runId ? { lastRunId: runId } : {}),
    wedgedAt: now,
    wedgedReason: reason,
  };
  entry.updatedAt = now;
}
|
||||
|
||||
/**
 * Clears a stale `abortedLastRun` flag on a tombstoned session entry.
 *
 * Only acts when the entry is tombstoned and `abortedLastRun` is exactly
 * `true`; in that case the flag is reset, `updatedAt` is set to `now`, and
 * `true` is returned. Otherwise the entry is left untouched and `false` is
 * returned. The tombstone itself is not removed.
 */
export function clearWedgedSubagentRecoveryAbort(entry: SessionEntry, now: number): boolean {
  const hasStaleAbort = isSubagentRecoveryWedgedEntry(entry) && entry.abortedLastRun === true;
  if (!hasStaleAbort) {
    return false;
  }
  entry.abortedLastRun = false;
  entry.updatedAt = now;
  return true;
}
|
||||
@@ -62,6 +62,13 @@ function stateIntegrityText(): string {
|
||||
.join("\n");
|
||||
}
|
||||
|
||||
/** Joins the text of every captured note titled "Doctor changes", one per line. */
function doctorChangesText(): string {
  return noteMock.mock.calls
    .filter((call) => call[1] === "Doctor changes")
    .map((call) => String(call[0]))
    .join("\n");
}
|
||||
|
||||
function createAgentDir(agentId: string, includeNestedAgentDir = true) {
|
||||
const stateDir = process.env.OPENCLAW_STATE_DIR;
|
||||
if (!stateDir) {
|
||||
@@ -86,7 +93,7 @@ async function runStateIntegrity(cfg: OpenClawConfig) {
|
||||
|
||||
function writeSessionStore(
|
||||
cfg: OpenClawConfig,
|
||||
sessions: Record<string, { sessionId: string; updatedAt: number }>,
|
||||
sessions: Record<string, { sessionId: string; updatedAt: number } & Record<string, unknown>>,
|
||||
) {
|
||||
setupSessionState(cfg, process.env, process.env.HOME ?? "");
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId: "main" });
|
||||
@@ -219,6 +226,70 @@ describe("doctor state integrity oauth dir checks", () => {
|
||||
expect(text).not.toContain("Examples:");
|
||||
});
|
||||
|
||||
it("warns about tombstoned subagent restart recovery sessions", async () => {
  const cfg: OpenClawConfig = {};
  writeSessionStore(cfg, {
    "agent:main:subagent:wedged-child": {
      sessionId: "session-wedged-child",
      updatedAt: Date.now(),
      abortedLastRun: true,
      subagentRecovery: {
        automaticAttempts: 2,
        lastAttemptAt: Date.now() - 30_000,
        lastRunId: "run-wedged-child",
        wedgedAt: Date.now() - 20_000,
        wedgedReason: "subagent orphan recovery blocked after 2 rapid accepted resume attempts",
      },
    },
  });

  // Decline the repair so only the warning path is exercised.
  const confirmRuntimeRepair = vi.fn(async () => false);
  await noteStateIntegrity(cfg, { confirmRuntimeRepair, note: noteMock });

  const text = stateIntegrityText();
  expect(text).toContain("automatic restart recovery tombstoned");
  expect(text).toContain("agent:main:subagent:wedged-child");
  expect(text).toContain("openclaw tasks maintenance --apply");
  expect(confirmRuntimeRepair).toHaveBeenCalledWith(
    expect.objectContaining({
      message: expect.stringContaining("Clear stale aborted recovery flags"),
    }),
  );
});
|
||||
|
||||
it("clears stale aborted recovery flags for tombstoned subagent sessions when approved", async () => {
  const cfg: OpenClawConfig = {};
  const sessionKey = "agent:main:subagent:wedged-child";
  writeSessionStore(cfg, {
    [sessionKey]: {
      sessionId: "session-wedged-child",
      // Start at 0 so the assertion below can tell the repair bumped it.
      updatedAt: 0,
      abortedLastRun: true,
      subagentRecovery: {
        automaticAttempts: 2,
        lastAttemptAt: Date.now() - 30_000,
        lastRunId: "run-wedged-child",
        wedgedAt: Date.now() - 20_000,
        wedgedReason: "subagent orphan recovery blocked after 2 rapid accepted resume attempts",
      },
    },
  });

  // Approve only the tombstone repair prompt; decline every other prompt.
  const confirmRuntimeRepair = vi.fn(async (params: { message: string }) =>
    params.message.includes("Clear stale aborted recovery flags"),
  );
  await noteStateIntegrity(cfg, { confirmRuntimeRepair, note: noteMock });

  const storePath = resolveStorePath(cfg.session?.store, { agentId: "main" });
  const persisted = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record<
    string,
    { abortedLastRun?: boolean; updatedAt?: number }
  >;
  expect(persisted[sessionKey]?.abortedLastRun).toBe(false);
  expect(persisted[sessionKey]?.updatedAt).toBeGreaterThan(0);
  expect(doctorChangesText()).toContain("Cleared aborted restart-recovery flags");
});
|
||||
|
||||
it("warns when a case-mismatched agent dir does not resolve to the configured agent path", async () => {
|
||||
createAgentDir("Research");
|
||||
|
||||
|
||||
@@ -2,6 +2,11 @@ import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { listAgentEntries, resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import {
|
||||
clearWedgedSubagentRecoveryAbort,
|
||||
formatSubagentRecoveryWedgedReason,
|
||||
isSubagentRecoveryWedgedEntry,
|
||||
} from "../agents/subagent-recovery-state.js";
|
||||
import { formatCliCommand } from "../cli/command-format.js";
|
||||
import { resolveOAuthDir, resolveStateDir } from "../config/paths.js";
|
||||
import {
|
||||
@@ -16,6 +21,8 @@ import {
|
||||
resolveStorePath,
|
||||
} from "../config/sessions/paths.js";
|
||||
import { loadSessionStore } from "../config/sessions/store-load.js";
|
||||
import { updateSessionStore } from "../config/sessions/store.js";
|
||||
import type { SessionEntry } from "../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveRequiredHomeDir } from "../infra/home-dir.js";
|
||||
import { resolveMemoryBackendConfig } from "../memory-host-sdk/engine-storage.js";
|
||||
@@ -863,6 +870,57 @@ export async function noteStateIntegrity(
|
||||
);
|
||||
}
|
||||
|
||||
const wedgedSubagentSessions = entries.filter(([, entry]) =>
|
||||
isSubagentRecoveryWedgedEntry(entry),
|
||||
) as Array<[string, SessionEntry]>;
|
||||
if (wedgedSubagentSessions.length > 0) {
|
||||
const wedgedCount = countLabel(wedgedSubagentSessions.length, "wedged subagent session");
|
||||
warnings.push(
|
||||
[
|
||||
`- Found ${wedgedCount} with automatic restart recovery tombstoned.`,
|
||||
" OpenClaw will not auto-resume these child sessions on restart; reconcile their task records instead.",
|
||||
` Examples: ${wedgedSubagentSessions
|
||||
.slice(0, 3)
|
||||
.map(([key]) => key)
|
||||
.join(", ")}`,
|
||||
` Fix: ${formatCliCommand("openclaw tasks maintenance --apply")}`,
|
||||
].join("\n"),
|
||||
);
|
||||
const repairWedged = await prompter.confirmRuntimeRepair({
|
||||
message: `Clear stale aborted recovery flags for ${wedgedCount}?`,
|
||||
initialValue: true,
|
||||
});
|
||||
if (repairWedged) {
|
||||
let repaired = 0;
|
||||
const repairedAt = Date.now();
|
||||
await updateSessionStore(absoluteStorePath, (currentStore) => {
|
||||
for (const [key] of wedgedSubagentSessions) {
|
||||
const current = currentStore[key];
|
||||
if (current && clearWedgedSubagentRecoveryAbort(current, repairedAt)) {
|
||||
repaired += 1;
|
||||
currentStore[key] = current;
|
||||
}
|
||||
}
|
||||
});
|
||||
if (repaired > 0) {
|
||||
changes.push(
|
||||
`- Cleared aborted restart-recovery flags for ${countLabel(
|
||||
repaired,
|
||||
"wedged subagent session",
|
||||
)}.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const wedgedReasons = wedgedSubagentSessions
|
||||
.map(([, entry]) => formatSubagentRecoveryWedgedReason(entry))
|
||||
.filter((reason, index, all) => all.indexOf(reason) === index)
|
||||
.slice(0, 2);
|
||||
if (wedgedReasons.length > 0) {
|
||||
warnings.push(wedgedReasons.map((reason) => ` Reason: ${reason}`).join("\n"));
|
||||
}
|
||||
}
|
||||
|
||||
const mainKey = resolveMainSessionKey(cfg);
|
||||
const mainEntry = store[mainKey];
|
||||
if (mainEntry?.sessionId) {
|
||||
|
||||
@@ -132,6 +132,19 @@ export type SessionPluginNextTurnInjection = {
|
||||
metadata?: SessionPluginJsonValue;
|
||||
};
|
||||
|
||||
/** Persisted bookkeeping that bounds automatic subagent orphan recovery for one session. */
export type SubagentRecoveryState = {
  /** Consecutive accepted automatic orphan-recovery resumes in the rapid re-wedge window. */
  automaticAttempts?: number;
  /** Timestamp (ms) of the latest accepted automatic orphan-recovery resume. */
  lastAttemptAt?: number;
  /** Registry run id that triggered the latest automatic orphan-recovery resume. */
  lastRunId?: string;
  /** Timestamp (ms) when automatic recovery was tombstoned for this session. */
  wedgedAt?: number;
  /** Human-readable reason automatic recovery was tombstoned. */
  wedgedReason?: string;
};
|
||||
|
||||
export type SessionEntry = {
|
||||
/**
|
||||
* Last delivered heartbeat payload (used to suppress duplicate heartbeat notifications).
|
||||
@@ -173,6 +186,8 @@ export type SessionEntry = {
|
||||
pluginOwnerId?: string;
|
||||
systemSent?: boolean;
|
||||
abortedLastRun?: boolean;
|
||||
/** Durable guard state for automatic subagent orphan recovery. */
|
||||
subagentRecovery?: SubagentRecoveryState;
|
||||
/** Timestamp (ms) when the current sessionId first became active. */
|
||||
sessionStartedAt?: number;
|
||||
/** Timestamp (ms) of the last user/channel interaction that should extend idle lifetime. */
|
||||
|
||||
@@ -195,6 +195,40 @@ describe("task-registry maintenance issue #60299", () => {
|
||||
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
|
||||
});
|
||||
|
||||
it("marks subagent tasks lost when their child session recovery is tombstoned", async () => {
  const childSessionKey = "agent:main:subagent:wedged-child";
  const task = makeStaleTask({
    runtime: "subagent",
    runId: "run-wedged-child",
    childSessionKey,
  });

  // The child session row still exists, but it carries a recovery tombstone.
  const { currentTasks } = createTaskRegistryMaintenanceHarness({
    tasks: [task],
    sessionStore: {
      [childSessionKey]: {
        sessionId: "session-wedged-child",
        updatedAt: Date.now(),
        abortedLastRun: false,
        subagentRecovery: {
          automaticAttempts: 2,
          lastAttemptAt: Date.now() - 30_000,
          lastRunId: "run-wedged-child",
          wedgedAt: Date.now() - 20_000,
          wedgedReason: "subagent orphan recovery blocked after 2 rapid accepted resume attempts",
        },
      },
    },
  });

  expect(previewTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
  expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
  expect(currentTasks.get(task.taskId)).toMatchObject({
    status: "lost",
    error: "subagent orphan recovery blocked after 2 rapid accepted resume attempts",
  });
});
|
||||
|
||||
it("does not mark cron tasks lost when the current process is not the cron runtime authority", async () => {
|
||||
const task = makeStaleTask({
|
||||
runtime: "cron",
|
||||
|
||||
@@ -4,7 +4,12 @@ import {
|
||||
readAcpSessionEntry,
|
||||
type AcpSessionStoreEntry,
|
||||
} from "../acp/runtime/session-meta.js";
|
||||
import {
|
||||
formatSubagentRecoveryWedgedReason,
|
||||
isSubagentRecoveryWedgedEntry,
|
||||
} from "../agents/subagent-recovery-state.js";
|
||||
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
|
||||
import type { SessionEntry } from "../config/sessions.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { isCronJobActive } from "../cron/active-jobs.js";
|
||||
import { readCronRunLogEntriesSync, resolveCronRunLogPath } from "../cron/run-log.js";
|
||||
@@ -181,6 +186,18 @@ function findSessionEntryByKey(store: Record<string, unknown>, sessionKey: strin
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
 * Looks up the session-store entry behind a task's child session.
 *
 * Returns `undefined` when the task has no non-blank `childSessionKey` or
 * when the lookup does not yield an object. The store is loaded from the
 * path resolved for the agent id parsed out of the child session key.
 */
function findTaskSessionEntry(task: TaskRecord): SessionEntry | undefined {
  const childSessionKey = task.childSessionKey?.trim();
  if (!childSessionKey) {
    return undefined;
  }

  const agentId = taskRegistryMaintenanceRuntime.parseAgentSessionKey(childSessionKey)?.agentId;
  const storePath = taskRegistryMaintenanceRuntime.resolveStorePath(undefined, { agentId });
  const store = taskRegistryMaintenanceRuntime.loadSessionStore(storePath);

  const entry = findSessionEntryByKey(store, childSessionKey);
  if (!entry || typeof entry !== "object") {
    return undefined;
  }
  return entry as SessionEntry;
}
|
||||
|
||||
/** A task counts as active while its status is `queued` or `running`. */
function isActiveTask(task: TaskRecord): boolean {
  return task.status === "queued" || task.status === "running";
}
|
||||
@@ -374,15 +391,26 @@ function hasBackingSession(task: TaskRecord): boolean {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
const agentId = taskRegistryMaintenanceRuntime.parseAgentSessionKey(childSessionKey)?.agentId;
|
||||
const storePath = taskRegistryMaintenanceRuntime.resolveStorePath(undefined, { agentId });
|
||||
const store = taskRegistryMaintenanceRuntime.loadSessionStore(storePath);
|
||||
return Boolean(findSessionEntryByKey(store, childSessionKey));
|
||||
const entry = findTaskSessionEntry(task);
|
||||
if (task.runtime === "subagent" && isSubagentRecoveryWedgedEntry(entry)) {
|
||||
return false;
|
||||
}
|
||||
return Boolean(entry);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Chooses the error text recorded when a task is marked lost.
 *
 * For a subagent task whose child session entry carries a recovery
 * tombstone, the tombstone's reason is used; every other case reports
 * "backing session missing".
 */
function resolveTaskLostError(task: TaskRecord): string {
  if (task.runtime === "subagent") {
    const entry = findTaskSessionEntry(task);
    if (entry && isSubagentRecoveryWedgedEntry(entry)) {
      return formatSubagentRecoveryWedgedReason(entry);
    }
  }
  return "backing session missing";
}
|
||||
|
||||
function shouldMarkLost(task: TaskRecord, now: number): boolean {
|
||||
if (!isActiveTask(task)) {
|
||||
return false;
|
||||
@@ -614,7 +642,7 @@ function markTaskLost(task: TaskRecord, now: number): TaskRecord {
|
||||
taskId: task.taskId,
|
||||
endedAt: task.endedAt ?? now,
|
||||
lastEventAt: now,
|
||||
error: task.error ?? "backing session missing",
|
||||
error: task.error ?? resolveTaskLostError(task),
|
||||
cleanupAfter,
|
||||
}) ?? task;
|
||||
void taskRegistryMaintenanceRuntime.maybeDeliverTaskTerminalUpdate(updated.taskId);
|
||||
@@ -662,7 +690,7 @@ function projectTaskLost(task: TaskRecord, now: number): TaskRecord {
|
||||
status: "lost",
|
||||
endedAt: task.endedAt ?? now,
|
||||
lastEventAt: now,
|
||||
error: task.error ?? "backing session missing",
|
||||
error: task.error ?? resolveTaskLostError(task),
|
||||
};
|
||||
return {
|
||||
...projected,
|
||||
|
||||
Reference in New Issue
Block a user