mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 19:24:46 +00:00
fix: preserve pending subagent sessions during maintenance (#81498)
This commit is contained in:
48
src/agents/subagent-registry-maintenance.ts
Normal file
48
src/agents/subagent-registry-maintenance.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { registerSessionMaintenancePreserveKeysProvider } from "../config/sessions/store-maintenance-preserve.js";
|
||||
import { subagentRuns } from "./subagent-registry-memory.js";
|
||||
import { getSubagentRunsSnapshotForRead } from "./subagent-registry-state.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
function isCleanupCompleteForMaintenance(entry: SubagentRunRecord): boolean {
|
||||
return typeof entry.cleanupCompletedAt === "number";
|
||||
}
|
||||
|
||||
function isActiveForMaintenance(entry: SubagentRunRecord): boolean {
|
||||
return typeof entry.endedAt !== "number";
|
||||
}
|
||||
|
||||
function isPendingFinalDeliveryForMaintenance(entry: SubagentRunRecord): boolean {
|
||||
return entry.pendingFinalDelivery === true;
|
||||
}
|
||||
|
||||
function isAwaitingCompletionAnnounceForMaintenance(entry: SubagentRunRecord): boolean {
|
||||
return entry.expectsCompletionMessage === true && typeof entry.completionAnnouncedAt !== "number";
|
||||
}
|
||||
|
||||
function shouldPreserveForMaintenance(entry: SubagentRunRecord): boolean {
|
||||
if (isCleanupCompleteForMaintenance(entry)) {
|
||||
return false;
|
||||
}
|
||||
if (isActiveForMaintenance(entry)) {
|
||||
return true;
|
||||
}
|
||||
return (
|
||||
isAwaitingCompletionAnnounceForMaintenance(entry) || isPendingFinalDeliveryForMaintenance(entry)
|
||||
);
|
||||
}
|
||||
|
||||
export function listSessionMaintenanceProtectedSubagentSessionKeys(): string[] {
|
||||
const keys = new Set<string>();
|
||||
for (const entry of getSubagentRunsSnapshotForRead(subagentRuns).values()) {
|
||||
if (!shouldPreserveForMaintenance(entry)) {
|
||||
continue;
|
||||
}
|
||||
const childSessionKey = entry.childSessionKey.trim();
|
||||
if (childSessionKey) {
|
||||
keys.add(childSessionKey);
|
||||
}
|
||||
}
|
||||
return [...keys];
|
||||
}
|
||||
|
||||
registerSessionMaintenancePreserveKeysProvider(listSessionMaintenanceProtectedSubagentSessionKeys);
|
||||
@@ -225,6 +225,76 @@ describe("subagent registry seam flow", () => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("lists active and pending-delivery child sessions for maintenance preservation", () => {
|
||||
const now = Date.now();
|
||||
mod.addSubagentRunForTests({
|
||||
runId: "run-active",
|
||||
childSessionKey: "agent:main:subagent:active",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "active task",
|
||||
cleanup: "delete",
|
||||
expectsCompletionMessage: true,
|
||||
createdAt: now,
|
||||
});
|
||||
mod.addSubagentRunForTests({
|
||||
runId: "run-pending",
|
||||
childSessionKey: "agent:main:subagent:pending",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "pending delivery task",
|
||||
cleanup: "delete",
|
||||
expectsCompletionMessage: true,
|
||||
createdAt: now - 2,
|
||||
endedAt: now - 1,
|
||||
pendingFinalDelivery: true,
|
||||
frozenResultText: "child output",
|
||||
});
|
||||
mod.addSubagentRunForTests({
|
||||
runId: "run-complete",
|
||||
childSessionKey: "agent:main:subagent:complete",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "already delivered task",
|
||||
cleanup: "keep",
|
||||
expectsCompletionMessage: true,
|
||||
createdAt: now - 4,
|
||||
endedAt: now - 3,
|
||||
completionAnnouncedAt: now - 2,
|
||||
cleanupCompletedAt: now - 1,
|
||||
});
|
||||
|
||||
expect(mod.listSessionMaintenanceProtectedSubagentSessionKeys().toSorted()).toEqual([
|
||||
"agent:main:subagent:active",
|
||||
"agent:main:subagent:pending",
|
||||
]);
|
||||
});
|
||||
|
||||
it("uses the disk-aware run snapshot for maintenance preservation", () => {
|
||||
const now = Date.now();
|
||||
mocks.getSubagentRunsSnapshotForRead.mockReturnValueOnce(
|
||||
new Map([
|
||||
[
|
||||
"run-restored",
|
||||
{
|
||||
runId: "run-restored",
|
||||
childSessionKey: "agent:main:subagent:restored",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "restored pending task",
|
||||
cleanup: "delete",
|
||||
expectsCompletionMessage: true,
|
||||
createdAt: now,
|
||||
},
|
||||
],
|
||||
]),
|
||||
);
|
||||
|
||||
expect(mod.listSessionMaintenanceProtectedSubagentSessionKeys()).toEqual([
|
||||
"agent:main:subagent:restored",
|
||||
]);
|
||||
});
|
||||
|
||||
it("schedules orphan recovery instead of terminally failing on recoverable wait transport errors", async () => {
|
||||
mocks.callGateway.mockImplementation(async (request: { method?: string }) => {
|
||||
if (request.method === "agent.wait") {
|
||||
|
||||
@@ -1241,6 +1241,10 @@ export function initSubagentRegistry() {
|
||||
restoreSubagentRunsOnce();
|
||||
}
|
||||
|
||||
// Importing this module also registers the subagent maintenance preserve-key
|
||||
// provider as a side effect (see subagent-registry-maintenance.ts).
|
||||
export { listSessionMaintenanceProtectedSubagentSessionKeys } from "./subagent-registry-maintenance.js";
|
||||
|
||||
// Let the shared outbound plan treat bare silent replies as dropped (instead
|
||||
// of rewriting them to visible fallback text) when the parent session has at
|
||||
// least one pending spawned child whose completion will deliver the real
|
||||
|
||||
@@ -17,6 +17,7 @@ import {
|
||||
resolveStorePath,
|
||||
} from "./paths.js";
|
||||
import { cloneSessionStoreRecord } from "./store-cache.js";
|
||||
import { collectSessionMaintenancePreserveKeys } from "./store-maintenance-preserve.js";
|
||||
import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
@@ -276,14 +277,17 @@ async function previewStoreCleanup(params: {
|
||||
},
|
||||
})
|
||||
: 0;
|
||||
const preserveSessionKeys = collectSessionMaintenancePreserveKeys([params.activeKey]);
|
||||
const pruned = pruneStaleEntries(previewStore, params.maintenance.pruneAfterMs, {
|
||||
log: false,
|
||||
preserveKeys: preserveSessionKeys,
|
||||
onPruned: ({ key }) => {
|
||||
staleKeys.add(key);
|
||||
},
|
||||
});
|
||||
const capped = capEntryCount(previewStore, params.maintenance.maxEntries, {
|
||||
log: false,
|
||||
preserveKeys: preserveSessionKeys,
|
||||
onCapped: ({ key }) => {
|
||||
cappedKeys.add(key);
|
||||
},
|
||||
@@ -313,6 +317,7 @@ async function previewStoreCleanup(params: {
|
||||
store: previewStore,
|
||||
storePath: params.target.storePath,
|
||||
activeSessionKey: params.activeKey,
|
||||
preserveKeys: preserveSessionKeys,
|
||||
maintenance: params.maintenance,
|
||||
warnOnly: false,
|
||||
dryRun: true,
|
||||
|
||||
@@ -102,6 +102,43 @@ describe("enforceSessionDiskBudget", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves runtime-provided session keys when removing entries for disk budget", async () => {
|
||||
await withTempDir({ prefix: "openclaw-disk-budget-" }, async (dir) => {
|
||||
const storePath = path.join(dir, "sessions.json");
|
||||
const childKey = "agent:main:subagent:pending-budget";
|
||||
const removableKey = "agent:main:old-removable";
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
[childKey]: {
|
||||
sessionId: "pending-budget",
|
||||
updatedAt: now - 10_000,
|
||||
spawnedBy: "agent:main:main",
|
||||
},
|
||||
[removableKey]: {
|
||||
sessionId: "old-removable",
|
||||
updatedAt: now,
|
||||
},
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
|
||||
|
||||
const result = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath,
|
||||
preserveKeys: new Set([childKey]),
|
||||
maintenance: {
|
||||
maxDiskBytes: 120,
|
||||
highWaterBytes: 80,
|
||||
},
|
||||
warnOnly: false,
|
||||
});
|
||||
|
||||
expectBudgetResult(result);
|
||||
expect(result.removedEntries).toBe(1);
|
||||
expect(store).toHaveProperty(childKey);
|
||||
expect(store).not.toHaveProperty(removableKey);
|
||||
});
|
||||
});
|
||||
|
||||
it("removes unreferenced compaction checkpoint artifacts under pressure", async () => {
|
||||
await withTempDir({ prefix: "openclaw-disk-budget-" }, async (dir) => {
|
||||
const storePath = path.join(dir, "sessions.json");
|
||||
|
||||
@@ -15,7 +15,7 @@ import {
|
||||
isTrajectorySessionArtifactName,
|
||||
} from "./artifacts.js";
|
||||
import { resolveSessionFilePath } from "./paths.js";
|
||||
import { isProtectedSessionMaintenanceEntry } from "./store-maintenance.js";
|
||||
import { shouldPreserveMaintenanceEntry } from "./store-maintenance.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
|
||||
export type SessionDiskBudgetConfig = {
|
||||
@@ -332,6 +332,7 @@ export async function enforceSessionDiskBudget(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
storePath: string;
|
||||
activeSessionKey?: string;
|
||||
preserveKeys?: ReadonlySet<string>;
|
||||
maintenance: SessionDiskBudgetConfig;
|
||||
warnOnly: boolean;
|
||||
dryRun?: boolean;
|
||||
@@ -438,7 +439,7 @@ export async function enforceSessionDiskBudget(params: {
|
||||
if (!entry) {
|
||||
continue;
|
||||
}
|
||||
if (isProtectedSessionMaintenanceEntry(key, entry)) {
|
||||
if (shouldPreserveMaintenanceEntry({ key, entry, preserveKeys: params.preserveKeys })) {
|
||||
continue;
|
||||
}
|
||||
const previousProjectedBytes = projectedStoreBytes;
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
setSerializedSessionStore,
|
||||
writeSessionStoreCache,
|
||||
} from "./store-cache.js";
|
||||
import { collectSessionMaintenancePreserveKeys } from "./store-maintenance-preserve.js";
|
||||
import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
@@ -156,13 +157,20 @@ export function loadSessionStore(
|
||||
let pruned = 0;
|
||||
let capped = 0;
|
||||
if (maintenance.mode === "enforce" && beforeCount > maintenance.maxEntries) {
|
||||
pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { log: false });
|
||||
const preserveSessionKeys = collectSessionMaintenancePreserveKeys();
|
||||
pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, {
|
||||
log: false,
|
||||
preserveKeys: preserveSessionKeys,
|
||||
});
|
||||
const countAfterPrune = Object.keys(store).length;
|
||||
capped = shouldRunSessionEntryMaintenance({
|
||||
entryCount: countAfterPrune,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
})
|
||||
? capEntryCount(store, maintenance.maxEntries, { log: false })
|
||||
? capEntryCount(store, maintenance.maxEntries, {
|
||||
log: false,
|
||||
preserveKeys: preserveSessionKeys,
|
||||
})
|
||||
: 0;
|
||||
}
|
||||
const afterCount = Object.keys(store).length;
|
||||
|
||||
48
src/config/sessions/store-maintenance-preserve.ts
Normal file
48
src/config/sessions/store-maintenance-preserve.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { normalizeStoreSessionKey } from "./store-entry.js";
|
||||
|
||||
export type SessionMaintenancePreserveKeysProvider = () => Iterable<string> | undefined;
|
||||
|
||||
const preserveKeysProviders = new Set<SessionMaintenancePreserveKeysProvider>();
|
||||
|
||||
export function registerSessionMaintenancePreserveKeysProvider(
|
||||
provider: SessionMaintenancePreserveKeysProvider,
|
||||
): () => void {
|
||||
preserveKeysProviders.add(provider);
|
||||
return () => {
|
||||
preserveKeysProviders.delete(provider);
|
||||
};
|
||||
}
|
||||
|
||||
function addSessionMaintenancePreserveKey(keys: Set<string>, value: string | undefined): void {
|
||||
// Match how store keys are normalized in `normalizeStoreSessionKey`
|
||||
// (trim + lowercase) so providers can register session keys in any
|
||||
// case without missing matches during maintenance lookups.
|
||||
const normalized = normalizeStoreSessionKey(value ?? "");
|
||||
if (normalized) {
|
||||
keys.add(normalized);
|
||||
}
|
||||
}
|
||||
|
||||
function addSessionMaintenancePreserveKeys(
|
||||
keys: Set<string>,
|
||||
values: Iterable<string | undefined> | undefined,
|
||||
): void {
|
||||
for (const value of values ?? []) {
|
||||
addSessionMaintenancePreserveKey(keys, value);
|
||||
}
|
||||
}
|
||||
|
||||
export function collectSessionMaintenancePreserveKeys(
|
||||
baseKeys?: Iterable<string | undefined>,
|
||||
): Set<string> | undefined {
|
||||
const keys = new Set<string>();
|
||||
addSessionMaintenancePreserveKeys(keys, baseKeys);
|
||||
for (const provider of preserveKeysProviders) {
|
||||
try {
|
||||
addSessionMaintenancePreserveKeys(keys, provider());
|
||||
} catch {
|
||||
// Maintenance must remain best-effort if a runtime provider is temporarily unavailable.
|
||||
}
|
||||
}
|
||||
return keys.size > 0 ? keys : undefined;
|
||||
}
|
||||
@@ -309,7 +309,7 @@ export function isProtectedSessionMaintenanceEntry(
|
||||
return chatType === "group" || chatType === "channel" || chatType === "thread";
|
||||
}
|
||||
|
||||
function shouldPreserveMaintenanceEntry(params: {
|
||||
export function shouldPreserveMaintenanceEntry(params: {
|
||||
key: string;
|
||||
entry: SessionEntry | undefined;
|
||||
preserveKeys?: ReadonlySet<string>;
|
||||
|
||||
@@ -17,6 +17,7 @@ vi.mock("../config.js", async () => ({
|
||||
|
||||
import { getRuntimeConfig } from "../config.js";
|
||||
import { runSessionsCleanup } from "./cleanup-service.js";
|
||||
import { registerSessionMaintenancePreserveKeysProvider } from "./store-maintenance-preserve.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
loadSessionStore,
|
||||
@@ -716,6 +717,38 @@ describe("Integration: saveSessionStore with pruning", () => {
|
||||
expect(loaded["session-74"]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("explicit loadSessionStore maintenance preserves runtime-provided subagent sessions", async () => {
|
||||
const now = Date.now();
|
||||
const childKey = "agent:main:subagent:pending-delivery";
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 75 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
store[childKey] = {
|
||||
...makeEntry(now - 100 * DAY_MS),
|
||||
spawnedBy: "agent:main:slack:direct:U1",
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
const unregister = registerSessionMaintenancePreserveKeysProvider(() => [childKey]);
|
||||
|
||||
try {
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
runMaintenance: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 50,
|
||||
pruneAfterMs: 365 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(50);
|
||||
expect(loaded).toHaveProperty(childKey);
|
||||
expect(loaded["session-74"]).toBeUndefined();
|
||||
} finally {
|
||||
unregister();
|
||||
}
|
||||
});
|
||||
|
||||
it("persists quota suspension TTL transitions through writer maintenance", async () => {
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import crypto from "node:crypto";
|
||||
import { afterAll, beforeAll, describe, expect, it } from "vitest";
|
||||
import { createFixtureSuite } from "../../test-utils/fixture-suite.js";
|
||||
import {
|
||||
collectSessionMaintenancePreserveKeys,
|
||||
registerSessionMaintenancePreserveKeysProvider,
|
||||
} from "./store-maintenance-preserve.js";
|
||||
import {
|
||||
isProtectedSessionMaintenanceEntry,
|
||||
resolveMaintenanceConfigFromInput,
|
||||
@@ -115,6 +119,82 @@ describe("capEntryCount", () => {
|
||||
expect(store.oldest).toBeUndefined();
|
||||
expect(store.old).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves runtime-provided pending subagent sessions when capping", () => {
|
||||
const now = Date.now();
|
||||
const childKey = "agent:main:subagent:child";
|
||||
const store = makeStore([
|
||||
[childKey, { ...makeEntry(now - 10 * DAY_MS), spawnedBy: "agent:main:slack:direct:U1" }],
|
||||
["recent-1", makeEntry(now)],
|
||||
["recent-2", makeEntry(now - 1)],
|
||||
["old", makeEntry(now - 2)],
|
||||
]);
|
||||
const unregister = registerSessionMaintenancePreserveKeysProvider(() => [childKey]);
|
||||
|
||||
try {
|
||||
const evicted = capEntryCount(store, 2, {
|
||||
preserveKeys: collectSessionMaintenancePreserveKeys(),
|
||||
});
|
||||
|
||||
expect(evicted).toBe(2);
|
||||
expect(Object.keys(store)).toHaveLength(2);
|
||||
expect(store).toHaveProperty(childKey);
|
||||
expect(store).toHaveProperty("recent-1");
|
||||
expect(store["recent-2"]).toBeUndefined();
|
||||
expect(store.old).toBeUndefined();
|
||||
} finally {
|
||||
unregister();
|
||||
}
|
||||
});
|
||||
|
||||
it("normalizes runtime-provided preserve keys to match lowercased store keys", () => {
|
||||
const now = Date.now();
|
||||
const childKey = "agent:main:subagent:child";
|
||||
const store = makeStore([
|
||||
[childKey, { ...makeEntry(now - 10 * DAY_MS), spawnedBy: "agent:main:slack:direct:U1" }],
|
||||
["recent-1", makeEntry(now)],
|
||||
["old", makeEntry(now - 1)],
|
||||
]);
|
||||
// Provider returns the key in mixed case + with surrounding whitespace;
|
||||
// normalization must match the lowercased store key during maintenance.
|
||||
const unregister = registerSessionMaintenancePreserveKeysProvider(() => [
|
||||
" Agent:Main:Subagent:CHILD ",
|
||||
]);
|
||||
|
||||
try {
|
||||
const evicted = capEntryCount(store, 2, {
|
||||
preserveKeys: collectSessionMaintenancePreserveKeys(),
|
||||
});
|
||||
|
||||
expect(evicted).toBe(1);
|
||||
expect(Object.keys(store)).toHaveLength(2);
|
||||
expect(store).toHaveProperty(childKey);
|
||||
expect(store).toHaveProperty("recent-1");
|
||||
expect(store.old).toBeUndefined();
|
||||
} finally {
|
||||
unregister();
|
||||
}
|
||||
});
|
||||
|
||||
it("can temporarily exceed the cap when every candidate is runtime-protected", () => {
|
||||
const now = Date.now();
|
||||
const store = makeStore([
|
||||
["agent:main:subagent:child-a", makeEntry(now - 2)],
|
||||
["agent:main:subagent:child-b", makeEntry(now - 1)],
|
||||
]);
|
||||
const unregister = registerSessionMaintenancePreserveKeysProvider(() => Object.keys(store));
|
||||
|
||||
try {
|
||||
const evicted = capEntryCount(store, 1, {
|
||||
preserveKeys: collectSessionMaintenancePreserveKeys(),
|
||||
});
|
||||
|
||||
expect(evicted).toBe(0);
|
||||
expect(Object.keys(store)).toHaveLength(2);
|
||||
} finally {
|
||||
unregister();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("isProtectedSessionMaintenanceEntry", () => {
|
||||
|
||||
@@ -23,6 +23,7 @@ import {
|
||||
} from "./store-cache.js";
|
||||
import { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js";
|
||||
import { loadSessionStore, normalizeSessionStore } from "./store-load.js";
|
||||
import { collectSessionMaintenancePreserveKeys } from "./store-maintenance-preserve.js";
|
||||
import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
@@ -282,9 +283,7 @@ async function saveSessionStoreUnlocked(
|
||||
diskBudget,
|
||||
});
|
||||
} else {
|
||||
const preserveSessionKeys = opts?.activeSessionKey
|
||||
? new Set([opts.activeSessionKey])
|
||||
: undefined;
|
||||
const preserveSessionKeys = collectSessionMaintenancePreserveKeys([opts?.activeSessionKey]);
|
||||
// Prune stale entries and cap total count before serializing.
|
||||
const removedSessionFiles = new Map<string, string | undefined>();
|
||||
const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, {
|
||||
@@ -355,6 +354,7 @@ async function saveSessionStoreUnlocked(
|
||||
store,
|
||||
storePath,
|
||||
activeSessionKey: opts?.activeSessionKey,
|
||||
preserveKeys: preserveSessionKeys,
|
||||
maintenance,
|
||||
warnOnly: false,
|
||||
log,
|
||||
|
||||
Reference in New Issue
Block a user