From 36c3a54b51ec2d748ef0df6f527499a60eca902e Mon Sep 17 00:00:00 2001 From: Menglin Li Date: Sat, 11 Apr 2026 00:45:12 +0800 Subject: [PATCH] fix(gateway): plug long-running memory leaks Prune stale gateway control-plane rate-limit buckets, bound transcript-session lookup caching, clear agent event sequence state with run contexts, and clear node wake/nudge state on disconnect.\n\nVerified locally after rebasing onto main:\n\n- pnpm test src/gateway/control-plane-rate-limit.test.ts src/gateway/session-transcript-key.test.ts src/infra/agent-events.test.ts src/gateway/server-methods/nodes.invoke-wake.test.ts\n- pnpm check\n\nCo-authored-by: lml2468 <39320777+lml2468@users.noreply.github.com> --- src/gateway/control-plane-rate-limit.test.ts | 55 +++++++++++++++++++ src/gateway/control-plane-rate-limit.ts | 33 +++++++++++ src/gateway/server-maintenance.ts | 5 ++ .../server-methods/nodes.invoke-wake.test.ts | 49 ++++++++++++++++- src/gateway/server-methods/nodes.ts | 9 +++ src/gateway/server/ws-connection.ts | 2 + src/gateway/session-transcript-key.test.ts | 40 ++++++++++++++ src/gateway/session-transcript-key.ts | 11 ++++ src/infra/agent-events.test.ts | 24 ++++++++ src/infra/agent-events.ts | 5 +- 10 files changed, 230 insertions(+), 3 deletions(-) create mode 100644 src/gateway/control-plane-rate-limit.test.ts diff --git a/src/gateway/control-plane-rate-limit.test.ts b/src/gateway/control-plane-rate-limit.test.ts new file mode 100644 index 00000000000..5d6729492f1 --- /dev/null +++ b/src/gateway/control-plane-rate-limit.test.ts @@ -0,0 +1,55 @@ +import { afterEach, describe, expect, test } from "vitest"; +import { + consumeControlPlaneWriteBudget, + pruneStaleControlPlaneBuckets, + __testing, +} from "./control-plane-rate-limit.js"; + +describe("control-plane-rate-limit", () => { + afterEach(() => { + __testing.resetControlPlaneRateLimitState(); + }); + + test("pruneStaleControlPlaneBuckets removes expired buckets (#63643)", () => { + // Create buckets at different times + const baseMs = 1_000_000; + consumeControlPlaneWriteBudget({ + client: { connect: { device: { id: "dev-old" } }, clientIp: "1.2.3.4" } as never, + nowMs: baseMs, + }); + consumeControlPlaneWriteBudget({ + client: { connect: { device: { id: "dev-recent" } }, clientIp: "5.6.7.8" } as never, + nowMs: baseMs + 4 * 60_000, + }); + + // Prune at baseMs + 6 minutes — "dev-old" is > 5 min stale, "dev-recent" is only 2 min + const pruned = pruneStaleControlPlaneBuckets(baseMs + 6 * 60_000); + expect(pruned).toBe(1); + + // "dev-recent" should still have budget + const result = consumeControlPlaneWriteBudget({ + client: { connect: { device: { id: "dev-recent" } }, clientIp: "5.6.7.8" } as never, + nowMs: baseMs + 6 * 60_000, + }); + expect(result.allowed).toBe(true); + }); + + test("pruneStaleControlPlaneBuckets is safe on empty map", () => { + expect(pruneStaleControlPlaneBuckets()).toBe(0); + }); + + test("control-plane bucket map stays bounded between prune sweeps", () => { + const baseMs = 2_000_000; + for (let i = 0; i < 10_001; i++) { + consumeControlPlaneWriteBudget({ + client: { + connect: { device: { id: `dev-${i}` } }, + clientIp: "1.2.3.4", + } as never, + nowMs: baseMs, + }); + } + + expect(__testing.getControlPlaneRateLimitBucketCount()).toBe(10_000); + }); +}); diff --git a/src/gateway/control-plane-rate-limit.ts b/src/gateway/control-plane-rate-limit.ts index 6e05a53e30d..f7386ad2130 100644 --- a/src/gateway/control-plane-rate-limit.ts +++ b/src/gateway/control-plane-rate-limit.ts @@ -2,6 +2,9 @@ import type { GatewayClient } from "./server-methods/types.js"; const CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS = 3; const CONTROL_PLANE_RATE_LIMIT_WINDOW_MS = 60_000; +const CONTROL_PLANE_BUCKET_MAX_STALE_MS = 5 * 60_000; +/** Hard cap to prevent memory DoS from rapid unique-key injection (CWE-400). */ +const CONTROL_PLANE_BUCKET_MAX_ENTRIES = 10_000; type Bucket = { count: number; @@ -45,6 +48,17 @@ export function consumeControlPlaneWriteBudget(params: { const bucket = controlPlaneBuckets.get(key); if (!bucket || nowMs - bucket.windowStartMs >= CONTROL_PLANE_RATE_LIMIT_WINDOW_MS) { + // Enforce hard cap before inserting a new key to bound memory usage + // even between periodic prune sweeps. + if ( + !controlPlaneBuckets.has(key) && + controlPlaneBuckets.size >= CONTROL_PLANE_BUCKET_MAX_ENTRIES + ) { + const oldest = controlPlaneBuckets.keys().next().value; + if (oldest !== undefined) { + controlPlaneBuckets.delete(oldest); + } + } controlPlaneBuckets.set(key, { count: 1, windowStartMs: nowMs, @@ -79,7 +93,26 @@ export function consumeControlPlaneWriteBudget(params: { }; } +/** + * Remove buckets whose rate-limit window expired more than + * CONTROL_PLANE_BUCKET_MAX_STALE_MS ago. Called periodically + * by the gateway maintenance timer to prevent unbounded growth. + */ +export function pruneStaleControlPlaneBuckets(nowMs = Date.now()): number { + let pruned = 0; + for (const [key, bucket] of controlPlaneBuckets) { + if (nowMs - bucket.windowStartMs > CONTROL_PLANE_BUCKET_MAX_STALE_MS) { + controlPlaneBuckets.delete(key); + pruned += 1; + } + } + return pruned; +} + export const __testing = { + getControlPlaneRateLimitBucketCount() { + return controlPlaneBuckets.size; + }, resetControlPlaneRateLimitState() { controlPlaneBuckets.clear(); }, diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 7755b6e6706..a00a573e87a 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -2,6 +2,7 @@ import type { HealthSummary } from "../commands/health.js"; import { sweepStaleRunContexts } from "../infra/agent-events.js"; import { cleanOldMedia } from "../media/store.js"; import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js"; +import { pruneStaleControlPlaneBuckets } from "./control-plane-rate-limit.js"; import type { ChatRunEntry } from "./server-chat.js"; import { DEDUPE_MAX, @@ -135,6 +136,10 @@ export function startGatewayMaintenanceTimers(params: { params.chatDeltaLastBroadcastLen.delete(runId); } + // Prune expired control-plane rate-limit buckets to prevent unbounded + // growth when many unique clients connect over time. + pruneStaleControlPlaneBuckets(now); + // Sweep stale buffers for runs that were never explicitly aborted. // Only reap orphaned buffers after the abort controller is gone; active // runs can legitimately sit idle while tools/models work. diff --git a/src/gateway/server-methods/nodes.invoke-wake.test.ts b/src/gateway/server-methods/nodes.invoke-wake.test.ts index 97b3549cd5e..c066fc11902 100644 --- a/src/gateway/server-methods/nodes.invoke-wake.test.ts +++ b/src/gateway/server-methods/nodes.invoke-wake.test.ts @@ -1,6 +1,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { ErrorCodes } from "../protocol/index.js"; -import { maybeWakeNodeWithApns, nodeHandlers } from "./nodes.js"; +import { + clearNodeWakeState, + maybeSendNodeWakeNudge, + maybeWakeNodeWithApns, + nodeHandlers, +} from "./nodes.js"; type MockNodeCommandPolicyParams = { command: string; @@ -313,6 +318,48 @@ describe("node.invoke APNs wake path", () => { expect(mocks.sendApnsBackgroundWake).not.toHaveBeenCalled(); }); + it("clears wake and nudge throttle state when a node disconnects", async () => { + mockDirectWakeConfig("ios-node-clear-wake"); + mocks.sendApnsAlert.mockResolvedValue({ + ok: true, + status: 200, + tokenSuffix: "1234abcd", + topic: "ai.openclaw.ios", + environment: "sandbox", + transport: "direct", + }); + + await expect(maybeWakeNodeWithApns("ios-node-clear-wake")).resolves.toMatchObject({ + path: "sent", + throttled: false, + }); + await expect(maybeSendNodeWakeNudge("ios-node-clear-wake")).resolves.toMatchObject({ + sent: true, + throttled: false, + }); + await expect(maybeWakeNodeWithApns("ios-node-clear-wake")).resolves.toMatchObject({ + path: "throttled", + throttled: true, + }); + await expect(maybeSendNodeWakeNudge("ios-node-clear-wake")).resolves.toMatchObject({ + sent: false, + throttled: true, + }); + + clearNodeWakeState("ios-node-clear-wake"); + + await expect(maybeWakeNodeWithApns("ios-node-clear-wake")).resolves.toMatchObject({ + path: "sent", + throttled: false, + }); + await expect(maybeSendNodeWakeNudge("ios-node-clear-wake")).resolves.toMatchObject({ + sent: true, + throttled: false, + }); + expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(2); + expect(mocks.sendApnsAlert).toHaveBeenCalledTimes(2); + }); + it("wakes and retries invoke after the node reconnects", async () => { vi.useFakeTimers(); mockDirectWakeConfig("ios-node-reconnect"); diff --git a/src/gateway/server-methods/nodes.ts b/src/gateway/server-methods/nodes.ts index 342e98d6d87..4b6f3dcbbba 100644 --- a/src/gateway/server-methods/nodes.ts +++ b/src/gateway/server-methods/nodes.ts @@ -518,6 +518,15 @@ export async function waitForNodeReconnect(params: { return Boolean(params.context.nodeRegistry.get(params.nodeId)); } +/** + * Remove cached wake/nudge state for a node that has disconnected. + * Called from the WS close handler to prevent unbounded growth. + */ +export function clearNodeWakeState(nodeId: string): void { + nodeWakeById.delete(nodeId); + nodeWakeNudgeById.delete(nodeId); +} + export const nodeHandlers: GatewayRequestHandlers = { "node.pair.request": async ({ params, respond, context }) => { if (!validateNodePairRequestParams(params)) { diff --git a/src/gateway/server/ws-connection.ts b/src/gateway/server/ws-connection.ts index 9a5b1722a9f..b1e1d1d64a4 100644 --- a/src/gateway/server/ws-connection.ts +++ b/src/gateway/server/ws-connection.ts @@ -3,6 +3,7 @@ import type { Socket } from "node:net"; import type { WebSocket, WebSocketServer } from "ws"; import { resolveCanvasHostUrl } from "../../infra/canvas-host-url.js"; import { removeRemoteNodeInfo } from "../../infra/skills-remote.js"; +import { clearNodeWakeState } from "../server-methods/nodes.js"; import { upsertPresence } from "../../infra/system-presence.js"; import type { createSubsystemLogger } from "../../logging/subsystem.js"; import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js"; @@ -325,6 +326,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti if (nodeId) { removeRemoteNodeInfo(nodeId); context.nodeUnsubscribeAll(nodeId); + clearNodeWakeState(nodeId); } } logWs("out", "close", { diff --git a/src/gateway/session-transcript-key.test.ts b/src/gateway/session-transcript-key.test.ts index b2bda645f35..c2e40e41025 100644 --- a/src/gateway/session-transcript-key.test.ts +++ b/src/gateway/session-transcript-key.test.ts @@ -141,4 +141,44 @@ describe("resolveSessionKeyForTranscriptFile", () => { expect(resolveSessionKeyForTranscriptFile("/tmp/shared.jsonl")).toBe("agent:main:newer"); }); + + it("evicts oldest entry when cache exceeds 256 entries (#63643)", () => { + // Fill cache with 256 unique transcript paths + for (let i = 0; i < 256; i++) { + const sessionKey = `agent:main:session-${i}`; + const transcriptPath = `/tmp/session-${i}.jsonl`; + const store = { + [sessionKey]: { sessionId: `sid-${i}`, updatedAt: now + i }, + } satisfies Record; + loadCombinedSessionStoreForGatewayMock.mockReturnValue({ + storePath: "(multiple)", + store, + }); + resolveSessionTranscriptCandidatesMock.mockReturnValue([transcriptPath]); + resolveSessionKeyForTranscriptFile(transcriptPath); + } + + // Now add the 257th — should evict session-0 + const overflowKey = "agent:main:session-overflow"; + const overflowPath = "/tmp/session-overflow.jsonl"; + const overflowStore = { + [overflowKey]: { sessionId: "sid-overflow", updatedAt: now + 999 }, + } satisfies Record; + loadCombinedSessionStoreForGatewayMock.mockReturnValue({ + storePath: "(multiple)", + store: overflowStore, + }); + resolveSessionTranscriptCandidatesMock.mockReturnValue([overflowPath]); + expect(resolveSessionKeyForTranscriptFile(overflowPath)).toBe(overflowKey); + + // session-0 should have been evicted from cache — next lookup will + // re-resolve from the store (returns undefined since store was mocked + // with only the overflow entry). + loadCombinedSessionStoreForGatewayMock.mockReturnValue({ + storePath: "(multiple)", + store: overflowStore, + }); + resolveSessionTranscriptCandidatesMock.mockReturnValue([]); + expect(resolveSessionKeyForTranscriptFile("/tmp/session-0.jsonl")).toBeUndefined(); + }); }); diff --git a/src/gateway/session-transcript-key.ts b/src/gateway/session-transcript-key.ts index d5dbebf2571..799d8418b23 100644 --- a/src/gateway/session-transcript-key.ts +++ b/src/gateway/session-transcript-key.ts @@ -12,6 +12,7 @@ import { } from "./session-utils.js"; const TRANSCRIPT_SESSION_KEY_CACHE = new Map(); +const TRANSCRIPT_SESSION_KEY_CACHE_MAX = 256; function resolveTranscriptPathForComparison(value: string | undefined): string | undefined { const trimmed = normalizeOptionalString(value); @@ -135,6 +136,16 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string ? freshestMatch?.key : undefined; if (resolvedKey) { + // Evict oldest-inserted entry when cache exceeds size cap (FIFO bound). + if ( + !TRANSCRIPT_SESSION_KEY_CACHE.has(targetPath) && + TRANSCRIPT_SESSION_KEY_CACHE.size >= TRANSCRIPT_SESSION_KEY_CACHE_MAX + ) { + const oldest = TRANSCRIPT_SESSION_KEY_CACHE.keys().next().value; + if (oldest !== undefined) { + TRANSCRIPT_SESSION_KEY_CACHE.delete(oldest); + } + } TRANSCRIPT_SESSION_KEY_CACHE.set(targetPath, resolvedKey); return resolvedKey; } diff --git a/src/infra/agent-events.test.ts b/src/infra/agent-events.test.ts index c5d308581e2..8cf77d7676e 100644 --- a/src/infra/agent-events.test.ts +++ b/src/infra/agent-events.test.ts @@ -232,3 +232,27 @@ describe("agent-events sequencing", () => { ]); }); }); + +test("clearAgentRunContext also cleans up seqByRun to prevent memory leak (#63643)", () => { + // Regression test: seqByRun entries were never deleted when a run ended, + // causing unbounded growth over time. + registerAgentRunContext("run-leak", { sessionKey: "main" }); + emitAgentEvent({ runId: "run-leak", stream: "lifecycle", data: {} }); + emitAgentEvent({ runId: "run-leak", stream: "lifecycle", data: {} }); + + // After clearing run context, the sequence counter should also be removed. + clearAgentRunContext("run-leak"); + + // Emitting a new event on the same runId should start seq from 1 again, + // proving the old entry was deleted. + const seqs: number[] = []; + const stop = onAgentEvent((evt) => { + if (evt.runId === "run-leak") { + seqs.push(evt.seq); + } + }); + emitAgentEvent({ runId: "run-leak", stream: "lifecycle", data: {} }); + stop(); + + expect(seqs).toEqual([1]); +}); diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 996d047bb37..70ae8d0c456 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -165,8 +165,9 @@ export function getAgentRunContext(runId: string) { } export function clearAgentRunContext(runId: string) { - getAgentEventState().runContextById.delete(runId); - getAgentEventState().seqByRun.delete(runId); + const state = getAgentEventState(); + state.runContextById.delete(runId); + state.seqByRun.delete(runId); } /**