mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-24 08:21:39 +00:00
fix: guard stale chat buffer sweep (#52428) (thanks @karanuppal)
This commit is contained in:
@@ -118,6 +118,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Inbound policy hardening: tighten callback and webhook sender checks across Mattermost and Google Chat, match Nextcloud Talk rooms by stable room token, and treat explicit empty Twitch allowlists as deny-all. (#46787) Thanks @zpbrent, @ijxpwastaken and @vincentkoc.
|
||||
- Webhooks/runtime: move auth earlier and tighten pre-auth body limits and timeouts across bundled webhook handlers, including slow-body handling for Mattermost slash commands. (#46802) Thanks @vincentkoc.
|
||||
- Email/webhook wrapping: sanitize sender and subject metadata before external-content wrapping so metadata fields cannot break the wrapper structure. (#46816) Thanks @vincentkoc.
|
||||
- Gateway/chat: only reap orphaned stale chat buffers after the abort controller is gone, and clear abort-time streaming metadata so long-running sessions do not lose buffered output while stale maps still get reclaimed. (#52428) Thanks @karanuppal.
|
||||
- Tools/apply-patch: revalidate workspace-only delete and directory targets immediately before mutating host paths. (#46803) Thanks @vincentkoc.
|
||||
- Gateway/config views: strip embedded credentials from URL-based endpoint fields before returning read-only account and config snapshots. (#46799) Thanks @vincentkoc.
|
||||
- ACP/approvals: use canonical tool identity for prompting decisions and fail closed when conflicting tool identity hints are present. (#46817) Thanks @zpbrent and @vincentkoc.
|
||||
|
||||
@@ -35,6 +35,7 @@ function createOps(params: {
|
||||
chatAbortControllers: new Map([[runId, entry]]),
|
||||
chatRunBuffers: new Map(buffer !== undefined ? [[runId, buffer]] : []),
|
||||
chatDeltaSentAt: new Map([[runId, Date.now()]]),
|
||||
chatDeltaLastBroadcastLen: new Map([[runId, buffer?.length ?? 0]]),
|
||||
chatAbortedRuns: new Map(),
|
||||
removeChatRun,
|
||||
agentRunSeq: new Map(),
|
||||
@@ -78,6 +79,7 @@ describe("abortChatRunById", () => {
|
||||
expect(ops.chatAbortControllers.has(runId)).toBe(false);
|
||||
expect(ops.chatRunBuffers.has(runId)).toBe(false);
|
||||
expect(ops.chatDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(ops.chatDeltaLastBroadcastLen.has(runId)).toBe(false);
|
||||
expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey);
|
||||
expect(ops.agentRunSeq.has(runId)).toBe(false);
|
||||
expect(ops.agentRunSeq.has("client-run-1")).toBe(false);
|
||||
|
||||
@@ -33,6 +33,7 @@ export type ChatAbortOps = {
|
||||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
chatRunBuffers: Map<string, string>;
|
||||
chatDeltaSentAt: Map<string, number>;
|
||||
chatDeltaLastBroadcastLen: Map<string, number>;
|
||||
chatAbortedRuns: Map<string, number>;
|
||||
removeChatRun: (
|
||||
sessionId: string,
|
||||
@@ -96,6 +97,7 @@ export function abortChatRunById(
|
||||
ops.chatAbortControllers.delete(runId);
|
||||
ops.chatRunBuffers.delete(runId);
|
||||
ops.chatDeltaSentAt.delete(runId);
|
||||
ops.chatDeltaLastBroadcastLen.delete(runId);
|
||||
const removed = ops.removeChatRun(runId, runId, sessionKey);
|
||||
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
|
||||
ops.agentRunSeq.delete(runId);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import type { ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
|
||||
// Mock function whose default implementation is an async no-op.
// NOTE(review): presumably substituted for the media-store cleanup inside the
// vi.mock("../media/store.js") factory that follows — confirm against the full file.
const cleanOldMediaMock = vi.fn(async () => {});
|
||||
|
||||
@@ -12,6 +13,18 @@ vi.mock("../media/store.js", async (importOriginal) => {
|
||||
});
|
||||
|
||||
// 24 hours, expressed in milliseconds.
const MEDIA_CLEANUP_TTL_MS = 24 * 60 * 60_000;
|
||||
// 1 hour, expressed in milliseconds. The tests below subtract it from Date.now()
// to backdate timestamps. NOTE(review): presumably mirrors the constant of the
// same name used by the maintenance sweep — confirm the two stay in sync.
const ABORTED_RUN_TTL_MS = 60 * 60_000;
|
||||
|
||||
/**
 * Test helper: builds a ChatAbortControllerEntry describing a run that starts now.
 *
 * @param sessionKey - Session key stored on the returned entry.
 * @returns An entry holding a fresh AbortController, the fixed session id "sess-1",
 *   `startedAtMs` equal to the current time and `expiresAtMs` set
 *   ABORTED_RUN_TTL_MS milliseconds later.
 */
function createActiveRun(sessionKey: string): ChatAbortControllerEntry {
|
||||
// Read the clock once so startedAtMs and expiresAtMs derive from the same instant.
const now = Date.now();
|
||||
return {
|
||||
controller: new AbortController(),
|
||||
sessionId: "sess-1",
|
||||
sessionKey,
|
||||
startedAtMs: now,
|
||||
expiresAtMs: now + ABORTED_RUN_TTL_MS,
|
||||
};
|
||||
}
|
||||
|
||||
function createMaintenanceTimerDeps() {
|
||||
return {
|
||||
@@ -124,4 +137,70 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
// Regression test: a run whose last-delta timestamp is older than ABORTED_RUN_TTL_MS
// must keep its buffer state while an abort controller entry is still registered for it.
it("keeps stale buffers for active runs that still have abort controllers", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
const runId = "run-active";
|
||||
// Register an abort controller entry so the sweep sees this run as still active.
deps.chatAbortControllers.set(runId, createActiveRun("main"));
|
||||
deps.chatRunBuffers.set(runId, "buffer");
|
||||
// Backdate the last-delta timestamp to 1 ms past the TTL so the entry is stale by age.
deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1);
|
||||
deps.chatDeltaLastBroadcastLen.set(runId, 6);
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
// Advance fake time by 60 s. NOTE(review): presumably enough for one maintenance
// sweep to fire — confirm the interval in server-maintenance.js.
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
// All three per-run maps must be untouched.
expect(deps.chatRunBuffers.get(runId)).toBe("buffer");
|
||||
expect(deps.chatDeltaSentAt.has(runId)).toBe(true);
|
||||
expect(deps.chatDeltaLastBroadcastLen.get(runId)).toBe(6);
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
// Counterpart case: the same stale entry, but with no abort controller registered
// for the run, must be removed from all three per-run maps.
it("sweeps orphaned stale buffers once the abort controller is gone", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
const runId = "run-orphaned";
|
||||
// No chatAbortControllers entry is added for this run id in this test.
deps.chatRunBuffers.set(runId, "buffer");
|
||||
// Backdate the last-delta timestamp to 1 ms past the TTL so the entry is stale by age.
deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1);
|
||||
deps.chatDeltaLastBroadcastLen.set(runId, 6);
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
// Advance fake time by 60 s. NOTE(review): presumably enough for one maintenance
// sweep to fire — confirm the interval in server-maintenance.js.
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
// Buffer, timestamp and broadcast-length entries must all be gone.
expect(deps.chatRunBuffers.has(runId)).toBe(false);
|
||||
expect(deps.chatDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(deps.chatDeltaLastBroadcastLen.has(runId)).toBe(false);
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
// Aborted-run case: once an abortedRuns entry is older than ABORTED_RUN_TTL_MS, the
// run's entry in abortedRuns and in every per-run map (including
// chatDeltaLastBroadcastLen) must be removed.
it("clears deltaLastBroadcastLen when aborted runs age out", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
const runId = "run-aborted";
|
||||
// Mark the run as aborted with a timestamp 1 ms past the TTL.
deps.chatRunState.abortedRuns.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1);
|
||||
deps.chatRunBuffers.set(runId, "buffer");
|
||||
deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1);
|
||||
deps.chatDeltaLastBroadcastLen.set(runId, 6);
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
// Advance fake time by 60 s. NOTE(review): presumably enough for one maintenance
// sweep to fire — confirm the interval in server-maintenance.js.
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
// The aborted marker and all associated per-run state must be gone.
expect(deps.chatRunState.abortedRuns.has(runId)).toBe(false);
|
||||
expect(deps.chatRunBuffers.has(runId)).toBe(false);
|
||||
expect(deps.chatDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(deps.chatDeltaLastBroadcastLen.has(runId)).toBe(false);
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -112,6 +112,7 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
chatAbortControllers: params.chatAbortControllers,
|
||||
chatRunBuffers: params.chatRunBuffers,
|
||||
chatDeltaSentAt: params.chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen,
|
||||
chatAbortedRuns: params.chatRunState.abortedRuns,
|
||||
removeChatRun: params.removeChatRun,
|
||||
agentRunSeq: params.agentRunSeq,
|
||||
@@ -130,15 +131,19 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
params.chatRunState.abortedRuns.delete(runId);
|
||||
params.chatRunBuffers.delete(runId);
|
||||
params.chatDeltaSentAt.delete(runId);
|
||||
params.chatDeltaLastBroadcastLen.delete(runId);
|
||||
}
|
||||
|
||||
// Sweep stale buffers for runs that were never explicitly aborted.
|
||||
// If a buffer has not been updated for longer than ABORTED_RUN_TTL_MS
|
||||
// and no abort controller remains for the run, treat it as orphaned — clean up its state to prevent unbounded heap growth.
|
||||
// Only reap orphaned buffers after the abort controller is gone; active
|
||||
// runs can legitimately sit idle while tools/models work.
|
||||
for (const [runId, lastSentAt] of params.chatDeltaSentAt) {
|
||||
if (params.chatRunState.abortedRuns.has(runId)) {
|
||||
continue; // already handled above
|
||||
}
|
||||
if (params.chatAbortControllers.has(runId)) {
|
||||
continue;
|
||||
}
|
||||
if (now - lastSentAt <= ABORTED_RUN_TTL_MS) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -885,6 +885,7 @@ function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps {
|
||||
chatAbortControllers: context.chatAbortControllers,
|
||||
chatRunBuffers: context.chatRunBuffers,
|
||||
chatDeltaSentAt: context.chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen: context.chatDeltaLastBroadcastLen,
|
||||
chatAbortedRuns: context.chatAbortedRuns,
|
||||
removeChatRun: context.removeChatRun,
|
||||
agentRunSeq: context.agentRunSeq,
|
||||
|
||||
@@ -61,6 +61,7 @@ export type GatewayRequestContext = {
|
||||
chatAbortedRuns: Map<string, number>;
|
||||
chatRunBuffers: Map<string, string>;
|
||||
chatDeltaSentAt: Map<string, number>;
|
||||
chatDeltaLastBroadcastLen: Map<string, number>;
|
||||
addChatRun: (sessionId: string, entry: { sessionKey: string; clientRunId: string }) => void;
|
||||
removeChatRun: (
|
||||
sessionId: string,
|
||||
|
||||
@@ -1101,6 +1101,7 @@ export async function startGatewayServer(
|
||||
chatAbortedRuns: chatRunState.abortedRuns,
|
||||
chatRunBuffers: chatRunState.buffers,
|
||||
chatDeltaSentAt: chatRunState.deltaSentAt,
|
||||
chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen,
|
||||
addChatRun,
|
||||
removeChatRun,
|
||||
subscribeSessionEvents: sessionEventSubscribers.subscribe,
|
||||
|
||||
Reference in New Issue
Block a user