From bb8aa0cfe2b0bf8ff0f9d00c96df1b129c6a030a Mon Sep 17 00:00:00 2001 From: samzong Date: Thu, 14 May 2026 13:21:46 +0800 Subject: [PATCH] [Fix] Throttle agent event fanout (#80335) Merged via squash. Prepared head SHA: 5dddb405ad9c9d8ea82833351a2267011b1d69b9 Co-authored-by: samzong <13782141+samzong@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + src/config/sessions/transcript.test.ts | 74 +++++ src/config/sessions/transcript.ts | 1 - src/gateway/chat-abort.test.ts | 17 + src/gateway/chat-abort.ts | 9 + src/gateway/server-chat-state.ts | 15 + src/gateway/server-chat.agent-events.test.ts | 302 ++++++++++++++++++ src/gateway/server-chat.ts | 179 ++++++++++- src/gateway/server-maintenance.test.ts | 46 ++- src/gateway/server-maintenance.ts | 43 ++- src/gateway/server-methods/agent.test.ts | 2 + .../chat.abort-authorization.test.ts | 39 +++ .../server-methods/chat.abort.test-helpers.ts | 4 + .../chat.directive-tags.test.ts | 4 + src/gateway/server-methods/chat.ts | 2 + src/gateway/server-methods/shared-types.ts | 3 + src/gateway/server-request-context.test.ts | 2 + src/gateway/server-request-context.ts | 4 + src/gateway/server-startup-early.test.ts | 7 +- src/gateway/server.impl.ts | 2 + src/gateway/talk-realtime-relay.test.ts | 63 +++- src/plugins/registry.runtime-config.test.ts | 64 ++-- src/plugins/registry.ts | 7 +- 23 files changed, 845 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2070b038ba5..81e85eab869 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -150,6 +150,7 @@ Docs: https://docs.openclaw.ai - Plugins doctor: report stale plugin config warnings and avoid claiming full plugin health when config warnings remain. (#81515) Thanks @BKF-Gitty. - Sessions: display `model: "-acp"` / `modelProvider: "acpx"` (ACP-runtime sentinel) for ACP control-plane sessions in `openclaw sessions` output, instead of the agent's configured model which was misleading. Catalog finding 20. (#79543) - Slack: normalize message read `before` and `after` timestamp bounds before calling Slack history or thread reply APIs. Fixes #80835. (#81338) Thanks @honor2030. +- Gateway: throttle assistant/thinking agent event fanout during streaming bursts without dropping buffered deltas. (#80335) Thanks @samzong. ### Changes diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index b8d3c42687a..57318efb900 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -1,5 +1,6 @@ import fs from "node:fs"; import { describe, expect, it, vi } from "vitest"; +import { repairToolUseResultPairing } from "../../agents/session-transcript-repair.js"; import * as transcriptEvents from "../../sessions/transcript-events.js"; import type { SessionTranscriptUpdate } from "../../sessions/transcript-events.js"; import { resolveSessionTranscriptPathInDir } from "./paths.js"; @@ -18,6 +19,7 @@ describe("appendAssistantMessageToSessionTranscript", () => { type ExactAssistantMessage = Parameters< typeof appendExactAssistantMessageToSessionTranscript >[0]["message"]; + type TranscriptRepairMessage = Parameters[0][number]; type TranscriptUpdateEmitterSpy = { mock: { calls: [string | SessionTranscriptUpdate][]; @@ -283,6 +285,78 @@ describe("appendAssistantMessageToSessionTranscript", () => { } }); + it("keeps delivery mirrors in transcripts while repair preserves real tool results", async () => { + writeTranscriptStore(); + const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()); + const toolCallId = "call_maniple_list"; + + const toolCallResult = await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + message: { + role: "assistant", + content: [ + { + type: "toolCall", + id: toolCallId, + name: "maniple__list_workers", + arguments: {}, + }, + ], + stopReason: "toolUse", + }, + }); + + const mirrorResult = await appendAssistantMessageToSessionTranscript({ + sessionKey, + text: "Maniple List Workers", + storePath: fixture.storePath(), + }); + + expect(mirrorResult.ok).toBe(true); + if (!mirrorResult.ok) { + return; + } + expect(mirrorResult.messageId).not.toBe(toolCallResult.messageId); + const linesAfterMirror = fs.readFileSync(sessionFile, "utf-8").trim().split("\n"); + expect(linesAfterMirror).toHaveLength(3); + const mirrorLine = JSON.parse(linesAfterMirror[2]); + expect(mirrorLine.message.model).toBe("delivery-mirror"); + + await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + message: { + role: "toolResult", + toolCallId, + toolName: "maniple__list_workers", + content: [{ type: "text", text: "workers listed" }], + isError: false, + }, + }); + + const messages = fs + .readFileSync(sessionFile, "utf-8") + .trim() + .split("\n") + .map((line) => JSON.parse(line) as { message?: TranscriptRepairMessage }) + .flatMap((entry) => (entry.message ? [entry.message] : [])); + expect(messages.map((message) => message.role)).toEqual([ + "assistant", + "assistant", + "toolResult", + ]); + const repair = repairToolUseResultPairing(messages, { + missingToolResultText: "aborted", + }); + + expect(repair.added).toHaveLength(0); + expect(repair.messages.map((message) => message.role)).toEqual([ + "assistant", + "toolResult", + "assistant", + ]); + expect((repair.messages[2] as { model?: string }).model).toBe("delivery-mirror"); + }); + it("finds session entry using normalized (lowercased) key", async () => { const storeKey = "agent:main:imessage:direct:+15551234567"; const store = { diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index 950c389156f..8896694858f 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -302,7 +302,6 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { if (latestEquivalentAssistantId) { return { ok: true, sessionFile, messageId: latestEquivalentAssistantId }; } - const message = { ...params.message, ...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}), diff --git a/src/gateway/chat-abort.test.ts b/src/gateway/chat-abort.test.ts index f85b8f7033f..41e0d0eb753 100644 --- a/src/gateway/chat-abort.test.ts +++ b/src/gateway/chat-abort.test.ts @@ -54,6 +54,21 @@ function createOps(params: { chatDeltaSentAt: new Map([[runId, Date.now()]]), chatDeltaLastBroadcastLen: new Map([[runId, buffer?.length ?? 0]]), chatDeltaLastBroadcastText: new Map(buffer !== undefined ? [[runId, buffer]] : []), + agentDeltaSentAt: new Map([[`${runId}:assistant`, Date.now()]]), + bufferedAgentEvents: new Map([ + [ + `${runId}:assistant`, + { + payload: { + runId, + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "buffer", delta: "buffer" }, + }, + }, + ], + ]), chatAbortedRuns: new Map(), removeChatRun, agentRunSeq: new Map(), @@ -110,6 +125,8 @@ describe("abortChatRunById", () => { expect(ops.chatDeltaSentAt.has(runId)).toBe(false); expect(ops.chatDeltaLastBroadcastLen.has(runId)).toBe(false); expect(ops.chatDeltaLastBroadcastText.has(runId)).toBe(false); + expect(ops.agentDeltaSentAt?.has(`${runId}:assistant`)).toBe(false); + expect(ops.bufferedAgentEvents?.has(`${runId}:assistant`)).toBe(false); expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey); expect(ops.agentRunSeq.has(runId)).toBe(false); expect(ops.agentRunSeq.has("client-run-1")).toBe(false); diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index bbea420ee13..e3eb1a6ddfd 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -1,5 +1,6 @@ import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js"; import { emitAgentEvent } from "../infra/agent-events.js"; +import type { BufferedAgentEvent } from "./server-chat-state.js"; const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000; @@ -113,6 +114,8 @@ export type ChatAbortOps = { chatDeltaSentAt: Map; chatDeltaLastBroadcastLen: Map; chatDeltaLastBroadcastText: Map; + agentDeltaSentAt: Map; + bufferedAgentEvents: Map; chatAbortedRuns: Map; removeChatRun: ( sessionId: string, @@ -178,6 +181,12 @@ export function abortChatRunById( ops.chatDeltaSentAt.delete(runId); ops.chatDeltaLastBroadcastLen.delete(runId); ops.chatDeltaLastBroadcastText.delete(runId); + ops.agentDeltaSentAt.delete(runId); + ops.agentDeltaSentAt.delete(`${runId}:assistant`); + ops.agentDeltaSentAt.delete(`${runId}:thinking`); + ops.bufferedAgentEvents.delete(runId); + ops.bufferedAgentEvents.delete(`${runId}:assistant`); + ops.bufferedAgentEvents.delete(`${runId}:thinking`); const removed = ops.removeChatRun(runId, runId, sessionKey); broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText }); emitAgentEvent({ diff --git a/src/gateway/server-chat-state.ts b/src/gateway/server-chat-state.ts index 96978ccf98c..39deb2fdde3 100644 --- a/src/gateway/server-chat-state.ts +++ b/src/gateway/server-chat-state.ts @@ -1,8 +1,15 @@ +import type { AgentEventPayload } from "../infra/agent-events.js"; + export type ChatRunEntry = { sessionKey: string; clientRunId: string; }; +export type BufferedAgentEvent = { + sessionKey?: string; + payload: AgentEventPayload & { spawnedBy?: string }; +}; + export type ChatRunRegistry = { add: (sessionId: string, entry: ChatRunEntry) => void; peek: (sessionId: string) => ChatRunEntry | undefined; @@ -71,6 +78,8 @@ export type ChatRunState = { /** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */ deltaLastBroadcastLen: Map; deltaLastBroadcastText: Map; + agentDeltaSentAt: Map; + bufferedAgentEvents: Map; abortedRuns: Map; clear: () => void; }; @@ -82,6 +91,8 @@ export function createChatRunState(): ChatRunState { const deltaSentAt = new Map(); const deltaLastBroadcastLen = new Map(); const deltaLastBroadcastText = new Map(); + const agentDeltaSentAt = new Map(); + const bufferedAgentEvents = new Map(); const abortedRuns = new Map(); const clear = () => { @@ -91,6 +102,8 @@ export function createChatRunState(): ChatRunState { deltaSentAt.clear(); deltaLastBroadcastLen.clear(); deltaLastBroadcastText.clear(); + agentDeltaSentAt.clear(); + bufferedAgentEvents.clear(); abortedRuns.clear(); }; @@ -101,6 +114,8 @@ export function createChatRunState(): ChatRunState { deltaSentAt, deltaLastBroadcastLen, deltaLastBroadcastText, + agentDeltaSentAt, + bufferedAgentEvents, abortedRuns, clear, }; diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index ddeb1aa49aa..dc71356fedf 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -141,10 +141,18 @@ describe("agent event handler", () => { return broadcast.mock.calls.filter(([event]) => event === "chat"); } + function agentBroadcastCalls(broadcast: ReturnType) { + return broadcast.mock.calls.filter(([event]) => event === "agent"); + } + function sessionChatCalls(nodeSendToSession: ReturnType) { return nodeSendToSession.mock.calls.filter(([, event]) => event === "chat"); } + function sessionAgentCalls(nodeSendToSession: ReturnType) { + return nodeSendToSession.mock.calls.filter(([, event]) => event === "agent"); + } + function requireCall(call: T | undefined, label: string): T { if (call === undefined) { throw new Error(`expected ${label}`); @@ -393,6 +401,300 @@ describe("agent event handler", () => { nowSpy?.mockRestore(); }); + it("coalesces assistant agent events under the chat delta throttle", () => { + let now = 10_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-agent-throttle", { + sessionKey: "session-agent-throttle", + clientRunId: "client-agent-throttle", + }); + + for (let i = 0; i < 5; i += 1) { + now = 10_000 + i * 20; + handler({ + runId: "run-agent-throttle", + seq: i + 1, + stream: "assistant", + ts: Date.now(), + data: { text: "x".repeat(i + 1), delta: "x" }, + }); + } + + const agentCalls = agentBroadcastCalls(broadcast); + expect(agentCalls).toHaveLength(1); + expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(1); + expect(chatBroadcastCalls(broadcast)).toHaveLength(1); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(1); + expect((agentCalls[0]?.[1] as { data?: { text?: string } }).data?.text).toBe("x"); + nowSpy.mockRestore(); + }); + + it("flushes coalesced assistant agent text before lifecycle end", () => { + let now = 20_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-agent-flush", { + sessionKey: "session-agent-flush", + clientRunId: "client-agent-flush", + }); + + handler({ + runId: "run-agent-flush", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello", delta: "Hello" }, + }); + now = 20_050; + handler({ + runId: "run-agent-flush", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello world", delta: " world" }, + }); + now = 20_090; + handler({ + runId: "run-agent-flush", + seq: 3, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello world!", delta: "!" }, + }); + handler({ + runId: "run-agent-flush", + seq: 4, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }); + + const agentCalls = agentBroadcastCalls(broadcast); + expect(agentCalls).toHaveLength(3); + expect((agentCalls[0]?.[1] as { data?: { text?: string } }).data?.text).toBe("Hello"); + expect((agentCalls[1]?.[1] as { data?: { delta?: string } }).data?.delta).toBe(" world!"); + expect((agentCalls[1]?.[1] as { data?: { text?: string } }).data?.text).toBe("Hello world!"); + expect((agentCalls[1]?.[1] as { seq?: number }).seq).toBe(3); + expect((agentCalls[2]?.[1] as { stream?: string; data?: { phase?: string } }).stream).toBe( + "lifecycle", + ); + expect((agentCalls[2]?.[1] as { data?: { phase?: string } }).data?.phase).toBe("end"); + expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(3); + nowSpy.mockRestore(); + }); + + it("flushes pending assistant agent deltas before post-window text", () => { + let now = 22_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-agent-window", { + sessionKey: "session-agent-window", + clientRunId: "client-agent-window", + }); + + handler({ + runId: "run-agent-window", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Hel", delta: "Hel" }, + }); + now = 22_050; + handler({ + runId: "run-agent-window", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello", delta: "lo" }, + }); + now = 22_200; + handler({ + runId: "run-agent-window", + seq: 3, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello!", delta: "!" }, + }); + + const agentCalls = agentBroadcastCalls(broadcast); + expect(agentCalls).toHaveLength(3); + expect((agentCalls[0]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("Hel"); + expect((agentCalls[1]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("lo"); + expect((agentCalls[1]?.[1] as { seq?: number }).seq).toBe(2); + expect((agentCalls[2]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("!"); + expect((agentCalls[2]?.[1] as { seq?: number }).seq).toBe(3); + expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(3); + nowSpy.mockRestore(); + }); + + it("flushes older cross-stream agent deltas before immediate text", () => { + let now = 23_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-agent-cross-stream", { + sessionKey: "session-agent-cross-stream", + clientRunId: "client-agent-cross-stream", + }); + + handler({ + runId: "run-agent-cross-stream", + seq: 1, + stream: "thinking", + ts: Date.now(), + data: { text: "Think", delta: "Think" }, + }); + now = 23_050; + handler({ + runId: "run-agent-cross-stream", + seq: 2, + stream: "thinking", + ts: Date.now(), + data: { text: "Thinking", delta: "ing" }, + }); + now = 23_080; + handler({ + runId: "run-agent-cross-stream", + seq: 3, + stream: "assistant", + ts: Date.now(), + data: { text: "Answer", delta: "Answer" }, + }); + + const agentCalls = agentBroadcastCalls(broadcast); + expect(agentCalls.map(([, payload]) => (payload as { seq?: number }).seq)).toEqual([1, 2, 3]); + expect(agentCalls.map(([, payload]) => (payload as { stream?: string }).stream)).toEqual([ + "thinking", + "thinking", + "assistant", + ]); + expect((agentCalls[1]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("ing"); + expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(3); + nowSpy.mockRestore(); + }); + + it("does not let lifecycle start throttle the first assistant agent event", () => { + let now = 25_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-agent-start", { + sessionKey: "session-agent-start", + clientRunId: "client-agent-start", + }); + + handler({ + runId: "run-agent-start", + seq: 1, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "start" }, + }); + now = 25_050; + handler({ + runId: "run-agent-start", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello", delta: "Hello" }, + }); + + const agentCalls = agentBroadcastCalls(broadcast); + expect(agentCalls).toHaveLength(2); + expect((agentCalls[0]?.[1] as { stream?: string }).stream).toBe("lifecycle"); + expect((agentCalls[1]?.[1] as { data?: { text?: string } }).data?.text).toBe("Hello"); + expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(2); + nowSpy.mockRestore(); + }); + + it("coalesces thinking agent events under the chat delta throttle", () => { + let now = 27_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-agent-thinking", { + sessionKey: "session-agent-thinking", + clientRunId: "client-agent-thinking", + }); + + for (let i = 0; i < 5; i += 1) { + now = 27_000 + i * 20; + handler({ + runId: "run-agent-thinking", + seq: i + 1, + stream: "thinking", + ts: Date.now(), + data: { text: "t".repeat(i + 1), delta: "t" }, + }); + } + + const agentCalls = agentBroadcastCalls(broadcast); + expect(agentCalls).toHaveLength(1); + expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(1); + expect((agentCalls[0]?.[1] as { stream?: string }).stream).toBe("thinking"); + expect((agentCalls[0]?.[1] as { data?: { text?: string } }).data?.text).toBe("t"); + nowSpy.mockRestore(); + }); + + it("does not drop non-cumulative assistant agent events while coalescing text", () => { + let now = 30_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-agent-media", { + sessionKey: "session-agent-media", + clientRunId: "client-agent-media", + }); + + handler({ + runId: "run-agent-media", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Look", delta: "Look" }, + }); + now = 30_050; + handler({ + runId: "run-agent-media", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { text: "Look", delta: "", mediaUrls: ["https://example.test/image.png"] }, + }); + now = 30_070; + handler({ + runId: "run-agent-media", + seq: 3, + stream: "assistant", + ts: Date.now(), + data: { text: "Look elsewhere", delta: "", replace: true }, + }); + now = 30_090; + handler({ + runId: "run-agent-media", + seq: 4, + stream: "assistant", + ts: Date.now(), + data: { text: "Look elsewhere now", delta: " now" }, + }); + handler({ + runId: "run-agent-media", + seq: 5, + stream: "lifecycle", + ts: Date.now(), + data: { phase: "end" }, + }); + + const agentCalls = agentBroadcastCalls(broadcast); + expect(agentCalls).toHaveLength(5); + expect((agentCalls[1]?.[1] as { data?: { mediaUrls?: string[] } }).data?.mediaUrls).toEqual([ + "https://example.test/image.png", + ]); + expect((agentCalls[2]?.[1] as { data?: { replace?: boolean } }).data?.replace).toBe(true); + expect((agentCalls[3]?.[1] as { data?: { text?: string } }).data?.text).toBe( + "Look elsewhere now", + ); + expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(5); + nowSpy.mockRestore(); + }); + it("strips inline directives from assistant chat events", () => { const { broadcast, nodeSendToSession, nowSpy } = emitRun1AssistantText( createHarness({ now: 1_000 }), diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 699499d51d8..f0c4af32b3f 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -14,6 +14,7 @@ import { shouldSuppressAssistantEventForLiveChat, } from "./live-chat-projector.js"; import type { + BufferedAgentEvent, ChatRunState, SessionEventSubscriberRegistry, ToolEventRecipientRegistry, @@ -230,12 +231,31 @@ export function createAgentEventHandler({ }: AgentEventHandlerOptions) { const pendingTerminalLifecycleErrors = new Map(); + type AgentTextThrottleStream = "assistant" | "thinking"; + + const agentTextThrottleKey = (clientRunId: string, stream: AgentTextThrottleStream) => + `${clientRunId}:${stream}`; + + const agentTextThrottleKeys = (clientRunId: string) => [ + clientRunId, + agentTextThrottleKey(clientRunId, "assistant"), + agentTextThrottleKey(clientRunId, "thinking"), + ]; + + const clearAgentTextThrottleState = (clientRunId: string) => { + for (const key of agentTextThrottleKeys(clientRunId)) { + chatRunState.agentDeltaSentAt.delete(key); + chatRunState.bufferedAgentEvents.delete(key); + } + }; + const clearBufferedChatState = (clientRunId: string) => { chatRunState.rawBuffers.delete(clientRunId); chatRunState.buffers.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId); chatRunState.deltaLastBroadcastLen.delete(clientRunId); chatRunState.deltaLastBroadcastText.delete(clientRunId); + clearAgentTextThrottleState(clientRunId); }; const clearPendingTerminalLifecycleError = (runId: string) => { @@ -410,6 +430,7 @@ export function createAgentEventHandler({ } toolEventRecipients.markFinal(evt.runId); + clearBufferedChatState(clientRunId); clearAgentRunContext(evt.runId); agentRunSeq.delete(evt.runId); agentRunSeq.delete(clientRunId); @@ -601,6 +622,7 @@ export function createAgentEventHandler({ chatRunState.rawBuffers.delete(clientRunId); chatRunState.buffers.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId); + clearAgentTextThrottleState(clientRunId); const spawnedBy = resolveSpawnedBy(sessionKey); if (jobState === "done") { const payload = { @@ -636,6 +658,123 @@ export function createAgentEventHandler({ nodeSendToSession(sessionKey, "chat", payload); }; + const sendAgentPayload = ( + sessionKey: string | undefined, + payload: AgentEventPayload & { spawnedBy?: string }, + ) => { + broadcast("agent", payload); + if (sessionKey) { + nodeSendToSession(sessionKey, "agent", payload); + } + }; + + const flushBufferedAgentDeltaIfNeeded = ( + clientRunId: string, + stream?: AgentTextThrottleStream, + ) => { + const keys = stream + ? [agentTextThrottleKey(clientRunId, stream)] + : agentTextThrottleKeys(clientRunId); + const bufferedEntries = keys.flatMap((key) => { + const buffered = chatRunState.bufferedAgentEvents.get(key); + if (!buffered) { + return []; + } + return [{ key, buffered }]; + }); + bufferedEntries.sort((a, b) => a.buffered.payload.seq - b.buffered.payload.seq); + for (const { key, buffered } of bufferedEntries) { + sendAgentPayload(buffered.sessionKey, buffered.payload); + chatRunState.bufferedAgentEvents.delete(key); + chatRunState.agentDeltaSentAt.set(key, Date.now()); + } + return bufferedEntries.length > 0; + }; + + const resolveAgentTextThrottleStream = ( + evt: AgentEventPayload, + ): AgentTextThrottleStream | null => { + if (evt.stream === "assistant") { + return "assistant"; + } + if (evt.stream === "thinking") { + return "thinking"; + } + return null; + }; + + const isAgentTextThrottleEvent = (evt: AgentEventPayload) => + resolveAgentTextThrottleStream(evt) !== null && typeof evt.data?.text === "string"; + + const shouldCoalesceAgentTextEvent = (evt: AgentEventPayload) => + isAgentTextThrottleEvent(evt) && + typeof evt.data.delta === "string" && + evt.data.delta.length > 0 && + !(Array.isArray(evt.data.mediaUrls) && evt.data.mediaUrls.length > 0) && + typeof evt.data.mediaUrl !== "string" && + evt.data.replace !== true && + (evt.stream !== "assistant" || !shouldSuppressAssistantEventForLiveChat(evt.data)); + + const shouldAdvanceAgentTextThrottle = (evt: AgentEventPayload) => + isAgentTextThrottleEvent(evt) && + (typeof evt.data.delta === "string" || evt.data.replace === true); + + const buildBufferedAgentEvent = ( + sessionKey: string | undefined, + payload: AgentEventPayload & { spawnedBy?: string }, + ): BufferedAgentEvent => (sessionKey ? { sessionKey, payload } : { payload }); + + const mergeBufferedAgentPayload = ( + previous: BufferedAgentEvent, + next: BufferedAgentEvent, + ): BufferedAgentEvent => { + if (previous.payload.stream !== next.payload.stream) { + return next; + } + const previousDelta = previous.payload.data.delta; + const nextDelta = next.payload.data.delta; + if (typeof previousDelta !== "string" || typeof nextDelta !== "string") { + return next; + } + return { + ...next, + payload: { + ...next.payload, + data: { + ...next.payload.data, + delta: `${previousDelta}${nextDelta}`, + }, + }, + }; + }; + + const sendOrBufferAgentTextEvent = ( + clientRunId: string, + sessionKey: string | undefined, + payload: AgentEventPayload & { spawnedBy?: string }, + ) => { + const stream = resolveAgentTextThrottleStream(payload); + if (!stream) { + sendAgentPayload(sessionKey, payload); + return; + } + const now = Date.now(); + const key = agentTextThrottleKey(clientRunId, stream); + const last = chatRunState.agentDeltaSentAt.get(key); + if (last !== undefined && now - last < 150) { + const nextBuffered = buildBufferedAgentEvent(sessionKey, payload); + const buffered = chatRunState.bufferedAgentEvents.get(key); + chatRunState.bufferedAgentEvents.set( + key, + buffered ? mergeBufferedAgentPayload(buffered, nextBuffered) : nextBuffered, + ); + return; + } + flushBufferedAgentDeltaIfNeeded(clientRunId); + sendAgentPayload(sessionKey, payload); + chatRunState.agentDeltaSentAt.set(key, now); + }; + const resolveToolVerboseLevel = (runId: string, sessionKey?: string) => { const runContext = getAgentRunContext(runId); const runVerbose = normalizeVerboseLevel(runContext?.verboseLevel); @@ -702,6 +841,7 @@ export function createAgentEventHandler({ const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off"; const suppressHeartbeatToolEvents = isToolEvent && shouldSuppressHeartbeatToolEvents(clientRunId, evt.runId); + const shouldCoalesceAgentEvent = shouldCoalesceAgentTextEvent(evt); // Channel/node subscribers respect verbose; authenticated Control UI // recipients need tool result payloads to render live tool cards. const channelToolPayload = @@ -714,6 +854,7 @@ export function createAgentEventHandler({ })() : agentPayload; if (last > 0 && evt.seq !== last + 1 && isControlUiVisible) { + flushBufferedAgentDeltaIfNeeded(clientRunId); broadcast("agent", { runId: eventRunId, stream: "error", @@ -741,6 +882,7 @@ export function createAgentEventHandler({ !suppressHeartbeatToolEvents ) { flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq); + flushBufferedAgentDeltaIfNeeded(clientRunId); } // Always broadcast tool events to registered WS recipients with // tool-events capability, regardless of verboseLevel. The verbose @@ -772,27 +914,40 @@ export function createAgentEventHandler({ } } else { const itemPhase = isItemEvent && typeof evt.data?.phase === "string" ? evt.data.phase : ""; - if (itemPhase === "start" && isControlUiVisible && sessionKey && !isAborted) { - flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq); + if (itemPhase === "start" && isControlUiVisible && !isAborted) { + if (sessionKey) { + flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq); + } + flushBufferedAgentDeltaIfNeeded(clientRunId); } if (isControlUiVisible) { - broadcast("agent", agentPayload); + if (shouldCoalesceAgentEvent) { + sendOrBufferAgentTextEvent(clientRunId, sessionKey, agentPayload); + } else { + flushBufferedAgentDeltaIfNeeded(clientRunId); + sendAgentPayload(sessionKey, agentPayload); + const textThrottleStream = resolveAgentTextThrottleStream(evt); + if (textThrottleStream && shouldAdvanceAgentTextThrottle(evt)) { + chatRunState.agentDeltaSentAt.set( + agentTextThrottleKey(clientRunId, textThrottleStream), + Date.now(), + ); + } + } } } if (isControlUiVisible && sessionKey) { - // Send non-heartbeat tool events to node/channel subscribers only when - // verbose is enabled; WS clients already received the event above. - if (!isToolEvent || (!suppressHeartbeatToolEvents && toolVerbose !== "off")) { + // Send tool events to node/channel subscribers only when verbose is enabled; + // WS clients already received the event above via broadcastToConnIds. + if (isToolEvent && !suppressHeartbeatToolEvents && toolVerbose !== "off") { nodeSendToSession( sessionKey, "agent", - isToolEvent - ? projectToolSearchCodeEventForChannelPayload({ - ...channelToolPayload, - ...buildSessionEventSnapshot(sessionKey), - }) - : agentPayload, + projectToolSearchCodeEventForChannelPayload({ + ...channelToolPayload, + ...buildSessionEventSnapshot(sessionKey), + }), ); } if ( diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index 378fd441375..c9c6f071fec 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -41,7 +41,12 @@ function createMaintenanceTimerDeps() { logHealth: { error: () => {} }, dedupe: new Map(), chatAbortControllers: new Map(), - chatRunState: { abortedRuns: new Map(), deltaLastBroadcastText: new Map() }, + chatRunState: { + abortedRuns: new Map(), + deltaLastBroadcastText: new Map(), + agentDeltaSentAt: new Map(), + bufferedAgentEvents: new Map(), + }, chatRunBuffers: new Map(), chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), @@ -209,6 +214,33 @@ describe("startGatewayMaintenanceTimers", () => { stopMaintenanceTimers(timers); }); + it("sweeps orphaned stale agent throttle state once the abort controller is gone", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); + const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js"); + const deps = createMaintenanceTimerDeps(); + const runId = "run-agent-orphaned"; + deps.chatRunState.agentDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1); + deps.chatRunState.bufferedAgentEvents.set(runId, { + payload: { + runId, + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "buffer", delta: "buffer" }, + }, + }); + + const timers = startGatewayMaintenanceTimers(deps); + + await vi.advanceTimersByTimeAsync(60_000); + + expect(deps.chatRunState.agentDeltaSentAt.has(runId)).toBe(false); + expect(deps.chatRunState.bufferedAgentEvents.has(runId)).toBe(false); + + stopMaintenanceTimers(timers); + }); + it("clears deltaLastBroadcastLen when aborted runs age out", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); @@ -220,6 +252,16 @@ describe("startGatewayMaintenanceTimers", () => { deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1); deps.chatDeltaLastBroadcastLen.set(runId, 6); deps.chatRunState.deltaLastBroadcastText.set(runId, "buffer"); + deps.chatRunState.agentDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1); + deps.chatRunState.bufferedAgentEvents.set(runId, { + payload: { + runId, + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "buffer", delta: "buffer" }, + }, + }); const timers = startGatewayMaintenanceTimers(deps); @@ -230,6 +272,8 @@ describe("startGatewayMaintenanceTimers", () => { expect(deps.chatDeltaSentAt.has(runId)).toBe(false); expect(deps.chatDeltaLastBroadcastLen.has(runId)).toBe(false); expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false); + expect(deps.chatRunState.agentDeltaSentAt.has(runId)).toBe(false); + expect(deps.chatRunState.bufferedAgentEvents.has(runId)).toBe(false); stopMaintenanceTimers(timers); }); diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index c8802d52382..3cef87e179a 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -3,6 +3,7 @@ import { sweepStaleRunContexts } from "../infra/agent-events.js"; import { cleanOldMedia } from "../media/store.js"; import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js"; import { pruneStaleControlPlaneBuckets } from "./control-plane-rate-limit.js"; +import type { ChatRunState } from "./server-chat-state.js"; import type { ChatRunEntry } from "./server-chat.js"; import { DEDUPE_MAX, @@ -33,7 +34,10 @@ export function startGatewayMaintenanceTimers(params: { logHealth: { error: (msg: string) => void }; dedupe: Map; chatAbortControllers: Map; - chatRunState: { abortedRuns: Map; deltaLastBroadcastText: Map }; + chatRunState: Pick< + ChatRunState, + "abortedRuns" | "deltaLastBroadcastText" | "agentDeltaSentAt" | "bufferedAgentEvents" + >; chatRunBuffers: Map; chatDeltaSentAt: Map; chatDeltaLastBroadcastLen: Map; @@ -127,6 +131,25 @@ export function startGatewayMaintenanceTimers(params: { } } + const clearAgentThrottleStateForRun = (runId: string) => { + params.chatRunState.agentDeltaSentAt.delete(runId); + params.chatRunState.agentDeltaSentAt.delete(`${runId}:assistant`); + params.chatRunState.agentDeltaSentAt.delete(`${runId}:thinking`); + params.chatRunState.bufferedAgentEvents.delete(runId); + params.chatRunState.bufferedAgentEvents.delete(`${runId}:assistant`); + params.chatRunState.bufferedAgentEvents.delete(`${runId}:thinking`); + }; + + const resolveAgentThrottleRunId = (key: string) => { + if (key.endsWith(":assistant")) { + return key.slice(0, -":assistant".length); + } + if (key.endsWith(":thinking")) { + return key.slice(0, -":thinking".length); + } + return key; + }; + for (const [runId, entry] of params.chatAbortControllers) { if (now <= entry.expiresAtMs) { continue; @@ -138,6 +161,8 @@ export function startGatewayMaintenanceTimers(params: { chatDeltaSentAt: params.chatDeltaSentAt, chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen, chatDeltaLastBroadcastText: params.chatRunState.deltaLastBroadcastText, + agentDeltaSentAt: params.chatRunState.agentDeltaSentAt, + bufferedAgentEvents: params.chatRunState.bufferedAgentEvents, chatAbortedRuns: params.chatRunState.abortedRuns, removeChatRun: params.removeChatRun, agentRunSeq: params.agentRunSeq, @@ -158,6 +183,7 @@ export function startGatewayMaintenanceTimers(params: { params.chatDeltaSentAt.delete(runId); params.chatDeltaLastBroadcastLen.delete(runId); params.chatRunState.deltaLastBroadcastText.delete(runId); + clearAgentThrottleStateForRun(runId); } // Prune expired control-plane rate-limit buckets to prevent unbounded @@ -181,6 +207,21 @@ export function startGatewayMaintenanceTimers(params: { params.chatDeltaSentAt.delete(runId); params.chatDeltaLastBroadcastLen.delete(runId); params.chatRunState.deltaLastBroadcastText.delete(runId); + clearAgentThrottleStateForRun(runId); + } + for (const [key, lastSentAt] of params.chatRunState.agentDeltaSentAt) { + const runId = resolveAgentThrottleRunId(key); + if (params.chatRunState.abortedRuns.has(runId)) { + continue; + } + if (params.chatAbortControllers.has(runId)) { + continue; + } + if (now - lastSentAt <= ABORTED_RUN_TTL_MS) { + continue; + } + params.chatRunState.agentDeltaSentAt.delete(key); + params.chatRunState.bufferedAgentEvents.delete(key); } // Sweep stale agent run contexts (orphaned when lifecycle end/error is missed). sweepStaleRunContexts(); diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 13ed1bbfb3a..e7ff6a7fe72 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -157,6 +157,8 @@ const makeContext = (): GatewayRequestContext => chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), chatDeltaLastBroadcastText: new Map(), + agentDeltaSentAt: new Map(), + bufferedAgentEvents: new Map(), chatAbortedRuns: new Map(), agentRunSeq: new Map(), broadcast: vi.fn(), diff --git a/src/gateway/server-methods/chat.abort-authorization.test.ts b/src/gateway/server-methods/chat.abort-authorization.test.ts index e1248525a54..c7441b0689f 100644 --- a/src/gateway/server-methods/chat.abort-authorization.test.ts +++ b/src/gateway/server-methods/chat.abort-authorization.test.ts @@ -100,6 +100,45 @@ describe("chat.abort authorization", () => { expect(context.chatAbortControllers.has("run-1")).toBe(false); }); + it("clears agent text throttle state through the real abort caller", async () => { + const context = createChatAbortContext({ + chatAbortControllers: new Map([ + ["run-1", createActiveRun("main", { owner: { connId: "conn-owner", deviceId: "dev-1" } })], + ]), + agentDeltaSentAt: new Map([["run-1:assistant", Date.now()]]), + bufferedAgentEvents: new Map([ + [ + "run-1:assistant", + { + payload: { + runId: "run-1", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "pending", delta: "pending" }, + }, + }, + ], + ]), + }); + + const respond = await invokeChatAbortHandler({ + handler: chatHandlers["chat.abort"], + context, + request: { sessionKey: "main", runId: "run-1" }, + client: { + connId: "conn-owner", + connect: { device: { id: "dev-1" }, scopes: ["operator.write"] }, + }, + }); + + const [ok, payload] = respond.mock.calls.at(-1) ?? []; + expect(ok).toBe(true); + expect(payload).toMatchObject({ aborted: true, runIds: ["run-1"] }); + expect(context.agentDeltaSentAt.has("run-1:assistant")).toBe(false); + expect(context.bufferedAgentEvents.has("run-1:assistant")).toBe(false); + }); + it("only aborts session-scoped runs owned by the requester", async () => { const context = createChatAbortContext({ chatAbortControllers: new Map([ diff --git a/src/gateway/server-methods/chat.abort.test-helpers.ts b/src/gateway/server-methods/chat.abort.test-helpers.ts index 7a6cb3b2730..c2c5a1860cb 100644 --- a/src/gateway/server-methods/chat.abort.test-helpers.ts +++ b/src/gateway/server-methods/chat.abort.test-helpers.ts @@ -27,6 +27,8 @@ type ChatAbortTestContext = Record & { chatDeltaSentAt: Map; chatDeltaLastBroadcastLen: Map; chatDeltaLastBroadcastText: Map; + agentDeltaSentAt: Map; + bufferedAgentEvents: Map; chatAbortedRuns: Map; removeChatRun: (...args: unknown[]) => { sessionKey: string; clientRunId: string } | undefined; agentRunSeq: Map; @@ -46,6 +48,8 @@ export function createChatAbortContext( chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), chatDeltaLastBroadcastText: new Map(), + agentDeltaSentAt: new Map(), + bufferedAgentEvents: new Map(), chatAbortedRuns: new Map(), removeChatRun: vi .fn() diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index 75714e79b5e..aca7afebc4f 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -503,6 +503,8 @@ function createChatContext(): Pick< | "chatDeltaSentAt" | "chatDeltaLastBroadcastLen" | "chatDeltaLastBroadcastText" + | "agentDeltaSentAt" + | "bufferedAgentEvents" | "chatAbortedRuns" | "addChatRun" | "removeChatRun" @@ -520,6 +522,8 @@ function createChatContext(): Pick< chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), chatDeltaLastBroadcastText: new Map(), + agentDeltaSentAt: new Map(), + bufferedAgentEvents: new Map(), chatAbortedRuns: new Map(), addChatRun: vi.fn(), removeChatRun: vi.fn(), diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 2cd360e9fe3..b4a45eb50f5 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -1496,6 +1496,8 @@ function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps { chatDeltaSentAt: context.chatDeltaSentAt, chatDeltaLastBroadcastLen: context.chatDeltaLastBroadcastLen, chatDeltaLastBroadcastText: context.chatDeltaLastBroadcastText, + agentDeltaSentAt: context.agentDeltaSentAt, + bufferedAgentEvents: context.bufferedAgentEvents, chatAbortedRuns: context.chatAbortedRuns, removeChatRun: context.removeChatRun, agentRunSeq: context.agentRunSeq, diff --git a/src/gateway/server-methods/shared-types.ts b/src/gateway/server-methods/shared-types.ts index 42f0f95e417..fdc923d0533 100644 --- a/src/gateway/server-methods/shared-types.ts +++ b/src/gateway/server-methods/shared-types.ts @@ -13,6 +13,7 @@ import type { PluginNodeCapabilitySurface } from "../plugin-node-capability.js"; import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.js"; import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast-types.js"; import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js"; +import type { BufferedAgentEvent } from "../server-chat-state.js"; import type { DedupeEntry } from "../server-shared.js"; import type { GatewayEventLoopHealth } from "../server/event-loop-health.js"; @@ -76,6 +77,8 @@ export type GatewayRequestContext = { chatDeltaSentAt: Map; chatDeltaLastBroadcastLen: Map; chatDeltaLastBroadcastText: Map; + agentDeltaSentAt: Map; + bufferedAgentEvents: Map; addChatRun: (sessionId: string, entry: { sessionKey: string; clientRunId: string }) => void; removeChatRun: ( sessionId: string, diff --git a/src/gateway/server-request-context.test.ts b/src/gateway/server-request-context.test.ts index f9b1891df07..e62c8b057aa 100644 --- a/src/gateway/server-request-context.test.ts +++ b/src/gateway/server-request-context.test.ts @@ -44,6 +44,8 @@ describe("createGatewayRequestContext", () => { chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), chatDeltaLastBroadcastText: new Map(), + agentDeltaSentAt: new Map(), + bufferedAgentEvents: new Map(), addChatRun: vi.fn(), removeChatRun: vi.fn(), subscribeSessionEvents: vi.fn(), diff --git a/src/gateway/server-request-context.ts b/src/gateway/server-request-context.ts index 0f13fd22c3c..a5fdb6b613c 100644 --- a/src/gateway/server-request-context.ts +++ b/src/gateway/server-request-context.ts @@ -39,6 +39,8 @@ type GatewayRequestContextParams = { chatDeltaSentAt: GatewayRequestContext["chatDeltaSentAt"]; chatDeltaLastBroadcastLen: GatewayRequestContext["chatDeltaLastBroadcastLen"]; chatDeltaLastBroadcastText: GatewayRequestContext["chatDeltaLastBroadcastText"]; + agentDeltaSentAt: GatewayRequestContext["agentDeltaSentAt"]; + bufferedAgentEvents: GatewayRequestContext["bufferedAgentEvents"]; addChatRun: GatewayRequestContext["addChatRun"]; removeChatRun: GatewayRequestContext["removeChatRun"]; subscribeSessionEvents: GatewayRequestContext["subscribeSessionEvents"]; @@ -136,6 +138,8 @@ export function createGatewayRequestContext( chatDeltaSentAt: params.chatDeltaSentAt, chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen, chatDeltaLastBroadcastText: params.chatDeltaLastBroadcastText, + agentDeltaSentAt: params.agentDeltaSentAt, + bufferedAgentEvents: params.bufferedAgentEvents, addChatRun: params.addChatRun, removeChatRun: params.removeChatRun, subscribeSessionEvents: params.subscribeSessionEvents, diff --git a/src/gateway/server-startup-early.test.ts b/src/gateway/server-startup-early.test.ts index cd484084c8a..b1ee1b1092e 100644 --- a/src/gateway/server-startup-early.test.ts +++ b/src/gateway/server-startup-early.test.ts @@ -48,7 +48,12 @@ describe("startGatewayEarlyRuntime", () => { logHealth: { error: () => {} }, dedupe: new Map(), chatAbortControllers: new Map(), - chatRunState: { abortedRuns: new Map(), deltaLastBroadcastText: new Map() }, + chatRunState: { + abortedRuns: new Map(), + deltaLastBroadcastText: new Map(), + agentDeltaSentAt: new Map(), + bufferedAgentEvents: new Map(), + }, chatRunBuffers: new Map(), chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index b8c10ae4d18..7997105bb09 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -1270,6 +1270,8 @@ export async function startGatewayServer( chatDeltaSentAt: chatRunState.deltaSentAt, chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen, chatDeltaLastBroadcastText: chatRunState.deltaLastBroadcastText, + agentDeltaSentAt: chatRunState.agentDeltaSentAt, + bufferedAgentEvents: chatRunState.bufferedAgentEvents, addChatRun, removeChatRun, subscribeSessionEvents: sessionEventSubscribers.subscribe, diff --git a/src/gateway/talk-realtime-relay.test.ts b/src/gateway/talk-realtime-relay.test.ts index 5e689ad1eb8..9668149e5ef 100644 --- a/src/gateway/talk-realtime-relay.test.ts +++ b/src/gateway/talk-realtime-relay.test.ts @@ -39,6 +39,21 @@ describe("talk realtime gateway relay", () => { const broadcast = vi.fn(); const nodeSendToSession = vi.fn(); const removeChatRun = vi.fn(() => ({ sessionKey: "main", clientRunId: "run-1" })); + const agentDeltaSentAt = new Map([["run-1:assistant", Date.now()]]); + const bufferedAgentEvents = new Map([ + [ + "run-1:assistant", + { + payload: { + runId: "run-1", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "pending", delta: "pending" }, + }, + }, + ], + ]); const context = { broadcastToConnIds: vi.fn(), broadcast, @@ -59,6 +74,8 @@ describe("talk realtime gateway relay", () => { chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), chatDeltaLastBroadcastText: new Map(), + agentDeltaSentAt, + bufferedAgentEvents, chatAbortedRuns: new Map(), removeChatRun, agentRunSeq: new Map(), @@ -83,6 +100,8 @@ describe("talk realtime gateway relay", () => { broadcast, nodeSendToSession, removeChatRun, + agentDeltaSentAt, + bufferedAgentEvents, session, }; } @@ -487,8 +506,15 @@ describe("talk realtime gateway relay", () => { }); it("aborts linked agent consult runs when the relay turn is cancelled", () => { - const { abortController, broadcast, nodeSendToSession, removeChatRun, session } = - createAbortableRelayRunFixture(); + const { + abortController, + broadcast, + nodeSendToSession, + removeChatRun, + agentDeltaSentAt, + bufferedAgentEvents, + session, + } = createAbortableRelayRunFixture(); cancelTalkRealtimeRelayTurn({ relaySessionId: session.relaySessionId, connId: "conn-1", @@ -497,16 +523,26 @@ describe("talk realtime gateway relay", () => { expect(abortController.signal.aborted).toBe(true); expect(removeChatRun).toHaveBeenCalledWith("run-1", "run-1", "main"); + expect(agentDeltaSentAt.has("run-1:assistant")).toBe(false); + expect(bufferedAgentEvents.has("run-1:assistant")).toBe(false); expectChatAbortPayload(broadcast, "barge-in"); expectNodeAbortPayload(nodeSendToSession); }); it("aborts linked agent consult runs when the relay session closes", () => { - const { abortController, broadcast, nodeSendToSession, session } = - createAbortableRelayRunFixture(); + const { + abortController, + broadcast, + nodeSendToSession, + agentDeltaSentAt, + bufferedAgentEvents, + session, + } = createAbortableRelayRunFixture(); stopTalkRealtimeRelaySession({ relaySessionId: session.relaySessionId, connId: "conn-1" }); expect(abortController.signal.aborted).toBe(true); + expect(agentDeltaSentAt.has("run-1:assistant")).toBe(false); + expect(bufferedAgentEvents.has("run-1:assistant")).toBe(false); expectChatAbortPayload(broadcast, "relay-closed"); expectNodeAbortPayload(nodeSendToSession); }); @@ -517,6 +553,21 @@ describe("talk realtime gateway relay", () => { const broadcast = vi.fn(); const nodeSendToSession = vi.fn(); const removeChatRun = vi.fn(() => ({ sessionKey: "main", clientRunId: "run-1" })); + const agentDeltaSentAt = new Map([["run-1:assistant", Date.now()]]); + const bufferedAgentEvents = new Map([ + [ + "run-1:assistant", + { + payload: { + runId: "run-1", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "pending", delta: "pending" }, + }, + }, + ], + ]); const provider: RealtimeVoiceProviderPlugin = { id: "relay-test", label: "Relay Test", @@ -555,6 +606,8 @@ describe("talk realtime gateway relay", () => { chatDeltaSentAt: new Map(), chatDeltaLastBroadcastLen: new Map(), chatDeltaLastBroadcastText: new Map(), + agentDeltaSentAt, + bufferedAgentEvents, chatAbortedRuns: new Map(), removeChatRun, agentRunSeq: new Map(), @@ -577,6 +630,8 @@ describe("talk realtime gateway relay", () => { bridgeRequest?.onClose?.("error"); expect(abortController.signal.aborted).toBe(true); + expect(agentDeltaSentAt.has("run-1:assistant")).toBe(false); + expect(bufferedAgentEvents.has("run-1:assistant")).toBe(false); expectChatAbortPayload(broadcast, "relay-closed"); expectNodeAbortPayload(nodeSendToSession); }); diff --git a/src/plugins/registry.runtime-config.test.ts b/src/plugins/registry.runtime-config.test.ts index 3d94c184f80..0aaae043cfc 100644 --- a/src/plugins/registry.runtime-config.test.ts +++ b/src/plugins/registry.runtime-config.test.ts @@ -3,6 +3,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { createPluginRecord } from "./loader-records.js"; import { createPluginRegistry } from "./registry.js"; import { getPluginRuntimeGatewayRequestScope } from "./runtime/gateway-request-scope.js"; +import { createPluginRuntime } from "./runtime/index.js"; import type { PluginRuntime } from "./runtime/types.js"; function createTestRegistry(runtime: PluginRuntime) { @@ -19,33 +20,41 @@ function createTestRegistry(runtime: PluginRuntime) { } describe("plugin registry runtime config scope", () => { - it("runs deprecated config helpers with the owning plugin scope", async () => { - let loadScope = getPluginRuntimeGatewayRequestScope(); - let writeScope = getPluginRuntimeGatewayRequestScope(); + it("runs config helpers with the owning plugin scope", async () => { + let currentScope = getPluginRuntimeGatewayRequestScope(); + let mutateScope = getPluginRuntimeGatewayRequestScope(); + let replaceScope = getPluginRuntimeGatewayRequestScope(); const config = {} as OpenClawConfig; const replaceResult = { previousHash: null, nextHash: "next", } as unknown as Awaited>; - const mutateConfigFile: PluginRuntime["config"]["mutateConfigFile"] = async () => ({ - ...replaceResult, - result: undefined, - }); + const mutateConfigFile: PluginRuntime["config"]["mutateConfigFile"] = async () => { + mutateScope = getPluginRuntimeGatewayRequestScope(); + return { + ...replaceResult, + result: undefined, + }; + }; + const replaceConfigFile: PluginRuntime["config"]["replaceConfigFile"] = async () => { + replaceScope = getPluginRuntimeGatewayRequestScope(); + return replaceResult; + }; + const loadConfig: PluginRuntime["config"]["loadConfig"] = () => config; + const writeConfigFile: PluginRuntime["config"]["writeConfigFile"] = async () => {}; const configRuntime = { - current: vi.fn(() => config), - mutateConfigFile, - replaceConfigFile: async () => replaceResult, - loadConfig: vi.fn(() => { - loadScope = getPluginRuntimeGatewayRequestScope(); + current: vi.fn(() => { + currentScope = getPluginRuntimeGatewayRequestScope(); return config; }), - writeConfigFile: vi.fn(async () => { - writeScope = getPluginRuntimeGatewayRequestScope(); - }), + mutateConfigFile, + replaceConfigFile, + loadConfig, + writeConfigFile, } satisfies PluginRuntime["config"]; - const pluginRegistry = createTestRegistry({ - config: configRuntime, - } as unknown as PluginRuntime); + const runtime = createPluginRuntime(); + runtime.config = configRuntime; + const pluginRegistry = createTestRegistry(runtime); const record = createPluginRecord({ id: "legacy-plugin", name: "Legacy Plugin", @@ -56,14 +65,25 @@ describe("plugin registry runtime config scope", () => { }); const api = pluginRegistry.createApi(record, { config }); - expect(api.runtime.config.loadConfig()).toBe(config); - await api.runtime.config.writeConfigFile(config); + expect(api.runtime.config.current()).toBe(config); + await api.runtime.config.mutateConfigFile({ + afterWrite: { mode: "none", reason: "test" }, + mutate: () => undefined, + }); + await api.runtime.config.replaceConfigFile({ + nextConfig: config, + afterWrite: { mode: "none", reason: "test" }, + }); - expect(loadScope).toMatchObject({ + expect(currentScope).toMatchObject({ pluginId: "legacy-plugin", pluginSource: "/plugins/legacy-plugin/index.js", }); - expect(writeScope).toMatchObject({ + expect(mutateScope).toMatchObject({ + pluginId: "legacy-plugin", + pluginSource: "/plugins/legacy-plugin/index.js", + }); + expect(replaceScope).toMatchObject({ pluginId: "legacy-plugin", pluginSource: "/plugins/legacy-plugin/index.js", }); diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index fbd338ff8fd..51364ce2c89 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -2399,9 +2399,10 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { const config = Reflect.get(target, prop, receiver); return { ...config, - loadConfig: () => runWithPluginScope(() => config.loadConfig()), - writeConfigFile: (cfg, options) => - runWithPluginScope(() => config.writeConfigFile(cfg, options)), + current: () => runWithPluginScope(() => config.current()), + mutateConfigFile: (params) => runWithPluginScope(() => config.mutateConfigFile(params)), + replaceConfigFile: (params) => + runWithPluginScope(() => config.replaceConfigFile(params)), } satisfies PluginRuntime["config"]; } if (prop === "llm") {