diff --git a/CHANGELOG.md b/CHANGELOG.md index f9b2bf1e56e..810418e54de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,6 +118,7 @@ Docs: https://docs.openclaw.ai - Inbound policy hardening: tighten callback and webhook sender checks across Mattermost and Google Chat, match Nextcloud Talk rooms by stable room token, and treat explicit empty Twitch allowlists as deny-all. (#46787) Thanks @zpbrent, @ijxpwastaken and @vincentkoc. - Webhooks/runtime: move auth earlier and tighten pre-auth body limits and timeouts across bundled webhook handlers, including slow-body handling for Mattermost slash commands. (#46802) Thanks @vincentkoc. - Email/webhook wrapping: sanitize sender and subject metadata before external-content wrapping so metadata fields cannot break the wrapper structure. (#46816) Thanks @vincentkoc. +- Gateway/chat: only reap orphaned stale chat buffers after the abort controller is gone, and clear abort-time streaming metadata so long-running sessions do not lose buffered output while stale maps still get reclaimed. (#52428) Thanks @karanuppal. - Tools/apply-patch: revalidate workspace-only delete and directory targets immediately before mutating host paths. (#46803) Thanks @vincentkoc. - Gateway/config views: strip embedded credentials from URL-based endpoint fields before returning read-only account and config snapshots. (#46799) Thanks @vincentkoc. - ACP/approvals: use canonical tool identity for prompting decisions and fail closed when conflicting tool identity hints are present. (#46817) Thanks @zpbrent and @vincentkoc. diff --git a/src/gateway/chat-abort.test.ts b/src/gateway/chat-abort.test.ts index f3aff5ebfe5..e1a20047bdd 100644 --- a/src/gateway/chat-abort.test.ts +++ b/src/gateway/chat-abort.test.ts @@ -35,6 +35,7 @@ function createOps(params: { chatAbortControllers: new Map([[runId, entry]]), chatRunBuffers: new Map(buffer !== undefined ? [[runId, buffer]] : []), chatDeltaSentAt: new Map([[runId, Date.now()]]), + chatDeltaLastBroadcastLen: new Map([[runId, buffer?.length ?? 0]]), chatAbortedRuns: new Map(), removeChatRun, agentRunSeq: new Map(), @@ -78,6 +79,7 @@ describe("abortChatRunById", () => { expect(ops.chatAbortControllers.has(runId)).toBe(false); expect(ops.chatRunBuffers.has(runId)).toBe(false); expect(ops.chatDeltaSentAt.has(runId)).toBe(false); + expect(ops.chatDeltaLastBroadcastLen.has(runId)).toBe(false); expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey); expect(ops.agentRunSeq.has(runId)).toBe(false); expect(ops.agentRunSeq.has("client-run-1")).toBe(false); diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index e74439f13df..25c0cde1150 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -33,6 +33,7 @@ export type ChatAbortOps = { chatAbortControllers: Map; chatRunBuffers: Map; chatDeltaSentAt: Map; + chatDeltaLastBroadcastLen: Map; chatAbortedRuns: Map; removeChatRun: ( sessionId: string, @@ -96,6 +97,7 @@ export function abortChatRunById( ops.chatAbortControllers.delete(runId); ops.chatRunBuffers.delete(runId); ops.chatDeltaSentAt.delete(runId); + ops.chatDeltaLastBroadcastLen.delete(runId); const removed = ops.removeChatRun(runId, runId, sessionKey); broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText }); ops.agentRunSeq.delete(runId); diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index fef324eea29..850a23f4d39 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import type { HealthSummary } from "../commands/health.js"; +import type { ChatAbortControllerEntry } from "./chat-abort.js"; const cleanOldMediaMock = vi.fn(async () => {}); @@ -12,6 +13,18 @@ vi.mock("../media/store.js", async (importOriginal) => { }); const MEDIA_CLEANUP_TTL_MS = 24 * 60 * 60_000; +const ABORTED_RUN_TTL_MS = 60 * 60_000; + +function createActiveRun(sessionKey: string): ChatAbortControllerEntry { + const now = Date.now(); + return { + controller: new AbortController(), + sessionId: "sess-1", + sessionKey, + startedAtMs: now, + expiresAtMs: now + ABORTED_RUN_TTL_MS, + }; +} function createMaintenanceTimerDeps() { return { @@ -124,4 +137,70 @@ describe("startGatewayMaintenanceTimers", () => { stopMaintenanceTimers(timers); }); + + it("keeps stale buffers for active runs that still have abort controllers", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "run-active"; + deps.chatAbortControllers.set(runId, createActiveRun("main")); + deps.chatRunBuffers.set(runId, "buffer"); + deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1); + deps.chatDeltaLastBroadcastLen.set(runId, 6); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.chatRunBuffers.get(runId)).toBe("buffer"); + expect(deps.chatDeltaSentAt.has(runId)).toBe(true); + expect(deps.chatDeltaLastBroadcastLen.get(runId)).toBe(6); + + stopMaintenanceTimers(timers); + }); + + it("sweeps orphaned stale buffers once the abort controller is gone", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "run-orphaned"; + deps.chatRunBuffers.set(runId, "buffer"); + deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1); + deps.chatDeltaLastBroadcastLen.set(runId, 6); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.chatRunBuffers.has(runId)).toBe(false); + expect(deps.chatDeltaSentAt.has(runId)).toBe(false); + expect(deps.chatDeltaLastBroadcastLen.has(runId)).toBe(false); + + stopMaintenanceTimers(timers); + }); + + it("clears deltaLastBroadcastLen when aborted runs age out", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "run-aborted"; + deps.chatRunState.abortedRuns.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1); + deps.chatRunBuffers.set(runId, "buffer"); + deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1); + deps.chatDeltaLastBroadcastLen.set(runId, 6); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.chatRunState.abortedRuns.has(runId)).toBe(false); + expect(deps.chatRunBuffers.has(runId)).toBe(false); + expect(deps.chatDeltaSentAt.has(runId)).toBe(false); + expect(deps.chatDeltaLastBroadcastLen.has(runId)).toBe(false); + + stopMaintenanceTimers(timers); + }); }); diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 190fdfea161..9257ced55d0 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -112,6 +112,7 @@ export function startGatewayMaintenanceTimers(params: { chatAbortControllers: params.chatAbortControllers, chatRunBuffers: params.chatRunBuffers, chatDeltaSentAt: params.chatDeltaSentAt, + chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen, chatAbortedRuns: params.chatRunState.abortedRuns, removeChatRun: params.removeChatRun, agentRunSeq: params.agentRunSeq, @@ -130,15 +131,19 @@ export function startGatewayMaintenanceTimers(params: { params.chatRunState.abortedRuns.delete(runId); params.chatRunBuffers.delete(runId); params.chatDeltaSentAt.delete(runId); + params.chatDeltaLastBroadcastLen.delete(runId); } // Sweep stale buffers for runs that were never explicitly aborted. - // If a buffer has not been updated for longer than ABORTED_RUN_TTL_MS, - // the run is stuck — clean up its state to prevent unbounded heap growth. + // Only reap orphaned buffers after the abort controller is gone; active + // runs can legitimately sit idle while tools/models work. for (const [runId, lastSentAt] of params.chatDeltaSentAt) { if (params.chatRunState.abortedRuns.has(runId)) { continue; // already handled above } + if (params.chatAbortControllers.has(runId)) { + continue; + } if (now - lastSentAt <= ABORTED_RUN_TTL_MS) { continue; } diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 94dd856c828..f18ee70b9ef 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -885,6 +885,7 @@ function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps { chatAbortControllers: context.chatAbortControllers, chatRunBuffers: context.chatRunBuffers, chatDeltaSentAt: context.chatDeltaSentAt, + chatDeltaLastBroadcastLen: context.chatDeltaLastBroadcastLen, chatAbortedRuns: context.chatAbortedRuns, removeChatRun: context.removeChatRun, agentRunSeq: context.agentRunSeq, diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index 39a6f458a5f..df80453961c 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -61,6 +61,7 @@ export type GatewayRequestContext = { chatAbortedRuns: Map; chatRunBuffers: Map; chatDeltaSentAt: Map; + chatDeltaLastBroadcastLen: Map; addChatRun: (sessionId: string, entry: { sessionKey: string; clientRunId: string }) => void; removeChatRun: ( sessionId: string, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index bf3097e2289..20dfe75dc50 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -1101,6 +1101,7 @@ export async function startGatewayServer( chatAbortedRuns: chatRunState.abortedRuns, chatRunBuffers: chatRunState.buffers, chatDeltaSentAt: chatRunState.deltaSentAt, + chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen, addChatRun, removeChatRun, subscribeSessionEvents: sessionEventSubscribers.subscribe,