From 3b289c79429032ba4d836fefe899ed296ff57ccf Mon Sep 17 00:00:00 2001 From: EVA Date: Sat, 11 Apr 2026 06:34:27 +0700 Subject: [PATCH] fix(subagents): retry archived session deletes after sweep failures (#61801) Merged via squash. Prepared head SHA: 1152c26a7801e0541c07b35b513fd54b7f37bed4 Co-authored-by: 100yenadmin <239388517+100yenadmin@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + .../subagent-registry.archive.e2e.test.ts | 128 +++++++++++++++++ src/agents/subagent-registry.ts | 134 ++++++++++-------- 3 files changed, 205 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 382c593af12..05fe86a1019 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -123,6 +123,7 @@ Docs: https://docs.openclaw.ai - Agents/subagents: deduplicate delivered completion announces so retry or re-entry cleanup does not inject duplicate internal-context completion turns into the parent session. (#61525) Thanks @100yenadmin. - Agents/exec: keep sandboxed `tools.exec.host=auto` sessions from honoring per-call `host=node` or `host=gateway` overrides while a sandbox runtime is active, and stop advertising node routing in that state so exec stays on the sandbox host. (#63880) - Gateway/restart sentinel: route restart notices only from stored canonical delivery metadata and skip outbound guessing from lossy session keys, avoiding misdelivery on case-sensitive channels like Matrix. (#64391) Thanks @gumadeiras. +- Agents/subagents: preserve archived delete-mode runs until `sessions.delete` succeeds and prevent overlapping archive sweeps from duplicating in-flight cleanup attempts. (#61801) Thanks @100yenadmin. - Cron/isolated agent: run scheduled agent turns as non-owner senders so owner-only tools stay unavailable during cron execution. (#63878) - Voice Call/realtime: reject oversized realtime WebSocket frames before bridge setup so large pre-start payloads cannot crash the gateway. (#63890) Thanks @mmaps. diff --git a/src/agents/subagent-registry.archive.e2e.test.ts b/src/agents/subagent-registry.archive.e2e.test.ts index 4c9214105e9..1090178fb09 100644 --- a/src/agents/subagent-registry.archive.e2e.test.ts +++ b/src/agents/subagent-registry.archive.e2e.test.ts @@ -2,12 +2,17 @@ import { promises as fs } from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { callGateway } from "../gateway/call.js"; const noop = () => {}; let currentConfig = { agents: { defaults: { subagents: { archiveAfterMinutes: 60 } } }, }; const loadConfigMock = vi.fn(() => currentConfig); +const flushSweepMicrotasks = async () => { + await Promise.resolve(); + await Promise.resolve(); +}; vi.mock("../gateway/call.js", () => ({ callGateway: vi.fn(async (request: unknown) => { @@ -58,6 +63,15 @@ describe("subagent registry archive behavior", () => { currentConfig = { agents: { defaults: { subagents: { archiveAfterMinutes: 60 } } }, }; + vi.mocked(callGateway).mockReset(); + vi.mocked(callGateway).mockImplementation(async (request: unknown) => { + const method = (request as { method?: string }).method; + if (method === "agent.wait") { + // Keep lifecycle unsettled so register/replace assertions can inspect stored state. + return { status: "pending" }; + } + return {}; + }); loadConfigMock.mockClear(); mod.__testing.setDepsForTest(); mod.resetSubagentRegistryForTests({ persist: false }); @@ -107,6 +121,120 @@ describe("subagent registry archive behavior", () => { expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); }); + it("keeps archived delete-mode runs for retry when sessions.delete fails", async () => { + currentConfig = { + agents: { defaults: { subagents: { archiveAfterMinutes: 1 } } }, + }; + const onSubagentEnded = vi.fn(async () => undefined); + const attachmentsRootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sweep-retry-")); + const attachmentsDir = path.join(attachmentsRootDir, "child"); + await fs.mkdir(attachmentsDir, { recursive: true }); + await fs.writeFile(path.join(attachmentsDir, "artifact.txt"), "artifact", "utf8"); + let deleteAttempts = 0; + vi.mocked(callGateway).mockImplementation(async (request: unknown) => { + const method = (request as { method?: string }).method; + if (method === "agent.wait") { + return { status: "pending" }; + } + if (method === "sessions.delete") { + deleteAttempts += 1; + if (deleteAttempts === 1) { + throw new Error("delete failed"); + } + } + return {}; + }); + mod.__testing.setDepsForTest({ + ensureContextEnginesInitialized: vi.fn(), + ensureRuntimePluginsLoaded: vi.fn(), + resolveContextEngine: vi.fn(async () => ({ onSubagentEnded }) as never), + }); + + mod.registerSubagentRun({ + runId: "run-delete-retry", + childSessionKey: "agent:main:subagent:delete-retry", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "retry delete", + cleanup: "delete", + attachmentsDir, + attachmentsRootDir, + }); + + vi.advanceTimersByTime(60_000); + await flushSweepMicrotasks(); + + expect(deleteAttempts).toBe(1); + expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(1); + expect(onSubagentEnded).not.toHaveBeenCalled(); + await expect(fs.access(attachmentsDir)).resolves.toBeUndefined(); + + vi.advanceTimersByTime(60_000); + await flushSweepMicrotasks(); + + expect(deleteAttempts).toBe(2); + expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); + }); + + it("does not overlap archive sweep retries while sessions.delete is still in flight", async () => { + currentConfig = { + agents: { defaults: { subagents: { archiveAfterMinutes: 1 } } }, + }; + let resolveDelete: (() => void) | undefined; + const deletePromise = new Promise((resolve) => { + resolveDelete = resolve; + }); + vi.mocked(callGateway).mockImplementation(async (request: unknown) => { + const method = (request as { method?: string }).method; + if (method === "agent.wait") { + return { status: "pending" }; + } + if (method === "sessions.delete") { + await deletePromise; + } + return {}; + }); + + mod.registerSubagentRun({ + runId: "run-delete-inflight", + childSessionKey: "agent:main:subagent:delete-inflight", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "inflight delete", + cleanup: "delete", + }); + + vi.advanceTimersByTime(60_000); + await flushSweepMicrotasks(); + expect( + vi + .mocked(callGateway) + .mock.calls.filter( + ([request]) => (request as { method?: string } | undefined)?.method === "sessions.delete", + ), + ).toHaveLength(1); + + vi.advanceTimersByTime(60_000); + await flushSweepMicrotasks(); + expect( + vi + .mocked(callGateway) + .mock.calls.filter( + ([request]) => (request as { method?: string } | undefined)?.method === "sessions.delete", + ), + ).toHaveLength(1); + expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(1); + + if (!resolveDelete) { + throw new Error("expected delete resolver"); + } + resolveDelete(); + await flushSweepMicrotasks(); + await vi.waitFor(() => { + expect(mod.listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); + }); + }); + it("does not set archiveAtMs for persistent session-mode runs", () => { mod.registerSubagentRun({ runId: "run-session-1", diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index b6337f62773..b549329b0e4 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -98,6 +98,7 @@ let subagentRegistryRuntimePromise: Promise< > | null = null; let sweeper: NodeJS.Timeout | null = null; +let sweepInProgress = false; let listenerStarted = false; let listenerStop: (() => void) | null = null; // Use var to avoid TDZ when init runs across circular imports during bootstrap. @@ -470,6 +471,9 @@ function startSweeper() { return; } sweeper = setInterval(() => { + if (sweepInProgress) { + return; + } void sweepSubagentRuns(); }, 60_000); sweeper.unref?.(); @@ -484,69 +488,82 @@ function stopSweeper() { } async function sweepSubagentRuns() { - const now = Date.now(); - let mutated = false; - for (const [runId, entry] of subagentRuns.entries()) { - // Session-mode runs have no archiveAtMs — apply absolute TTL after cleanup completes. - // Use cleanupCompletedAt (not endedAt) to avoid interrupting deferred cleanup flows. - if (!entry.archiveAtMs) { - if ( - typeof entry.cleanupCompletedAt === "number" && - now - entry.cleanupCompletedAt > SESSION_RUN_TTL_MS - ) { - clearPendingLifecycleError(runId); - void notifyContextEngineSubagentEnded({ - childSessionKey: entry.childSessionKey, - reason: "swept", - workspaceDir: entry.workspaceDir, - }); - subagentRuns.delete(runId); - mutated = true; - if (!entry.retainAttachmentsOnKeep) { - await safeRemoveAttachmentsDir(entry); + if (sweepInProgress) { + return; + } + sweepInProgress = true; + try { + const now = Date.now(); + let mutated = false; + for (const [runId, entry] of subagentRuns.entries()) { + // Session-mode runs have no archiveAtMs — apply absolute TTL after cleanup completes. + // Use cleanupCompletedAt (not endedAt) to avoid interrupting deferred cleanup flows. + if (!entry.archiveAtMs) { + if ( + typeof entry.cleanupCompletedAt === "number" && + now - entry.cleanupCompletedAt > SESSION_RUN_TTL_MS + ) { + clearPendingLifecycleError(runId); + void notifyContextEngineSubagentEnded({ + childSessionKey: entry.childSessionKey, + reason: "swept", + workspaceDir: entry.workspaceDir, + }); + subagentRuns.delete(runId); + mutated = true; + if (!entry.retainAttachmentsOnKeep) { + await safeRemoveAttachmentsDir(entry); + } } + continue; + } + if (entry.archiveAtMs > now) { + continue; } - continue; - } - if (entry.archiveAtMs > now) { - continue; - } - clearPendingLifecycleError(runId); - void notifyContextEngineSubagentEnded({ - childSessionKey: entry.childSessionKey, - reason: "swept", - workspaceDir: entry.workspaceDir, - }); - subagentRuns.delete(runId); - mutated = true; - // Archive/purge is terminal for the run record; remove any retained attachments too. - await safeRemoveAttachmentsDir(entry); - try { - await subagentRegistryDeps.callGateway({ - method: "sessions.delete", - params: { - key: entry.childSessionKey, - deleteTranscript: true, - emitLifecycleHooks: false, - }, - timeoutMs: 10_000, - }); - } catch { - // ignore - } - } - // Sweep orphaned pendingLifecycleError entries (absolute TTL). - for (const [runId, pending] of pendingLifecycleErrorByRunId.entries()) { - if (now - pending.endedAt > PENDING_ERROR_TTL_MS) { clearPendingLifecycleError(runId); + try { + await subagentRegistryDeps.callGateway({ + method: "sessions.delete", + params: { + key: entry.childSessionKey, + deleteTranscript: true, + emitLifecycleHooks: false, + }, + timeoutMs: 10_000, + }); + } catch (err) { + log.warn("sessions.delete failed during subagent sweep; keeping run for retry", { + runId, + childSessionKey: entry.childSessionKey, + err, + }); + continue; + } + subagentRuns.delete(runId); + mutated = true; + // Archive/purge is terminal for the run record; remove any retained attachments too. + await safeRemoveAttachmentsDir(entry); + void notifyContextEngineSubagentEnded({ + childSessionKey: entry.childSessionKey, + reason: "swept", + workspaceDir: entry.workspaceDir, + }); + } + // Sweep orphaned pendingLifecycleError entries (absolute TTL). + for (const [runId, pending] of pendingLifecycleErrorByRunId.entries()) { + if (now - pending.endedAt > PENDING_ERROR_TTL_MS) { + clearPendingLifecycleError(runId); + } } - } - if (mutated) { - persistSubagentRuns(); - } - if (subagentRuns.size === 0) { - stopSweeper(); + if (mutated) { + persistSubagentRuns(); + } + if (subagentRuns.size === 0) { + stopSweeper(); + } + } finally { + sweepInProgress = false; } } @@ -681,6 +698,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) { subagentRegistryRuntimePromise = null; resetAnnounceQueuesForTests(); stopSweeper(); + sweepInProgress = false; restoreAttempted = false; if (listenerStop) { listenerStop();