fix(gateway): plug long-running memory leaks

Prune stale gateway control-plane rate-limit buckets, bound transcript-session lookup caching, clear agent event sequence state with run contexts, and clear node wake/nudge state on disconnect.\n\nVerified locally after rebasing onto main:\n\n- pnpm test src/gateway/control-plane-rate-limit.test.ts src/gateway/session-transcript-key.test.ts src/infra/agent-events.test.ts src/gateway/server-methods/nodes.invoke-wake.test.ts\n- pnpm check\n\nCo-authored-by: lml2468 <39320777+lml2468@users.noreply.github.com>
This commit is contained in:
Menglin Li
2026-04-11 00:45:12 +08:00
committed by GitHub
parent 54ae138db7
commit 36c3a54b51
10 changed files with 230 additions and 3 deletions

View File

@@ -0,0 +1,55 @@
import { afterEach, describe, expect, test } from "vitest";
import {
consumeControlPlaneWriteBudget,
pruneStaleControlPlaneBuckets,
__testing,
} from "./control-plane-rate-limit.js";
describe("control-plane-rate-limit", () => {
afterEach(() => {
__testing.resetControlPlaneRateLimitState();
});
test("pruneStaleControlPlaneBuckets removes expired buckets (#63643)", () => {
// Create buckets at different times
const baseMs = 1_000_000;
consumeControlPlaneWriteBudget({
client: { connect: { device: { id: "dev-old" } }, clientIp: "1.2.3.4" } as never,
nowMs: baseMs,
});
consumeControlPlaneWriteBudget({
client: { connect: { device: { id: "dev-recent" } }, clientIp: "5.6.7.8" } as never,
nowMs: baseMs + 4 * 60_000,
});
// Prune at baseMs + 6 minutes — "dev-old" is > 5 min stale, "dev-recent" is only 2 min
const pruned = pruneStaleControlPlaneBuckets(baseMs + 6 * 60_000);
expect(pruned).toBe(1);
// "dev-recent" should still have budget
const result = consumeControlPlaneWriteBudget({
client: { connect: { device: { id: "dev-recent" } }, clientIp: "5.6.7.8" } as never,
nowMs: baseMs + 6 * 60_000,
});
expect(result.allowed).toBe(true);
});
test("pruneStaleControlPlaneBuckets is safe on empty map", () => {
expect(pruneStaleControlPlaneBuckets()).toBe(0);
});
test("control-plane bucket map stays bounded between prune sweeps", () => {
const baseMs = 2_000_000;
for (let i = 0; i < 10_001; i++) {
consumeControlPlaneWriteBudget({
client: {
connect: { device: { id: `dev-${i}` } },
clientIp: "1.2.3.4",
} as never,
nowMs: baseMs,
});
}
expect(__testing.getControlPlaneRateLimitBucketCount()).toBe(10_000);
});
});

View File

@@ -2,6 +2,9 @@ import type { GatewayClient } from "./server-methods/types.js";
const CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS = 3;
const CONTROL_PLANE_RATE_LIMIT_WINDOW_MS = 60_000;
const CONTROL_PLANE_BUCKET_MAX_STALE_MS = 5 * 60_000;
/** Hard cap to prevent memory DoS from rapid unique-key injection (CWE-400). */
const CONTROL_PLANE_BUCKET_MAX_ENTRIES = 10_000;
type Bucket = {
count: number;
@@ -45,6 +48,17 @@ export function consumeControlPlaneWriteBudget(params: {
const bucket = controlPlaneBuckets.get(key);
if (!bucket || nowMs - bucket.windowStartMs >= CONTROL_PLANE_RATE_LIMIT_WINDOW_MS) {
// Enforce hard cap before inserting a new key to bound memory usage
// even between periodic prune sweeps.
if (
!controlPlaneBuckets.has(key) &&
controlPlaneBuckets.size >= CONTROL_PLANE_BUCKET_MAX_ENTRIES
) {
const oldest = controlPlaneBuckets.keys().next().value;
if (oldest !== undefined) {
controlPlaneBuckets.delete(oldest);
}
}
controlPlaneBuckets.set(key, {
count: 1,
windowStartMs: nowMs,
@@ -79,7 +93,26 @@ export function consumeControlPlaneWriteBudget(params: {
};
}
/**
* Remove buckets whose rate-limit window expired more than
* CONTROL_PLANE_BUCKET_MAX_STALE_MS ago. Called periodically
* by the gateway maintenance timer to prevent unbounded growth.
*/
export function pruneStaleControlPlaneBuckets(nowMs = Date.now()): number {
let pruned = 0;
for (const [key, bucket] of controlPlaneBuckets) {
if (nowMs - bucket.windowStartMs > CONTROL_PLANE_BUCKET_MAX_STALE_MS) {
controlPlaneBuckets.delete(key);
pruned += 1;
}
}
return pruned;
}
export const __testing = {
getControlPlaneRateLimitBucketCount() {
return controlPlaneBuckets.size;
},
resetControlPlaneRateLimitState() {
controlPlaneBuckets.clear();
},

View File

@@ -2,6 +2,7 @@ import type { HealthSummary } from "../commands/health.js";
import { sweepStaleRunContexts } from "../infra/agent-events.js";
import { cleanOldMedia } from "../media/store.js";
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
import { pruneStaleControlPlaneBuckets } from "./control-plane-rate-limit.js";
import type { ChatRunEntry } from "./server-chat.js";
import {
DEDUPE_MAX,
@@ -135,6 +136,10 @@ export function startGatewayMaintenanceTimers(params: {
params.chatDeltaLastBroadcastLen.delete(runId);
}
// Prune expired control-plane rate-limit buckets to prevent unbounded
// growth when many unique clients connect over time.
pruneStaleControlPlaneBuckets(now);
// Sweep stale buffers for runs that were never explicitly aborted.
// Only reap orphaned buffers after the abort controller is gone; active
// runs can legitimately sit idle while tools/models work.

View File

@@ -1,6 +1,11 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ErrorCodes } from "../protocol/index.js";
import { maybeWakeNodeWithApns, nodeHandlers } from "./nodes.js";
import {
clearNodeWakeState,
maybeSendNodeWakeNudge,
maybeWakeNodeWithApns,
nodeHandlers,
} from "./nodes.js";
type MockNodeCommandPolicyParams = {
command: string;
@@ -313,6 +318,48 @@ describe("node.invoke APNs wake path", () => {
expect(mocks.sendApnsBackgroundWake).not.toHaveBeenCalled();
});
it("clears wake and nudge throttle state when a node disconnects", async () => {
mockDirectWakeConfig("ios-node-clear-wake");
mocks.sendApnsAlert.mockResolvedValue({
ok: true,
status: 200,
tokenSuffix: "1234abcd",
topic: "ai.openclaw.ios",
environment: "sandbox",
transport: "direct",
});
await expect(maybeWakeNodeWithApns("ios-node-clear-wake")).resolves.toMatchObject({
path: "sent",
throttled: false,
});
await expect(maybeSendNodeWakeNudge("ios-node-clear-wake")).resolves.toMatchObject({
sent: true,
throttled: false,
});
await expect(maybeWakeNodeWithApns("ios-node-clear-wake")).resolves.toMatchObject({
path: "throttled",
throttled: true,
});
await expect(maybeSendNodeWakeNudge("ios-node-clear-wake")).resolves.toMatchObject({
sent: false,
throttled: true,
});
clearNodeWakeState("ios-node-clear-wake");
await expect(maybeWakeNodeWithApns("ios-node-clear-wake")).resolves.toMatchObject({
path: "sent",
throttled: false,
});
await expect(maybeSendNodeWakeNudge("ios-node-clear-wake")).resolves.toMatchObject({
sent: true,
throttled: false,
});
expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(2);
expect(mocks.sendApnsAlert).toHaveBeenCalledTimes(2);
});
it("wakes and retries invoke after the node reconnects", async () => {
vi.useFakeTimers();
mockDirectWakeConfig("ios-node-reconnect");

View File

@@ -518,6 +518,15 @@ export async function waitForNodeReconnect(params: {
return Boolean(params.context.nodeRegistry.get(params.nodeId));
}
/**
* Remove cached wake/nudge state for a node that has disconnected.
* Called from the WS close handler to prevent unbounded growth.
*/
export function clearNodeWakeState(nodeId: string): void {
nodeWakeById.delete(nodeId);
nodeWakeNudgeById.delete(nodeId);
}
export const nodeHandlers: GatewayRequestHandlers = {
"node.pair.request": async ({ params, respond, context }) => {
if (!validateNodePairRequestParams(params)) {

View File

@@ -3,6 +3,7 @@ import type { Socket } from "node:net";
import type { WebSocket, WebSocketServer } from "ws";
import { resolveCanvasHostUrl } from "../../infra/canvas-host-url.js";
import { removeRemoteNodeInfo } from "../../infra/skills-remote.js";
import { clearNodeWakeState } from "../server-methods/nodes.js";
import { upsertPresence } from "../../infra/system-presence.js";
import type { createSubsystemLogger } from "../../logging/subsystem.js";
import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js";
@@ -325,6 +326,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
if (nodeId) {
removeRemoteNodeInfo(nodeId);
context.nodeUnsubscribeAll(nodeId);
clearNodeWakeState(nodeId);
}
}
logWs("out", "close", {

View File

@@ -141,4 +141,44 @@ describe("resolveSessionKeyForTranscriptFile", () => {
expect(resolveSessionKeyForTranscriptFile("/tmp/shared.jsonl")).toBe("agent:main:newer");
});
it("evicts oldest entry when cache exceeds 256 entries (#63643)", () => {
// Fill cache with 256 unique transcript paths
for (let i = 0; i < 256; i++) {
const sessionKey = `agent:main:session-${i}`;
const transcriptPath = `/tmp/session-${i}.jsonl`;
const store = {
[sessionKey]: { sessionId: `sid-${i}`, updatedAt: now + i },
} satisfies Record<string, SessionEntry>;
loadCombinedSessionStoreForGatewayMock.mockReturnValue({
storePath: "(multiple)",
store,
});
resolveSessionTranscriptCandidatesMock.mockReturnValue([transcriptPath]);
resolveSessionKeyForTranscriptFile(transcriptPath);
}
// Now add the 257th — should evict session-0
const overflowKey = "agent:main:session-overflow";
const overflowPath = "/tmp/session-overflow.jsonl";
const overflowStore = {
[overflowKey]: { sessionId: "sid-overflow", updatedAt: now + 999 },
} satisfies Record<string, SessionEntry>;
loadCombinedSessionStoreForGatewayMock.mockReturnValue({
storePath: "(multiple)",
store: overflowStore,
});
resolveSessionTranscriptCandidatesMock.mockReturnValue([overflowPath]);
expect(resolveSessionKeyForTranscriptFile(overflowPath)).toBe(overflowKey);
// session-0 should have been evicted from cache — next lookup will
// re-resolve from the store (returns undefined since store was mocked
// with only the overflow entry).
loadCombinedSessionStoreForGatewayMock.mockReturnValue({
storePath: "(multiple)",
store: overflowStore,
});
resolveSessionTranscriptCandidatesMock.mockReturnValue([]);
expect(resolveSessionKeyForTranscriptFile("/tmp/session-0.jsonl")).toBeUndefined();
});
});

View File

@@ -12,6 +12,7 @@ import {
} from "./session-utils.js";
const TRANSCRIPT_SESSION_KEY_CACHE = new Map<string, string>();
const TRANSCRIPT_SESSION_KEY_CACHE_MAX = 256;
function resolveTranscriptPathForComparison(value: string | undefined): string | undefined {
const trimmed = normalizeOptionalString(value);
@@ -135,6 +136,16 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string
? freshestMatch?.key
: undefined;
if (resolvedKey) {
// Evict oldest-inserted entry when cache exceeds size cap (FIFO bound).
if (
!TRANSCRIPT_SESSION_KEY_CACHE.has(targetPath) &&
TRANSCRIPT_SESSION_KEY_CACHE.size >= TRANSCRIPT_SESSION_KEY_CACHE_MAX
) {
const oldest = TRANSCRIPT_SESSION_KEY_CACHE.keys().next().value;
if (oldest !== undefined) {
TRANSCRIPT_SESSION_KEY_CACHE.delete(oldest);
}
}
TRANSCRIPT_SESSION_KEY_CACHE.set(targetPath, resolvedKey);
return resolvedKey;
}

View File

@@ -232,3 +232,27 @@ describe("agent-events sequencing", () => {
]);
});
});
test("clearAgentRunContext also cleans up seqByRun to prevent memory leak (#63643)", () => {
// Regression test: seqByRun entries were never deleted when a run ended,
// causing unbounded growth over time.
registerAgentRunContext("run-leak", { sessionKey: "main" });
emitAgentEvent({ runId: "run-leak", stream: "lifecycle", data: {} });
emitAgentEvent({ runId: "run-leak", stream: "lifecycle", data: {} });
// After clearing run context, the sequence counter should also be removed.
clearAgentRunContext("run-leak");
// Emitting a new event on the same runId should start seq from 1 again,
// proving the old entry was deleted.
const seqs: number[] = [];
const stop = onAgentEvent((evt) => {
if (evt.runId === "run-leak") {
seqs.push(evt.seq);
}
});
emitAgentEvent({ runId: "run-leak", stream: "lifecycle", data: {} });
stop();
expect(seqs).toEqual([1]);
});

View File

@@ -165,8 +165,9 @@ export function getAgentRunContext(runId: string) {
}
export function clearAgentRunContext(runId: string) {
getAgentEventState().runContextById.delete(runId);
getAgentEventState().seqByRun.delete(runId);
const state = getAgentEventState();
state.runContextById.delete(runId);
state.seqByRun.delete(runId);
}
/**