diff --git a/src/agents/subagent-registry-maintenance.ts b/src/agents/subagent-registry-maintenance.ts
new file mode 100644
index 00000000000..94e0ee33090
--- /dev/null
+++ b/src/agents/subagent-registry-maintenance.ts
@@ -0,0 +1,72 @@
+import { registerSessionMaintenancePreserveKeysProvider } from "../config/sessions/store-maintenance-preserve.js";
+import { subagentRuns } from "./subagent-registry-memory.js";
+import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js";
+import type { SubagentRunRecord } from "./subagent-registry.types.js";
+
+/** True once cleanup has been recorded for the run (`cleanupCompletedAt` is a number). */
+function isCleanupCompleteForMaintenance(entry: SubagentRunRecord): boolean {
+  return typeof entry.cleanupCompletedAt === "number";
+}
+
+/** True while the run has no numeric `endedAt`, i.e. it has not been marked as ended. */
+function isActiveForMaintenance(entry: SubagentRunRecord): boolean {
+  return typeof entry.endedAt !== "number";
+}
+
+/** True when the run is explicitly flagged with `pendingFinalDelivery === true`. */
+function isPendingFinalDeliveryForMaintenance(entry: SubagentRunRecord): boolean {
+  return entry.pendingFinalDelivery === true;
+}
+
+/** True when a completion message is expected but `completionAnnouncedAt` is not yet a number. */
+function isAwaitingCompletionAnnounceForMaintenance(entry: SubagentRunRecord): boolean {
+  return entry.expectsCompletionMessage === true && typeof entry.completionAnnouncedAt !== "number";
+}
+
+/**
+ * Decides whether a run's child session must survive session-store maintenance.
+ *
+ * Order matters: a run with completed cleanup is never preserved, a run that has
+ * not ended is always preserved, and an ended run is preserved only while its
+ * completion announce or final delivery is still outstanding.
+ */
+function shouldPreserveForMaintenance(entry: SubagentRunRecord): boolean {
+  if (isCleanupCompleteForMaintenance(entry)) {
+    return false;
+  }
+  if (isActiveForMaintenance(entry)) {
+    return true;
+  }
+  return (
+    isAwaitingCompletionAnnounceForMaintenance(entry) || isPendingFinalDeliveryForMaintenance(entry)
+  );
+}
+
+/**
+ * Lists the child session keys of subagent runs that maintenance must not remove.
+ *
+ * @returns De-duplicated, trimmed, non-empty `childSessionKey` values of every run
+ *   in the read snapshot for which `shouldPreserveForMaintenance` holds.
+ */
+export function listSessionMaintenanceProtectedSubagentSessionKeys(): string[] {
+  // Explicit element type: a bare `new Set()` is `Set<unknown>`, which does not
+  // spread into the declared `string[]` return type.
+  const keys = new Set<string>();
+  for (const entry of getSubagentRunsSnapshotForRead(subagentRuns).values()) {
+    if (!shouldPreserveForMaintenance(entry)) {
+      continue;
+    }
+    // Skip records without a usable string key instead of throwing: the preserve-key
+    // collector swallows provider errors, so one malformed record would otherwise
+    // silently drop protection for every other run.
+    const rawKey: unknown = entry.childSessionKey;
+    const childSessionKey = typeof rawKey === "string" ? rawKey.trim() : "";
+    if (childSessionKey) {
+      keys.add(childSessionKey);
+    }
+  }
+  return [...keys];
+}
+
+// Module-load side effect: expose the protected keys to session-store maintenance.
+registerSessionMaintenancePreserveKeysProvider(listSessionMaintenanceProtectedSubagentSessionKeys);
diff --git 
a/src/agents/subagent-registry.test.ts b/src/agents/subagent-registry.test.ts index 67d736a03b1..7347610a5b2 100644 --- a/src/agents/subagent-registry.test.ts +++ b/src/agents/subagent-registry.test.ts @@ -225,6 +225,76 @@ describe("subagent registry seam flow", () => { vi.useRealTimers(); }); + it("lists active and pending-delivery child sessions for maintenance preservation", () => { + const now = Date.now(); + mod.addSubagentRunForTests({ + runId: "run-active", + childSessionKey: "agent:main:subagent:active", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "active task", + cleanup: "delete", + expectsCompletionMessage: true, + createdAt: now, + }); + mod.addSubagentRunForTests({ + runId: "run-pending", + childSessionKey: "agent:main:subagent:pending", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "pending delivery task", + cleanup: "delete", + expectsCompletionMessage: true, + createdAt: now - 2, + endedAt: now - 1, + pendingFinalDelivery: true, + frozenResultText: "child output", + }); + mod.addSubagentRunForTests({ + runId: "run-complete", + childSessionKey: "agent:main:subagent:complete", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "already delivered task", + cleanup: "keep", + expectsCompletionMessage: true, + createdAt: now - 4, + endedAt: now - 3, + completionAnnouncedAt: now - 2, + cleanupCompletedAt: now - 1, + }); + + expect(mod.listSessionMaintenanceProtectedSubagentSessionKeys().toSorted()).toEqual([ + "agent:main:subagent:active", + "agent:main:subagent:pending", + ]); + }); + + it("uses the disk-aware run snapshot for maintenance preservation", () => { + const now = Date.now(); + mocks.getSubagentRunsSnapshotForRead.mockReturnValueOnce( + new Map([ + [ + "run-restored", + { + runId: "run-restored", + childSessionKey: "agent:main:subagent:restored", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "restored pending 
task", + cleanup: "delete", + expectsCompletionMessage: true, + createdAt: now, + }, + ], + ]), + ); + + expect(mod.listSessionMaintenanceProtectedSubagentSessionKeys()).toEqual([ + "agent:main:subagent:restored", + ]); + }); + it("schedules orphan recovery instead of terminally failing on recoverable wait transport errors", async () => { mocks.callGateway.mockImplementation(async (request: { method?: string }) => { if (request.method === "agent.wait") { diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 8884577f5a2..931bea248c3 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -1241,6 +1241,10 @@ export function initSubagentRegistry() { restoreSubagentRunsOnce(); } +// Importing this module also registers the subagent maintenance preserve-key +// provider as a side effect (see subagent-registry-maintenance.ts). +export { listSessionMaintenanceProtectedSubagentSessionKeys } from "./subagent-registry-maintenance.js"; + // Let the shared outbound plan treat bare silent replies as dropped (instead // of rewriting them to visible fallback text) when the parent session has at // least one pending spawned child whose completion will deliver the real diff --git a/src/config/sessions/cleanup-service.ts b/src/config/sessions/cleanup-service.ts index 75d46a93f71..a5dfddaaad5 100644 --- a/src/config/sessions/cleanup-service.ts +++ b/src/config/sessions/cleanup-service.ts @@ -17,6 +17,7 @@ import { resolveStorePath, } from "./paths.js"; import { cloneSessionStoreRecord } from "./store-cache.js"; +import { collectSessionMaintenancePreserveKeys } from "./store-maintenance-preserve.js"; import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js"; import { capEntryCount, @@ -276,14 +277,17 @@ async function previewStoreCleanup(params: { }, }) : 0; + const preserveSessionKeys = collectSessionMaintenancePreserveKeys([params.activeKey]); const pruned = pruneStaleEntries(previewStore, 
params.maintenance.pruneAfterMs, { log: false, + preserveKeys: preserveSessionKeys, onPruned: ({ key }) => { staleKeys.add(key); }, }); const capped = capEntryCount(previewStore, params.maintenance.maxEntries, { log: false, + preserveKeys: preserveSessionKeys, onCapped: ({ key }) => { cappedKeys.add(key); }, @@ -313,6 +317,7 @@ async function previewStoreCleanup(params: { store: previewStore, storePath: params.target.storePath, activeSessionKey: params.activeKey, + preserveKeys: preserveSessionKeys, maintenance: params.maintenance, warnOnly: false, dryRun: true, diff --git a/src/config/sessions/disk-budget.test.ts b/src/config/sessions/disk-budget.test.ts index 99ccbe70e08..8c4c3f73fe5 100644 --- a/src/config/sessions/disk-budget.test.ts +++ b/src/config/sessions/disk-budget.test.ts @@ -102,6 +102,43 @@ describe("enforceSessionDiskBudget", () => { }); }); + it("preserves runtime-provided session keys when removing entries for disk budget", async () => { + await withTempDir({ prefix: "openclaw-disk-budget-" }, async (dir) => { + const storePath = path.join(dir, "sessions.json"); + const childKey = "agent:main:subagent:pending-budget"; + const removableKey = "agent:main:old-removable"; + const now = Date.now(); + const store: Record = { + [childKey]: { + sessionId: "pending-budget", + updatedAt: now - 10_000, + spawnedBy: "agent:main:main", + }, + [removableKey]: { + sessionId: "old-removable", + updatedAt: now, + }, + }; + await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8"); + + const result = await enforceSessionDiskBudget({ + store, + storePath, + preserveKeys: new Set([childKey]), + maintenance: { + maxDiskBytes: 120, + highWaterBytes: 80, + }, + warnOnly: false, + }); + + expectBudgetResult(result); + expect(result.removedEntries).toBe(1); + expect(store).toHaveProperty(childKey); + expect(store).not.toHaveProperty(removableKey); + }); + }); + it("removes unreferenced compaction checkpoint artifacts under pressure", async () => { await 
withTempDir({ prefix: "openclaw-disk-budget-" }, async (dir) => { const storePath = path.join(dir, "sessions.json"); diff --git a/src/config/sessions/disk-budget.ts b/src/config/sessions/disk-budget.ts index a0f1de2c72b..c4707e8df94 100644 --- a/src/config/sessions/disk-budget.ts +++ b/src/config/sessions/disk-budget.ts @@ -15,7 +15,7 @@ import { isTrajectorySessionArtifactName, } from "./artifacts.js"; import { resolveSessionFilePath } from "./paths.js"; -import { isProtectedSessionMaintenanceEntry } from "./store-maintenance.js"; +import { shouldPreserveMaintenanceEntry } from "./store-maintenance.js"; import type { SessionEntry } from "./types.js"; export type SessionDiskBudgetConfig = { @@ -332,6 +332,7 @@ export async function enforceSessionDiskBudget(params: { store: Record; storePath: string; activeSessionKey?: string; + preserveKeys?: ReadonlySet; maintenance: SessionDiskBudgetConfig; warnOnly: boolean; dryRun?: boolean; @@ -438,7 +439,7 @@ export async function enforceSessionDiskBudget(params: { if (!entry) { continue; } - if (isProtectedSessionMaintenanceEntry(key, entry)) { + if (shouldPreserveMaintenanceEntry({ key, entry, preserveKeys: params.preserveKeys })) { continue; } const previousProjectedBytes = projectedStoreBytes; diff --git a/src/config/sessions/store-load.ts b/src/config/sessions/store-load.ts index 05d8a6aa9dc..16a64fcf076 100644 --- a/src/config/sessions/store-load.ts +++ b/src/config/sessions/store-load.ts @@ -9,6 +9,7 @@ import { setSerializedSessionStore, writeSessionStoreCache, } from "./store-cache.js"; +import { collectSessionMaintenancePreserveKeys } from "./store-maintenance-preserve.js"; import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js"; import { capEntryCount, @@ -156,13 +157,20 @@ export function loadSessionStore( let pruned = 0; let capped = 0; if (maintenance.mode === "enforce" && beforeCount > maintenance.maxEntries) { - pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { log: false }); + 
const preserveSessionKeys = collectSessionMaintenancePreserveKeys(); + pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { + log: false, + preserveKeys: preserveSessionKeys, + }); const countAfterPrune = Object.keys(store).length; capped = shouldRunSessionEntryMaintenance({ entryCount: countAfterPrune, maxEntries: maintenance.maxEntries, }) - ? capEntryCount(store, maintenance.maxEntries, { log: false }) + ? capEntryCount(store, maintenance.maxEntries, { + log: false, + preserveKeys: preserveSessionKeys, + }) : 0; } const afterCount = Object.keys(store).length;
diff --git a/src/config/sessions/store-maintenance-preserve.ts b/src/config/sessions/store-maintenance-preserve.ts
new file mode 100644
index 00000000000..dd59ba5e41b
--- /dev/null
+++ b/src/config/sessions/store-maintenance-preserve.ts
@@ -0,0 +1,69 @@
+import { normalizeStoreSessionKey } from "./store-entry.js";
+
+/**
+ * Callback that reports session keys maintenance must keep. It may return nothing,
+ * and individual items may be `undefined`; those are treated as the empty string.
+ */
+export type SessionMaintenancePreserveKeysProvider = () => Iterable<string | undefined> | undefined;
+
+// Module-level registry of the currently registered providers.
+const preserveKeysProviders = new Set<SessionMaintenancePreserveKeysProvider>();
+
+/**
+ * Registers a provider consulted by `collectSessionMaintenancePreserveKeys`.
+ *
+ * @param provider - Callback returning the session keys to preserve.
+ * @returns A function that removes the provider from the registry again.
+ */
+export function registerSessionMaintenancePreserveKeysProvider(
+  provider: SessionMaintenancePreserveKeysProvider,
+): () => void {
+  preserveKeysProviders.add(provider);
+  return () => {
+    preserveKeysProviders.delete(provider);
+  };
+}
+
+/** Normalizes one candidate key and adds it to `keys` unless it normalizes to an empty value. */
+function addSessionMaintenancePreserveKey(keys: Set<string>, value: string | undefined): void {
+  // Match how store keys are normalized in `normalizeStoreSessionKey`
+  // (trim + lowercase) so providers can register session keys in any
+  // case without missing matches during maintenance lookups.
+  const normalized = normalizeStoreSessionKey(value ?? "");
+  if (normalized) {
+    keys.add(normalized);
+  }
+}
+
+/** Adds every item of `values` to `keys`; a missing iterable contributes nothing. */
+function addSessionMaintenancePreserveKeys(
+  keys: Set<string>,
+  values: Iterable<string | undefined> | undefined,
+): void {
+  for (const value of values ?? []) {
+    addSessionMaintenancePreserveKey(keys, value);
+  }
+}
+
+/**
+ * Builds the set of session keys that maintenance must not remove.
+ *
+ * @param baseKeys - Caller-supplied keys (for example the active session key);
+ *   `undefined` items are treated as the empty string.
+ * @returns The normalized union of `baseKeys` and all provider results, or
+ *   `undefined` when that union is empty.
+ */
+export function collectSessionMaintenancePreserveKeys(
+  baseKeys?: Iterable<string | undefined>,
+): Set<string> | undefined {
+  const keys = new Set<string>();
+  addSessionMaintenancePreserveKeys(keys, baseKeys);
+  for (const provider of preserveKeysProviders) {
+    try {
+      addSessionMaintenancePreserveKeys(keys, provider());
+    } catch {
+      // Maintenance must remain best-effort if a runtime provider is temporarily unavailable.
+    }
+  }
+  return keys.size > 0 ? keys : undefined;
+}
diff --git a/src/config/sessions/store-maintenance.ts b/src/config/sessions/store-maintenance.ts index 70aca7138a0..e3afa073005 100644 --- a/src/config/sessions/store-maintenance.ts +++ b/src/config/sessions/store-maintenance.ts @@ -309,7 +309,7 @@ export function isProtectedSessionMaintenanceEntry( return chatType === "group" || chatType === "channel" || chatType === "thread"; } -function shouldPreserveMaintenanceEntry(params: { +export function shouldPreserveMaintenanceEntry(params: { key: string; entry: SessionEntry | undefined; preserveKeys?: ReadonlySet; diff --git a/src/config/sessions/store.pruning.integration.test.ts b/src/config/sessions/store.pruning.integration.test.ts index 8469c793f56..c65245e1f10 100644 --- a/src/config/sessions/store.pruning.integration.test.ts +++ b/src/config/sessions/store.pruning.integration.test.ts @@ -17,6 +17,7 @@ vi.mock("../config.js", async () => ({ import { getRuntimeConfig } from "../config.js"; import { runSessionsCleanup } from "./cleanup-service.js"; +import { registerSessionMaintenancePreserveKeysProvider } from "./store-maintenance-preserve.js"; import { clearSessionStoreCacheForTest, loadSessionStore, @@ -716,6 +717,38 @@ describe("Integration: saveSessionStore with pruning", () => { expect(loaded["session-74"]).toBeUndefined(); }); + it("explicit loadSessionStore maintenance preserves runtime-provided subagent sessions", async () => { + const now = Date.now(); + const childKey = 
"agent:main:subagent:pending-delivery"; + const store = Object.fromEntries( + Array.from({ length: 75 }, (_, index) => [`session-${index}`, makeEntry(now - index)]), + ); + store[childKey] = { + ...makeEntry(now - 100 * DAY_MS), + spawnedBy: "agent:main:slack:direct:U1", + }; + await fs.writeFile(storePath, JSON.stringify(store), "utf-8"); + const unregister = registerSessionMaintenancePreserveKeysProvider(() => [childKey]); + + try { + const loaded = loadSessionStore(storePath, { + skipCache: true, + runMaintenance: true, + maintenanceConfig: { + ...ENFORCED_MAINTENANCE_OVERRIDE, + maxEntries: 50, + pruneAfterMs: 365 * DAY_MS, + }, + }); + + expect(Object.keys(loaded)).toHaveLength(50); + expect(loaded).toHaveProperty(childKey); + expect(loaded["session-74"]).toBeUndefined(); + } finally { + unregister(); + } + }); + it("persists quota suspension TTL transitions through writer maintenance", async () => { const now = Date.now(); const store: Record = { diff --git a/src/config/sessions/store.pruning.test.ts b/src/config/sessions/store.pruning.test.ts index 27560e4ebc2..42ae3973488 100644 --- a/src/config/sessions/store.pruning.test.ts +++ b/src/config/sessions/store.pruning.test.ts @@ -1,6 +1,10 @@ import crypto from "node:crypto"; import { afterAll, beforeAll, describe, expect, it } from "vitest"; import { createFixtureSuite } from "../../test-utils/fixture-suite.js"; +import { + collectSessionMaintenancePreserveKeys, + registerSessionMaintenancePreserveKeysProvider, +} from "./store-maintenance-preserve.js"; import { isProtectedSessionMaintenanceEntry, resolveMaintenanceConfigFromInput, @@ -115,6 +119,82 @@ describe("capEntryCount", () => { expect(store.oldest).toBeUndefined(); expect(store.old).toBeUndefined(); }); + + it("preserves runtime-provided pending subagent sessions when capping", () => { + const now = Date.now(); + const childKey = "agent:main:subagent:child"; + const store = makeStore([ + [childKey, { ...makeEntry(now - 10 * DAY_MS), spawnedBy: 
"agent:main:slack:direct:U1" }], + ["recent-1", makeEntry(now)], + ["recent-2", makeEntry(now - 1)], + ["old", makeEntry(now - 2)], + ]); + const unregister = registerSessionMaintenancePreserveKeysProvider(() => [childKey]); + + try { + const evicted = capEntryCount(store, 2, { + preserveKeys: collectSessionMaintenancePreserveKeys(), + }); + + expect(evicted).toBe(2); + expect(Object.keys(store)).toHaveLength(2); + expect(store).toHaveProperty(childKey); + expect(store).toHaveProperty("recent-1"); + expect(store["recent-2"]).toBeUndefined(); + expect(store.old).toBeUndefined(); + } finally { + unregister(); + } + }); + + it("normalizes runtime-provided preserve keys to match lowercased store keys", () => { + const now = Date.now(); + const childKey = "agent:main:subagent:child"; + const store = makeStore([ + [childKey, { ...makeEntry(now - 10 * DAY_MS), spawnedBy: "agent:main:slack:direct:U1" }], + ["recent-1", makeEntry(now)], + ["old", makeEntry(now - 1)], + ]); + // Provider returns the key in mixed case + with surrounding whitespace; + // normalization must match the lowercased store key during maintenance. 
+ const unregister = registerSessionMaintenancePreserveKeysProvider(() => [ + " Agent:Main:Subagent:CHILD ", + ]); + + try { + const evicted = capEntryCount(store, 2, { + preserveKeys: collectSessionMaintenancePreserveKeys(), + }); + + expect(evicted).toBe(1); + expect(Object.keys(store)).toHaveLength(2); + expect(store).toHaveProperty(childKey); + expect(store).toHaveProperty("recent-1"); + expect(store.old).toBeUndefined(); + } finally { + unregister(); + } + }); + + it("can temporarily exceed the cap when every candidate is runtime-protected", () => { + const now = Date.now(); + const store = makeStore([ + ["agent:main:subagent:child-a", makeEntry(now - 2)], + ["agent:main:subagent:child-b", makeEntry(now - 1)], + ]); + const unregister = registerSessionMaintenancePreserveKeysProvider(() => Object.keys(store)); + + try { + const evicted = capEntryCount(store, 1, { + preserveKeys: collectSessionMaintenancePreserveKeys(), + }); + + expect(evicted).toBe(0); + expect(Object.keys(store)).toHaveLength(2); + } finally { + unregister(); + } + }); }); describe("isProtectedSessionMaintenanceEntry", () => { diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 18af31e2c3d..3198c28bd87 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -23,6 +23,7 @@ import { } from "./store-cache.js"; import { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js"; import { loadSessionStore, normalizeSessionStore } from "./store-load.js"; +import { collectSessionMaintenancePreserveKeys } from "./store-maintenance-preserve.js"; import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js"; import { capEntryCount, @@ -282,9 +283,7 @@ async function saveSessionStoreUnlocked( diskBudget, }); } else { - const preserveSessionKeys = opts?.activeSessionKey - ? 
new Set([opts.activeSessionKey]) - : undefined; + const preserveSessionKeys = collectSessionMaintenancePreserveKeys([opts?.activeSessionKey]); // Prune stale entries and cap total count before serializing. const removedSessionFiles = new Map(); const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { @@ -355,6 +354,7 @@ async function saveSessionStoreUnlocked( store, storePath, activeSessionKey: opts?.activeSessionKey, + preserveKeys: preserveSessionKeys, maintenance, warnOnly: false, log,