mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-15 03:01:02 +00:00
fix(gateway): add TTL cleanup for 3 Maps that grow unbounded causing OOM
Gateway crashes with OOM after batch processing ~1000+ agent sessions. Three Maps accumulate entries without cleanup:

1. subagentRuns: session-mode runs have archiveAtMs=undefined, so the 60s sweeper skips them forever. Add 5-min absolute TTL for completed session runs. Also sweep orphaned pendingLifecycleError entries with 5-min TTL.
2. runContextById: only cleaned via manual clearAgentRunContext() calls. Add registeredAt timestamp and sweepStaleRunContexts() with 30-min TTL, called from the existing 60s maintenance timer. Also sweeps the companion seqByRun Map for the same runIds.
3. pendingLifecycleErrorByRunId: 15s retry timer but no absolute TTL. Now swept in sweepSubagentRuns() after 5 min.

Pre-deploy entries without registeredAt are treated as infinitely old and swept immediately.

Fixes #52725

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -478,8 +478,24 @@ function stopSweeper() {
|
||||
async function sweepSubagentRuns() {
|
||||
const now = Date.now();
|
||||
let mutated = false;
|
||||
const SESSION_RUN_TTL_MS = 5 * 60 * 1000; // 5 min absolute TTL for session-mode runs
|
||||
for (const [runId, entry] of subagentRuns.entries()) {
|
||||
if (!entry.archiveAtMs || entry.archiveAtMs > now) {
|
||||
// Session-mode runs have no archiveAtMs — apply absolute TTL after completion.
|
||||
if (!entry.archiveAtMs) {
|
||||
if (typeof entry.endedAt === "number" && now - entry.endedAt > SESSION_RUN_TTL_MS) {
|
||||
clearPendingLifecycleError(runId);
|
||||
void notifyContextEngineSubagentEnded({
|
||||
childSessionKey: entry.childSessionKey,
|
||||
reason: "swept",
|
||||
workspaceDir: entry.workspaceDir,
|
||||
});
|
||||
subagentRuns.delete(runId);
|
||||
mutated = true;
|
||||
await safeRemoveAttachmentsDir(entry);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (entry.archiveAtMs > now) {
|
||||
continue;
|
||||
}
|
||||
clearPendingLifecycleError(runId);
|
||||
@@ -506,6 +522,14 @@ async function sweepSubagentRuns() {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
// Sweep orphaned pendingLifecycleError entries (absolute TTL).
|
||||
const PENDING_ERROR_TTL_MS = 5 * 60 * 1000;
|
||||
for (const [runId, pending] of pendingLifecycleErrorByRunId.entries()) {
|
||||
if (now - pending.endedAt > PENDING_ERROR_TTL_MS) {
|
||||
clearPendingLifecycleError(runId);
|
||||
}
|
||||
}
|
||||
|
||||
if (mutated) {
|
||||
persistSubagentRuns();
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import { sweepStaleRunContexts } from "../infra/agent-events.js";
|
||||
import { cleanOldMedia } from "../media/store.js";
|
||||
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
import type { ChatRunEntry } from "./server-chat.js";
|
||||
@@ -151,6 +152,8 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
params.chatDeltaSentAt.delete(runId);
|
||||
params.chatDeltaLastBroadcastLen.delete(runId);
|
||||
}
|
||||
// Sweep stale agent run contexts (orphaned when lifecycle end/error is missed).
|
||||
sweepStaleRunContexts();
|
||||
}, 60_000);
|
||||
|
||||
if (typeof params.mediaCleanupTtlMs !== "number") {
|
||||
|
||||
@@ -111,6 +111,8 @@ export type AgentRunContext = {
|
||||
isHeartbeat?: boolean;
|
||||
/** Whether control UI clients should receive chat/agent updates for this run. */
|
||||
isControlUiVisible?: boolean;
|
||||
/** Timestamp when this context was first registered (for TTL-based cleanup). */
|
||||
registeredAt?: number;
|
||||
};
|
||||
|
||||
type AgentEventState = {
|
||||
@@ -136,7 +138,7 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext)
|
||||
const state = getAgentEventState();
|
||||
const existing = state.runContextById.get(runId);
|
||||
if (!existing) {
|
||||
state.runContextById.set(runId, { ...context });
|
||||
state.runContextById.set(runId, { ...context, registeredAt: context.registeredAt ?? Date.now() });
|
||||
return;
|
||||
}
|
||||
if (context.sessionKey && existing.sessionKey !== context.sessionKey) {
|
||||
@@ -161,6 +163,25 @@ export function clearAgentRunContext(runId: string) {
|
||||
getAgentEventState().runContextById.delete(runId);
|
||||
}
|
||||
|
||||
/**
 * Sweep stale run contexts that exceeded the given TTL.
 * Guards against orphaned entries when lifecycle "end"/"error" events are missed.
 *
 * For every swept run id the matching `seqByRun` entry is removed as well.
 *
 * @param maxAgeMs - Maximum allowed age in milliseconds; defaults to 30 minutes.
 * @returns The number of run contexts that were removed.
 */
export function sweepStaleRunContexts(maxAgeMs = 30 * 60 * 1000): number {
  // Resolve the event state through the accessor, like the sibling helpers do,
  // instead of relying on a `state` binding that is not declared in this function.
  const state = getAgentEventState();
  const now = Date.now();
  let swept = 0;
  for (const [runId, ctx] of state.runContextById.entries()) {
    // Treat missing registeredAt (pre-deploy entries) as infinitely old.
    // The truthiness check also sends 0/NaN timestamps down the "infinitely old" path.
    const age = ctx.registeredAt ? now - ctx.registeredAt : Infinity;
    if (age > maxAgeMs) {
      state.runContextById.delete(runId);
      state.seqByRun.delete(runId);
      swept++;
    }
  }
  return swept;
}
|
||||
|
||||
/**
 * Test helper: empties the run-context map held by the agent event state.
 */
export function resetAgentRunContextForTest() {
  const { runContextById } = getAgentEventState();
  runContextById.clear();
}
|
||||
|
||||
Reference in New Issue
Block a user