fix(subagents): retry archived session deletes after sweep failures (#61801)

Merged via squash.

Prepared head SHA: 1152c26a78
Co-authored-by: 100yenadmin <239388517+100yenadmin@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
EVA
2026-04-11 06:34:27 +07:00
committed by GitHub
parent a403e611c7
commit 3b289c7942
3 changed files with 205 additions and 58 deletions

View File

@@ -123,6 +123,7 @@ Docs: https://docs.openclaw.ai
- Agents/subagents: deduplicate delivered completion announces so retry or re-entry cleanup does not inject duplicate internal-context completion turns into the parent session. (#61525) Thanks @100yenadmin.
- Agents/exec: keep sandboxed `tools.exec.host=auto` sessions from honoring per-call `host=node` or `host=gateway` overrides while a sandbox runtime is active, and stop advertising node routing in that state so exec stays on the sandbox host. (#63880)
- Gateway/restart sentinel: route restart notices only from stored canonical delivery metadata and skip outbound guessing from lossy session keys, avoiding misdelivery on case-sensitive channels like Matrix. (#64391) Thanks @gumadeiras.
- Agents/subagents: preserve archived delete-mode runs until `sessions.delete` succeeds and prevent overlapping archive sweeps from duplicating in-flight cleanup attempts. (#61801) Thanks @100yenadmin.
- Cron/isolated agent: run scheduled agent turns as non-owner senders so owner-only tools stay unavailable during cron execution. (#63878)
- Voice Call/realtime: reject oversized realtime WebSocket frames before bridge setup so large pre-start payloads cannot crash the gateway. (#63890) Thanks @mmaps.

View File

@@ -2,12 +2,17 @@ import { promises as fs } from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { callGateway } from "../gateway/call.js";
const noop = () => {};
let currentConfig = {
agents: { defaults: { subagents: { archiveAfterMinutes: 60 } } },
};
const loadConfigMock = vi.fn(() => currentConfig);
const flushSweepMicrotasks = async () => {
await Promise.resolve();
await Promise.resolve();
};
vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(async (request: unknown) => {
@@ -58,6 +63,15 @@ describe("subagent registry archive behavior", () => {
currentConfig = {
agents: { defaults: { subagents: { archiveAfterMinutes: 60 } } },
};
vi.mocked(callGateway).mockReset();
vi.mocked(callGateway).mockImplementation(async (request: unknown) => {
const method = (request as { method?: string }).method;
if (method === "agent.wait") {
// Keep lifecycle unsettled so register/replace assertions can inspect stored state.
return { status: "pending" };
}
return {};
});
loadConfigMock.mockClear();
mod.__testing.setDepsForTest();
mod.resetSubagentRegistryForTests({ persist: false });
@@ -107,6 +121,120 @@ describe("subagent registry archive behavior", () => {
expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(0);
});
it("keeps archived delete-mode runs for retry when sessions.delete fails", async () => {
currentConfig = {
agents: { defaults: { subagents: { archiveAfterMinutes: 1 } } },
};
const onSubagentEnded = vi.fn(async () => undefined);
const attachmentsRootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sweep-retry-"));
const attachmentsDir = path.join(attachmentsRootDir, "child");
await fs.mkdir(attachmentsDir, { recursive: true });
await fs.writeFile(path.join(attachmentsDir, "artifact.txt"), "artifact", "utf8");
let deleteAttempts = 0;
vi.mocked(callGateway).mockImplementation(async (request: unknown) => {
const method = (request as { method?: string }).method;
if (method === "agent.wait") {
return { status: "pending" };
}
if (method === "sessions.delete") {
deleteAttempts += 1;
if (deleteAttempts === 1) {
throw new Error("delete failed");
}
}
return {};
});
mod.__testing.setDepsForTest({
ensureContextEnginesInitialized: vi.fn(),
ensureRuntimePluginsLoaded: vi.fn(),
resolveContextEngine: vi.fn(async () => ({ onSubagentEnded }) as never),
});
mod.registerSubagentRun({
runId: "run-delete-retry",
childSessionKey: "agent:main:subagent:delete-retry",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "retry delete",
cleanup: "delete",
attachmentsDir,
attachmentsRootDir,
});
vi.advanceTimersByTime(60_000);
await flushSweepMicrotasks();
expect(deleteAttempts).toBe(1);
expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(1);
expect(onSubagentEnded).not.toHaveBeenCalled();
await expect(fs.access(attachmentsDir)).resolves.toBeUndefined();
vi.advanceTimersByTime(60_000);
await flushSweepMicrotasks();
expect(deleteAttempts).toBe(2);
expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(0);
});
it("does not overlap archive sweep retries while sessions.delete is still in flight", async () => {
currentConfig = {
agents: { defaults: { subagents: { archiveAfterMinutes: 1 } } },
};
let resolveDelete: (() => void) | undefined;
const deletePromise = new Promise<void>((resolve) => {
resolveDelete = resolve;
});
vi.mocked(callGateway).mockImplementation(async (request: unknown) => {
const method = (request as { method?: string }).method;
if (method === "agent.wait") {
return { status: "pending" };
}
if (method === "sessions.delete") {
await deletePromise;
}
return {};
});
mod.registerSubagentRun({
runId: "run-delete-inflight",
childSessionKey: "agent:main:subagent:delete-inflight",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "inflight delete",
cleanup: "delete",
});
vi.advanceTimersByTime(60_000);
await flushSweepMicrotasks();
expect(
vi
.mocked(callGateway)
.mock.calls.filter(
([request]) => (request as { method?: string } | undefined)?.method === "sessions.delete",
),
).toHaveLength(1);
vi.advanceTimersByTime(60_000);
await flushSweepMicrotasks();
expect(
vi
.mocked(callGateway)
.mock.calls.filter(
([request]) => (request as { method?: string } | undefined)?.method === "sessions.delete",
),
).toHaveLength(1);
expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(1);
if (!resolveDelete) {
throw new Error("expected delete resolver");
}
resolveDelete();
await flushSweepMicrotasks();
await vi.waitFor(() => {
expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(0);
});
});
it("does not set archiveAtMs for persistent session-mode runs", () => {
mod.registerSubagentRun({
runId: "run-session-1",

View File

@@ -98,6 +98,7 @@ let subagentRegistryRuntimePromise: Promise<
> | null = null;
let sweeper: NodeJS.Timeout | null = null;
let sweepInProgress = false;
let listenerStarted = false;
let listenerStop: (() => void) | null = null;
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
@@ -470,6 +471,9 @@ function startSweeper() {
return;
}
sweeper = setInterval(() => {
if (sweepInProgress) {
return;
}
void sweepSubagentRuns();
}, 60_000);
sweeper.unref?.();
@@ -484,69 +488,82 @@ function stopSweeper() {
}
async function sweepSubagentRuns() {
const now = Date.now();
let mutated = false;
for (const [runId, entry] of subagentRuns.entries()) {
// Session-mode runs have no archiveAtMs — apply absolute TTL after cleanup completes.
// Use cleanupCompletedAt (not endedAt) to avoid interrupting deferred cleanup flows.
if (!entry.archiveAtMs) {
if (
typeof entry.cleanupCompletedAt === "number" &&
now - entry.cleanupCompletedAt > SESSION_RUN_TTL_MS
) {
clearPendingLifecycleError(runId);
void notifyContextEngineSubagentEnded({
childSessionKey: entry.childSessionKey,
reason: "swept",
workspaceDir: entry.workspaceDir,
});
subagentRuns.delete(runId);
mutated = true;
if (!entry.retainAttachmentsOnKeep) {
await safeRemoveAttachmentsDir(entry);
if (sweepInProgress) {
return;
}
sweepInProgress = true;
try {
const now = Date.now();
let mutated = false;
for (const [runId, entry] of subagentRuns.entries()) {
// Session-mode runs have no archiveAtMs — apply absolute TTL after cleanup completes.
// Use cleanupCompletedAt (not endedAt) to avoid interrupting deferred cleanup flows.
if (!entry.archiveAtMs) {
if (
typeof entry.cleanupCompletedAt === "number" &&
now - entry.cleanupCompletedAt > SESSION_RUN_TTL_MS
) {
clearPendingLifecycleError(runId);
void notifyContextEngineSubagentEnded({
childSessionKey: entry.childSessionKey,
reason: "swept",
workspaceDir: entry.workspaceDir,
});
subagentRuns.delete(runId);
mutated = true;
if (!entry.retainAttachmentsOnKeep) {
await safeRemoveAttachmentsDir(entry);
}
}
continue;
}
if (entry.archiveAtMs > now) {
continue;
}
continue;
}
if (entry.archiveAtMs > now) {
continue;
}
clearPendingLifecycleError(runId);
void notifyContextEngineSubagentEnded({
childSessionKey: entry.childSessionKey,
reason: "swept",
workspaceDir: entry.workspaceDir,
});
subagentRuns.delete(runId);
mutated = true;
// Archive/purge is terminal for the run record; remove any retained attachments too.
await safeRemoveAttachmentsDir(entry);
try {
await subagentRegistryDeps.callGateway({
method: "sessions.delete",
params: {
key: entry.childSessionKey,
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
} catch {
// ignore
}
}
// Sweep orphaned pendingLifecycleError entries (absolute TTL).
for (const [runId, pending] of pendingLifecycleErrorByRunId.entries()) {
if (now - pending.endedAt > PENDING_ERROR_TTL_MS) {
clearPendingLifecycleError(runId);
try {
await subagentRegistryDeps.callGateway({
method: "sessions.delete",
params: {
key: entry.childSessionKey,
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
} catch (err) {
log.warn("sessions.delete failed during subagent sweep; keeping run for retry", {
runId,
childSessionKey: entry.childSessionKey,
err,
});
continue;
}
subagentRuns.delete(runId);
mutated = true;
// Archive/purge is terminal for the run record; remove any retained attachments too.
await safeRemoveAttachmentsDir(entry);
void notifyContextEngineSubagentEnded({
childSessionKey: entry.childSessionKey,
reason: "swept",
workspaceDir: entry.workspaceDir,
});
}
// Sweep orphaned pendingLifecycleError entries (absolute TTL).
for (const [runId, pending] of pendingLifecycleErrorByRunId.entries()) {
if (now - pending.endedAt > PENDING_ERROR_TTL_MS) {
clearPendingLifecycleError(runId);
}
}
}
if (mutated) {
persistSubagentRuns();
}
if (subagentRuns.size === 0) {
stopSweeper();
if (mutated) {
persistSubagentRuns();
}
if (subagentRuns.size === 0) {
stopSweeper();
}
} finally {
sweepInProgress = false;
}
}
@@ -681,6 +698,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
subagentRegistryRuntimePromise = null;
resetAnnounceQueuesForTests();
stopSweeper();
sweepInProgress = false;
restoreAttempted = false;
if (listenerStop) {
listenerStop();