mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-19 01:04:45 +00:00
[Fix] Throttle agent event fanout (#80335)
Merged via squash.
Prepared head SHA: 5dddb405ad
Co-authored-by: samzong <13782141+samzong@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
@@ -150,6 +150,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins doctor: report stale plugin config warnings and avoid claiming full plugin health when config warnings remain. (#81515) Thanks @BKF-Gitty.
|
||||
- Sessions: display `model: "<agentId>-acp"` / `modelProvider: "acpx"` (ACP-runtime sentinel) for ACP control-plane sessions in `openclaw sessions` output, instead of the agent's configured model which was misleading. Catalog finding 20. (#79543)
|
||||
- Slack: normalize message read `before` and `after` timestamp bounds before calling Slack history or thread reply APIs. Fixes #80835. (#81338) Thanks @honor2030.
|
||||
- Gateway: throttle assistant/thinking agent event fanout during streaming bursts without dropping buffered deltas. (#80335) Thanks @samzong.
|
||||
|
||||
### Changes
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { repairToolUseResultPairing } from "../../agents/session-transcript-repair.js";
|
||||
import * as transcriptEvents from "../../sessions/transcript-events.js";
|
||||
import type { SessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
import { resolveSessionTranscriptPathInDir } from "./paths.js";
|
||||
@@ -18,6 +19,7 @@ describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
type ExactAssistantMessage = Parameters<
|
||||
typeof appendExactAssistantMessageToSessionTranscript
|
||||
>[0]["message"];
|
||||
type TranscriptRepairMessage = Parameters<typeof repairToolUseResultPairing>[0][number];
|
||||
type TranscriptUpdateEmitterSpy = {
|
||||
mock: {
|
||||
calls: [string | SessionTranscriptUpdate][];
|
||||
@@ -283,6 +285,78 @@ describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps delivery mirrors in transcripts while repair preserves real tool results", async () => {
|
||||
writeTranscriptStore();
|
||||
const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir());
|
||||
const toolCallId = "call_maniple_list";
|
||||
|
||||
const toolCallResult = await appendSessionTranscriptMessage({
|
||||
transcriptPath: sessionFile,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [
|
||||
{
|
||||
type: "toolCall",
|
||||
id: toolCallId,
|
||||
name: "maniple__list_workers",
|
||||
arguments: {},
|
||||
},
|
||||
],
|
||||
stopReason: "toolUse",
|
||||
},
|
||||
});
|
||||
|
||||
const mirrorResult = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey,
|
||||
text: "Maniple List Workers",
|
||||
storePath: fixture.storePath(),
|
||||
});
|
||||
|
||||
expect(mirrorResult.ok).toBe(true);
|
||||
if (!mirrorResult.ok) {
|
||||
return;
|
||||
}
|
||||
expect(mirrorResult.messageId).not.toBe(toolCallResult.messageId);
|
||||
const linesAfterMirror = fs.readFileSync(sessionFile, "utf-8").trim().split("\n");
|
||||
expect(linesAfterMirror).toHaveLength(3);
|
||||
const mirrorLine = JSON.parse(linesAfterMirror[2]);
|
||||
expect(mirrorLine.message.model).toBe("delivery-mirror");
|
||||
|
||||
await appendSessionTranscriptMessage({
|
||||
transcriptPath: sessionFile,
|
||||
message: {
|
||||
role: "toolResult",
|
||||
toolCallId,
|
||||
toolName: "maniple__list_workers",
|
||||
content: [{ type: "text", text: "workers listed" }],
|
||||
isError: false,
|
||||
},
|
||||
});
|
||||
|
||||
const messages = fs
|
||||
.readFileSync(sessionFile, "utf-8")
|
||||
.trim()
|
||||
.split("\n")
|
||||
.map((line) => JSON.parse(line) as { message?: TranscriptRepairMessage })
|
||||
.flatMap((entry) => (entry.message ? [entry.message] : []));
|
||||
expect(messages.map((message) => message.role)).toEqual([
|
||||
"assistant",
|
||||
"assistant",
|
||||
"toolResult",
|
||||
]);
|
||||
const repair = repairToolUseResultPairing(messages, {
|
||||
missingToolResultText: "aborted",
|
||||
});
|
||||
|
||||
expect(repair.added).toHaveLength(0);
|
||||
expect(repair.messages.map((message) => message.role)).toEqual([
|
||||
"assistant",
|
||||
"toolResult",
|
||||
"assistant",
|
||||
]);
|
||||
expect((repair.messages[2] as { model?: string }).model).toBe("delivery-mirror");
|
||||
});
|
||||
|
||||
it("finds session entry using normalized (lowercased) key", async () => {
|
||||
const storeKey = "agent:main:imessage:direct:+15551234567";
|
||||
const store = {
|
||||
|
||||
@@ -302,7 +302,6 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
if (latestEquivalentAssistantId) {
|
||||
return { ok: true, sessionFile, messageId: latestEquivalentAssistantId };
|
||||
}
|
||||
|
||||
const message = {
|
||||
...params.message,
|
||||
...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}),
|
||||
|
||||
@@ -54,6 +54,21 @@ function createOps(params: {
|
||||
chatDeltaSentAt: new Map([[runId, Date.now()]]),
|
||||
chatDeltaLastBroadcastLen: new Map([[runId, buffer?.length ?? 0]]),
|
||||
chatDeltaLastBroadcastText: new Map(buffer !== undefined ? [[runId, buffer]] : []),
|
||||
agentDeltaSentAt: new Map([[`${runId}:assistant`, Date.now()]]),
|
||||
bufferedAgentEvents: new Map([
|
||||
[
|
||||
`${runId}:assistant`,
|
||||
{
|
||||
payload: {
|
||||
runId,
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "buffer", delta: "buffer" },
|
||||
},
|
||||
},
|
||||
],
|
||||
]),
|
||||
chatAbortedRuns: new Map(),
|
||||
removeChatRun,
|
||||
agentRunSeq: new Map(),
|
||||
@@ -110,6 +125,8 @@ describe("abortChatRunById", () => {
|
||||
expect(ops.chatDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(ops.chatDeltaLastBroadcastLen.has(runId)).toBe(false);
|
||||
expect(ops.chatDeltaLastBroadcastText.has(runId)).toBe(false);
|
||||
expect(ops.agentDeltaSentAt?.has(`${runId}:assistant`)).toBe(false);
|
||||
expect(ops.bufferedAgentEvents?.has(`${runId}:assistant`)).toBe(false);
|
||||
expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey);
|
||||
expect(ops.agentRunSeq.has(runId)).toBe(false);
|
||||
expect(ops.agentRunSeq.has("client-run-1")).toBe(false);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
|
||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||
import type { BufferedAgentEvent } from "./server-chat-state.js";
|
||||
|
||||
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
|
||||
|
||||
@@ -113,6 +114,8 @@ export type ChatAbortOps = {
|
||||
chatDeltaSentAt: Map<string, number>;
|
||||
chatDeltaLastBroadcastLen: Map<string, number>;
|
||||
chatDeltaLastBroadcastText: Map<string, string>;
|
||||
agentDeltaSentAt: Map<string, number>;
|
||||
bufferedAgentEvents: Map<string, BufferedAgentEvent>;
|
||||
chatAbortedRuns: Map<string, number>;
|
||||
removeChatRun: (
|
||||
sessionId: string,
|
||||
@@ -178,6 +181,12 @@ export function abortChatRunById(
|
||||
ops.chatDeltaSentAt.delete(runId);
|
||||
ops.chatDeltaLastBroadcastLen.delete(runId);
|
||||
ops.chatDeltaLastBroadcastText.delete(runId);
|
||||
ops.agentDeltaSentAt.delete(runId);
|
||||
ops.agentDeltaSentAt.delete(`${runId}:assistant`);
|
||||
ops.agentDeltaSentAt.delete(`${runId}:thinking`);
|
||||
ops.bufferedAgentEvents.delete(runId);
|
||||
ops.bufferedAgentEvents.delete(`${runId}:assistant`);
|
||||
ops.bufferedAgentEvents.delete(`${runId}:thinking`);
|
||||
const removed = ops.removeChatRun(runId, runId, sessionKey);
|
||||
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
|
||||
emitAgentEvent({
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
import type { AgentEventPayload } from "../infra/agent-events.js";
|
||||
|
||||
export type ChatRunEntry = {
|
||||
sessionKey: string;
|
||||
clientRunId: string;
|
||||
};
|
||||
|
||||
export type BufferedAgentEvent = {
|
||||
sessionKey?: string;
|
||||
payload: AgentEventPayload & { spawnedBy?: string };
|
||||
};
|
||||
|
||||
export type ChatRunRegistry = {
|
||||
add: (sessionId: string, entry: ChatRunEntry) => void;
|
||||
peek: (sessionId: string) => ChatRunEntry | undefined;
|
||||
@@ -71,6 +78,8 @@ export type ChatRunState = {
|
||||
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
|
||||
deltaLastBroadcastLen: Map<string, number>;
|
||||
deltaLastBroadcastText: Map<string, string>;
|
||||
agentDeltaSentAt: Map<string, number>;
|
||||
bufferedAgentEvents: Map<string, BufferedAgentEvent>;
|
||||
abortedRuns: Map<string, number>;
|
||||
clear: () => void;
|
||||
};
|
||||
@@ -82,6 +91,8 @@ export function createChatRunState(): ChatRunState {
|
||||
const deltaSentAt = new Map<string, number>();
|
||||
const deltaLastBroadcastLen = new Map<string, number>();
|
||||
const deltaLastBroadcastText = new Map<string, string>();
|
||||
const agentDeltaSentAt = new Map<string, number>();
|
||||
const bufferedAgentEvents = new Map<string, BufferedAgentEvent>();
|
||||
const abortedRuns = new Map<string, number>();
|
||||
|
||||
const clear = () => {
|
||||
@@ -91,6 +102,8 @@ export function createChatRunState(): ChatRunState {
|
||||
deltaSentAt.clear();
|
||||
deltaLastBroadcastLen.clear();
|
||||
deltaLastBroadcastText.clear();
|
||||
agentDeltaSentAt.clear();
|
||||
bufferedAgentEvents.clear();
|
||||
abortedRuns.clear();
|
||||
};
|
||||
|
||||
@@ -101,6 +114,8 @@ export function createChatRunState(): ChatRunState {
|
||||
deltaSentAt,
|
||||
deltaLastBroadcastLen,
|
||||
deltaLastBroadcastText,
|
||||
agentDeltaSentAt,
|
||||
bufferedAgentEvents,
|
||||
abortedRuns,
|
||||
clear,
|
||||
};
|
||||
|
||||
@@ -141,10 +141,18 @@ describe("agent event handler", () => {
|
||||
return broadcast.mock.calls.filter(([event]) => event === "chat");
|
||||
}
|
||||
|
||||
function agentBroadcastCalls(broadcast: ReturnType<typeof vi.fn>) {
|
||||
return broadcast.mock.calls.filter(([event]) => event === "agent");
|
||||
}
|
||||
|
||||
function sessionChatCalls(nodeSendToSession: ReturnType<typeof vi.fn>) {
|
||||
return nodeSendToSession.mock.calls.filter(([, event]) => event === "chat");
|
||||
}
|
||||
|
||||
function sessionAgentCalls(nodeSendToSession: ReturnType<typeof vi.fn>) {
|
||||
return nodeSendToSession.mock.calls.filter(([, event]) => event === "agent");
|
||||
}
|
||||
|
||||
function requireCall<T>(call: T | undefined, label: string): T {
|
||||
if (call === undefined) {
|
||||
throw new Error(`expected ${label}`);
|
||||
@@ -393,6 +401,300 @@ describe("agent event handler", () => {
|
||||
nowSpy?.mockRestore();
|
||||
});
|
||||
|
||||
it("coalesces assistant agent events under the chat delta throttle", () => {
|
||||
let now = 10_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
|
||||
chatRunState.registry.add("run-agent-throttle", {
|
||||
sessionKey: "session-agent-throttle",
|
||||
clientRunId: "client-agent-throttle",
|
||||
});
|
||||
|
||||
for (let i = 0; i < 5; i += 1) {
|
||||
now = 10_000 + i * 20;
|
||||
handler({
|
||||
runId: "run-agent-throttle",
|
||||
seq: i + 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "x".repeat(i + 1), delta: "x" },
|
||||
});
|
||||
}
|
||||
|
||||
const agentCalls = agentBroadcastCalls(broadcast);
|
||||
expect(agentCalls).toHaveLength(1);
|
||||
expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(1);
|
||||
expect(chatBroadcastCalls(broadcast)).toHaveLength(1);
|
||||
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(1);
|
||||
expect((agentCalls[0]?.[1] as { data?: { text?: string } }).data?.text).toBe("x");
|
||||
nowSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("flushes coalesced assistant agent text before lifecycle end", () => {
|
||||
let now = 20_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
|
||||
chatRunState.registry.add("run-agent-flush", {
|
||||
sessionKey: "session-agent-flush",
|
||||
clientRunId: "client-agent-flush",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-agent-flush",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Hello", delta: "Hello" },
|
||||
});
|
||||
now = 20_050;
|
||||
handler({
|
||||
runId: "run-agent-flush",
|
||||
seq: 2,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Hello world", delta: " world" },
|
||||
});
|
||||
now = 20_090;
|
||||
handler({
|
||||
runId: "run-agent-flush",
|
||||
seq: 3,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Hello world!", delta: "!" },
|
||||
});
|
||||
handler({
|
||||
runId: "run-agent-flush",
|
||||
seq: 4,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: { phase: "end" },
|
||||
});
|
||||
|
||||
const agentCalls = agentBroadcastCalls(broadcast);
|
||||
expect(agentCalls).toHaveLength(3);
|
||||
expect((agentCalls[0]?.[1] as { data?: { text?: string } }).data?.text).toBe("Hello");
|
||||
expect((agentCalls[1]?.[1] as { data?: { delta?: string } }).data?.delta).toBe(" world!");
|
||||
expect((agentCalls[1]?.[1] as { data?: { text?: string } }).data?.text).toBe("Hello world!");
|
||||
expect((agentCalls[1]?.[1] as { seq?: number }).seq).toBe(3);
|
||||
expect((agentCalls[2]?.[1] as { stream?: string; data?: { phase?: string } }).stream).toBe(
|
||||
"lifecycle",
|
||||
);
|
||||
expect((agentCalls[2]?.[1] as { data?: { phase?: string } }).data?.phase).toBe("end");
|
||||
expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(3);
|
||||
nowSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("flushes pending assistant agent deltas before post-window text", () => {
|
||||
let now = 22_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
|
||||
chatRunState.registry.add("run-agent-window", {
|
||||
sessionKey: "session-agent-window",
|
||||
clientRunId: "client-agent-window",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-agent-window",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Hel", delta: "Hel" },
|
||||
});
|
||||
now = 22_050;
|
||||
handler({
|
||||
runId: "run-agent-window",
|
||||
seq: 2,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Hello", delta: "lo" },
|
||||
});
|
||||
now = 22_200;
|
||||
handler({
|
||||
runId: "run-agent-window",
|
||||
seq: 3,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Hello!", delta: "!" },
|
||||
});
|
||||
|
||||
const agentCalls = agentBroadcastCalls(broadcast);
|
||||
expect(agentCalls).toHaveLength(3);
|
||||
expect((agentCalls[0]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("Hel");
|
||||
expect((agentCalls[1]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("lo");
|
||||
expect((agentCalls[1]?.[1] as { seq?: number }).seq).toBe(2);
|
||||
expect((agentCalls[2]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("!");
|
||||
expect((agentCalls[2]?.[1] as { seq?: number }).seq).toBe(3);
|
||||
expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(3);
|
||||
nowSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("flushes older cross-stream agent deltas before immediate text", () => {
|
||||
let now = 23_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
|
||||
chatRunState.registry.add("run-agent-cross-stream", {
|
||||
sessionKey: "session-agent-cross-stream",
|
||||
clientRunId: "client-agent-cross-stream",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-agent-cross-stream",
|
||||
seq: 1,
|
||||
stream: "thinking",
|
||||
ts: Date.now(),
|
||||
data: { text: "Think", delta: "Think" },
|
||||
});
|
||||
now = 23_050;
|
||||
handler({
|
||||
runId: "run-agent-cross-stream",
|
||||
seq: 2,
|
||||
stream: "thinking",
|
||||
ts: Date.now(),
|
||||
data: { text: "Thinking", delta: "ing" },
|
||||
});
|
||||
now = 23_080;
|
||||
handler({
|
||||
runId: "run-agent-cross-stream",
|
||||
seq: 3,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Answer", delta: "Answer" },
|
||||
});
|
||||
|
||||
const agentCalls = agentBroadcastCalls(broadcast);
|
||||
expect(agentCalls.map(([, payload]) => (payload as { seq?: number }).seq)).toEqual([1, 2, 3]);
|
||||
expect(agentCalls.map(([, payload]) => (payload as { stream?: string }).stream)).toEqual([
|
||||
"thinking",
|
||||
"thinking",
|
||||
"assistant",
|
||||
]);
|
||||
expect((agentCalls[1]?.[1] as { data?: { delta?: string } }).data?.delta).toBe("ing");
|
||||
expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(3);
|
||||
nowSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("does not let lifecycle start throttle the first assistant agent event", () => {
|
||||
let now = 25_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
|
||||
chatRunState.registry.add("run-agent-start", {
|
||||
sessionKey: "session-agent-start",
|
||||
clientRunId: "client-agent-start",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-agent-start",
|
||||
seq: 1,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: { phase: "start" },
|
||||
});
|
||||
now = 25_050;
|
||||
handler({
|
||||
runId: "run-agent-start",
|
||||
seq: 2,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Hello", delta: "Hello" },
|
||||
});
|
||||
|
||||
const agentCalls = agentBroadcastCalls(broadcast);
|
||||
expect(agentCalls).toHaveLength(2);
|
||||
expect((agentCalls[0]?.[1] as { stream?: string }).stream).toBe("lifecycle");
|
||||
expect((agentCalls[1]?.[1] as { data?: { text?: string } }).data?.text).toBe("Hello");
|
||||
expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(2);
|
||||
nowSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("coalesces thinking agent events under the chat delta throttle", () => {
|
||||
let now = 27_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
|
||||
chatRunState.registry.add("run-agent-thinking", {
|
||||
sessionKey: "session-agent-thinking",
|
||||
clientRunId: "client-agent-thinking",
|
||||
});
|
||||
|
||||
for (let i = 0; i < 5; i += 1) {
|
||||
now = 27_000 + i * 20;
|
||||
handler({
|
||||
runId: "run-agent-thinking",
|
||||
seq: i + 1,
|
||||
stream: "thinking",
|
||||
ts: Date.now(),
|
||||
data: { text: "t".repeat(i + 1), delta: "t" },
|
||||
});
|
||||
}
|
||||
|
||||
const agentCalls = agentBroadcastCalls(broadcast);
|
||||
expect(agentCalls).toHaveLength(1);
|
||||
expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(1);
|
||||
expect((agentCalls[0]?.[1] as { stream?: string }).stream).toBe("thinking");
|
||||
expect((agentCalls[0]?.[1] as { data?: { text?: string } }).data?.text).toBe("t");
|
||||
nowSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("does not drop non-cumulative assistant agent events while coalescing text", () => {
|
||||
let now = 30_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
|
||||
chatRunState.registry.add("run-agent-media", {
|
||||
sessionKey: "session-agent-media",
|
||||
clientRunId: "client-agent-media",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-agent-media",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Look", delta: "Look" },
|
||||
});
|
||||
now = 30_050;
|
||||
handler({
|
||||
runId: "run-agent-media",
|
||||
seq: 2,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Look", delta: "", mediaUrls: ["https://example.test/image.png"] },
|
||||
});
|
||||
now = 30_070;
|
||||
handler({
|
||||
runId: "run-agent-media",
|
||||
seq: 3,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Look elsewhere", delta: "", replace: true },
|
||||
});
|
||||
now = 30_090;
|
||||
handler({
|
||||
runId: "run-agent-media",
|
||||
seq: 4,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "Look elsewhere now", delta: " now" },
|
||||
});
|
||||
handler({
|
||||
runId: "run-agent-media",
|
||||
seq: 5,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: { phase: "end" },
|
||||
});
|
||||
|
||||
const agentCalls = agentBroadcastCalls(broadcast);
|
||||
expect(agentCalls).toHaveLength(5);
|
||||
expect((agentCalls[1]?.[1] as { data?: { mediaUrls?: string[] } }).data?.mediaUrls).toEqual([
|
||||
"https://example.test/image.png",
|
||||
]);
|
||||
expect((agentCalls[2]?.[1] as { data?: { replace?: boolean } }).data?.replace).toBe(true);
|
||||
expect((agentCalls[3]?.[1] as { data?: { text?: string } }).data?.text).toBe(
|
||||
"Look elsewhere now",
|
||||
);
|
||||
expect(sessionAgentCalls(nodeSendToSession)).toHaveLength(5);
|
||||
nowSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("strips inline directives from assistant chat events", () => {
|
||||
const { broadcast, nodeSendToSession, nowSpy } = emitRun1AssistantText(
|
||||
createHarness({ now: 1_000 }),
|
||||
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
shouldSuppressAssistantEventForLiveChat,
|
||||
} from "./live-chat-projector.js";
|
||||
import type {
|
||||
BufferedAgentEvent,
|
||||
ChatRunState,
|
||||
SessionEventSubscriberRegistry,
|
||||
ToolEventRecipientRegistry,
|
||||
@@ -230,12 +231,31 @@ export function createAgentEventHandler({
|
||||
}: AgentEventHandlerOptions) {
|
||||
const pendingTerminalLifecycleErrors = new Map<string, NodeJS.Timeout>();
|
||||
|
||||
type AgentTextThrottleStream = "assistant" | "thinking";
|
||||
|
||||
const agentTextThrottleKey = (clientRunId: string, stream: AgentTextThrottleStream) =>
|
||||
`${clientRunId}:${stream}`;
|
||||
|
||||
const agentTextThrottleKeys = (clientRunId: string) => [
|
||||
clientRunId,
|
||||
agentTextThrottleKey(clientRunId, "assistant"),
|
||||
agentTextThrottleKey(clientRunId, "thinking"),
|
||||
];
|
||||
|
||||
const clearAgentTextThrottleState = (clientRunId: string) => {
|
||||
for (const key of agentTextThrottleKeys(clientRunId)) {
|
||||
chatRunState.agentDeltaSentAt.delete(key);
|
||||
chatRunState.bufferedAgentEvents.delete(key);
|
||||
}
|
||||
};
|
||||
|
||||
const clearBufferedChatState = (clientRunId: string) => {
|
||||
chatRunState.rawBuffers.delete(clientRunId);
|
||||
chatRunState.buffers.delete(clientRunId);
|
||||
chatRunState.deltaSentAt.delete(clientRunId);
|
||||
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
|
||||
chatRunState.deltaLastBroadcastText.delete(clientRunId);
|
||||
clearAgentTextThrottleState(clientRunId);
|
||||
};
|
||||
|
||||
const clearPendingTerminalLifecycleError = (runId: string) => {
|
||||
@@ -410,6 +430,7 @@ export function createAgentEventHandler({
|
||||
}
|
||||
|
||||
toolEventRecipients.markFinal(evt.runId);
|
||||
clearBufferedChatState(clientRunId);
|
||||
clearAgentRunContext(evt.runId);
|
||||
agentRunSeq.delete(evt.runId);
|
||||
agentRunSeq.delete(clientRunId);
|
||||
@@ -601,6 +622,7 @@ export function createAgentEventHandler({
|
||||
chatRunState.rawBuffers.delete(clientRunId);
|
||||
chatRunState.buffers.delete(clientRunId);
|
||||
chatRunState.deltaSentAt.delete(clientRunId);
|
||||
clearAgentTextThrottleState(clientRunId);
|
||||
const spawnedBy = resolveSpawnedBy(sessionKey);
|
||||
if (jobState === "done") {
|
||||
const payload = {
|
||||
@@ -636,6 +658,123 @@ export function createAgentEventHandler({
|
||||
nodeSendToSession(sessionKey, "chat", payload);
|
||||
};
|
||||
|
||||
const sendAgentPayload = (
|
||||
sessionKey: string | undefined,
|
||||
payload: AgentEventPayload & { spawnedBy?: string },
|
||||
) => {
|
||||
broadcast("agent", payload);
|
||||
if (sessionKey) {
|
||||
nodeSendToSession(sessionKey, "agent", payload);
|
||||
}
|
||||
};
|
||||
|
||||
const flushBufferedAgentDeltaIfNeeded = (
|
||||
clientRunId: string,
|
||||
stream?: AgentTextThrottleStream,
|
||||
) => {
|
||||
const keys = stream
|
||||
? [agentTextThrottleKey(clientRunId, stream)]
|
||||
: agentTextThrottleKeys(clientRunId);
|
||||
const bufferedEntries = keys.flatMap((key) => {
|
||||
const buffered = chatRunState.bufferedAgentEvents.get(key);
|
||||
if (!buffered) {
|
||||
return [];
|
||||
}
|
||||
return [{ key, buffered }];
|
||||
});
|
||||
bufferedEntries.sort((a, b) => a.buffered.payload.seq - b.buffered.payload.seq);
|
||||
for (const { key, buffered } of bufferedEntries) {
|
||||
sendAgentPayload(buffered.sessionKey, buffered.payload);
|
||||
chatRunState.bufferedAgentEvents.delete(key);
|
||||
chatRunState.agentDeltaSentAt.set(key, Date.now());
|
||||
}
|
||||
return bufferedEntries.length > 0;
|
||||
};
|
||||
|
||||
const resolveAgentTextThrottleStream = (
|
||||
evt: AgentEventPayload,
|
||||
): AgentTextThrottleStream | null => {
|
||||
if (evt.stream === "assistant") {
|
||||
return "assistant";
|
||||
}
|
||||
if (evt.stream === "thinking") {
|
||||
return "thinking";
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
const isAgentTextThrottleEvent = (evt: AgentEventPayload) =>
|
||||
resolveAgentTextThrottleStream(evt) !== null && typeof evt.data?.text === "string";
|
||||
|
||||
const shouldCoalesceAgentTextEvent = (evt: AgentEventPayload) =>
|
||||
isAgentTextThrottleEvent(evt) &&
|
||||
typeof evt.data.delta === "string" &&
|
||||
evt.data.delta.length > 0 &&
|
||||
!(Array.isArray(evt.data.mediaUrls) && evt.data.mediaUrls.length > 0) &&
|
||||
typeof evt.data.mediaUrl !== "string" &&
|
||||
evt.data.replace !== true &&
|
||||
(evt.stream !== "assistant" || !shouldSuppressAssistantEventForLiveChat(evt.data));
|
||||
|
||||
const shouldAdvanceAgentTextThrottle = (evt: AgentEventPayload) =>
|
||||
isAgentTextThrottleEvent(evt) &&
|
||||
(typeof evt.data.delta === "string" || evt.data.replace === true);
|
||||
|
||||
const buildBufferedAgentEvent = (
|
||||
sessionKey: string | undefined,
|
||||
payload: AgentEventPayload & { spawnedBy?: string },
|
||||
): BufferedAgentEvent => (sessionKey ? { sessionKey, payload } : { payload });
|
||||
|
||||
const mergeBufferedAgentPayload = (
|
||||
previous: BufferedAgentEvent,
|
||||
next: BufferedAgentEvent,
|
||||
): BufferedAgentEvent => {
|
||||
if (previous.payload.stream !== next.payload.stream) {
|
||||
return next;
|
||||
}
|
||||
const previousDelta = previous.payload.data.delta;
|
||||
const nextDelta = next.payload.data.delta;
|
||||
if (typeof previousDelta !== "string" || typeof nextDelta !== "string") {
|
||||
return next;
|
||||
}
|
||||
return {
|
||||
...next,
|
||||
payload: {
|
||||
...next.payload,
|
||||
data: {
|
||||
...next.payload.data,
|
||||
delta: `${previousDelta}${nextDelta}`,
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
const sendOrBufferAgentTextEvent = (
|
||||
clientRunId: string,
|
||||
sessionKey: string | undefined,
|
||||
payload: AgentEventPayload & { spawnedBy?: string },
|
||||
) => {
|
||||
const stream = resolveAgentTextThrottleStream(payload);
|
||||
if (!stream) {
|
||||
sendAgentPayload(sessionKey, payload);
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
const key = agentTextThrottleKey(clientRunId, stream);
|
||||
const last = chatRunState.agentDeltaSentAt.get(key);
|
||||
if (last !== undefined && now - last < 150) {
|
||||
const nextBuffered = buildBufferedAgentEvent(sessionKey, payload);
|
||||
const buffered = chatRunState.bufferedAgentEvents.get(key);
|
||||
chatRunState.bufferedAgentEvents.set(
|
||||
key,
|
||||
buffered ? mergeBufferedAgentPayload(buffered, nextBuffered) : nextBuffered,
|
||||
);
|
||||
return;
|
||||
}
|
||||
flushBufferedAgentDeltaIfNeeded(clientRunId);
|
||||
sendAgentPayload(sessionKey, payload);
|
||||
chatRunState.agentDeltaSentAt.set(key, now);
|
||||
};
|
||||
|
||||
const resolveToolVerboseLevel = (runId: string, sessionKey?: string) => {
|
||||
const runContext = getAgentRunContext(runId);
|
||||
const runVerbose = normalizeVerboseLevel(runContext?.verboseLevel);
|
||||
@@ -702,6 +841,7 @@ export function createAgentEventHandler({
|
||||
const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off";
|
||||
const suppressHeartbeatToolEvents =
|
||||
isToolEvent && shouldSuppressHeartbeatToolEvents(clientRunId, evt.runId);
|
||||
const shouldCoalesceAgentEvent = shouldCoalesceAgentTextEvent(evt);
|
||||
// Channel/node subscribers respect verbose; authenticated Control UI
|
||||
// recipients need tool result payloads to render live tool cards.
|
||||
const channelToolPayload =
|
||||
@@ -714,6 +854,7 @@ export function createAgentEventHandler({
|
||||
})()
|
||||
: agentPayload;
|
||||
if (last > 0 && evt.seq !== last + 1 && isControlUiVisible) {
|
||||
flushBufferedAgentDeltaIfNeeded(clientRunId);
|
||||
broadcast("agent", {
|
||||
runId: eventRunId,
|
||||
stream: "error",
|
||||
@@ -741,6 +882,7 @@ export function createAgentEventHandler({
|
||||
!suppressHeartbeatToolEvents
|
||||
) {
|
||||
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq);
|
||||
flushBufferedAgentDeltaIfNeeded(clientRunId);
|
||||
}
|
||||
// Always broadcast tool events to registered WS recipients with
|
||||
// tool-events capability, regardless of verboseLevel. The verbose
|
||||
@@ -772,27 +914,40 @@ export function createAgentEventHandler({
|
||||
}
|
||||
} else {
|
||||
const itemPhase = isItemEvent && typeof evt.data?.phase === "string" ? evt.data.phase : "";
|
||||
if (itemPhase === "start" && isControlUiVisible && sessionKey && !isAborted) {
|
||||
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq);
|
||||
if (itemPhase === "start" && isControlUiVisible && !isAborted) {
|
||||
if (sessionKey) {
|
||||
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq);
|
||||
}
|
||||
flushBufferedAgentDeltaIfNeeded(clientRunId);
|
||||
}
|
||||
if (isControlUiVisible) {
|
||||
broadcast("agent", agentPayload);
|
||||
if (shouldCoalesceAgentEvent) {
|
||||
sendOrBufferAgentTextEvent(clientRunId, sessionKey, agentPayload);
|
||||
} else {
|
||||
flushBufferedAgentDeltaIfNeeded(clientRunId);
|
||||
sendAgentPayload(sessionKey, agentPayload);
|
||||
const textThrottleStream = resolveAgentTextThrottleStream(evt);
|
||||
if (textThrottleStream && shouldAdvanceAgentTextThrottle(evt)) {
|
||||
chatRunState.agentDeltaSentAt.set(
|
||||
agentTextThrottleKey(clientRunId, textThrottleStream),
|
||||
Date.now(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isControlUiVisible && sessionKey) {
|
||||
// Send non-heartbeat tool events to node/channel subscribers only when
|
||||
// verbose is enabled; WS clients already received the event above.
|
||||
if (!isToolEvent || (!suppressHeartbeatToolEvents && toolVerbose !== "off")) {
|
||||
// Send tool events to node/channel subscribers only when verbose is enabled;
|
||||
// WS clients already received the event above via broadcastToConnIds.
|
||||
if (isToolEvent && !suppressHeartbeatToolEvents && toolVerbose !== "off") {
|
||||
nodeSendToSession(
|
||||
sessionKey,
|
||||
"agent",
|
||||
isToolEvent
|
||||
? projectToolSearchCodeEventForChannelPayload({
|
||||
...channelToolPayload,
|
||||
...buildSessionEventSnapshot(sessionKey),
|
||||
})
|
||||
: agentPayload,
|
||||
projectToolSearchCodeEventForChannelPayload({
|
||||
...channelToolPayload,
|
||||
...buildSessionEventSnapshot(sessionKey),
|
||||
}),
|
||||
);
|
||||
}
|
||||
if (
|
||||
|
||||
@@ -41,7 +41,12 @@ function createMaintenanceTimerDeps() {
|
||||
logHealth: { error: () => {} },
|
||||
dedupe: new Map(),
|
||||
chatAbortControllers: new Map(),
|
||||
chatRunState: { abortedRuns: new Map(), deltaLastBroadcastText: new Map() },
|
||||
chatRunState: {
|
||||
abortedRuns: new Map(),
|
||||
deltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt: new Map(),
|
||||
bufferedAgentEvents: new Map(),
|
||||
},
|
||||
chatRunBuffers: new Map(),
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
@@ -209,6 +214,33 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
it("sweeps orphaned stale agent throttle state once the abort controller is gone", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
|
||||
const deps = createMaintenanceTimerDeps();
|
||||
const runId = "run-agent-orphaned";
|
||||
deps.chatRunState.agentDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1);
|
||||
deps.chatRunState.bufferedAgentEvents.set(runId, {
|
||||
payload: {
|
||||
runId,
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "buffer", delta: "buffer" },
|
||||
},
|
||||
});
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
expect(deps.chatRunState.agentDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(deps.chatRunState.bufferedAgentEvents.has(runId)).toBe(false);
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
it("clears deltaLastBroadcastLen when aborted runs age out", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
|
||||
@@ -220,6 +252,16 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
deps.chatDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1);
|
||||
deps.chatDeltaLastBroadcastLen.set(runId, 6);
|
||||
deps.chatRunState.deltaLastBroadcastText.set(runId, "buffer");
|
||||
deps.chatRunState.agentDeltaSentAt.set(runId, Date.now() - ABORTED_RUN_TTL_MS - 1);
|
||||
deps.chatRunState.bufferedAgentEvents.set(runId, {
|
||||
payload: {
|
||||
runId,
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "buffer", delta: "buffer" },
|
||||
},
|
||||
});
|
||||
|
||||
const timers = startGatewayMaintenanceTimers(deps);
|
||||
|
||||
@@ -230,6 +272,8 @@ describe("startGatewayMaintenanceTimers", () => {
|
||||
expect(deps.chatDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(deps.chatDeltaLastBroadcastLen.has(runId)).toBe(false);
|
||||
expect(deps.chatRunState.deltaLastBroadcastText.has(runId)).toBe(false);
|
||||
expect(deps.chatRunState.agentDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(deps.chatRunState.bufferedAgentEvents.has(runId)).toBe(false);
|
||||
|
||||
stopMaintenanceTimers(timers);
|
||||
});
|
||||
|
||||
@@ -3,6 +3,7 @@ import { sweepStaleRunContexts } from "../infra/agent-events.js";
|
||||
import { cleanOldMedia } from "../media/store.js";
|
||||
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
import { pruneStaleControlPlaneBuckets } from "./control-plane-rate-limit.js";
|
||||
import type { ChatRunState } from "./server-chat-state.js";
|
||||
import type { ChatRunEntry } from "./server-chat.js";
|
||||
import {
|
||||
DEDUPE_MAX,
|
||||
@@ -33,7 +34,10 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
logHealth: { error: (msg: string) => void };
|
||||
dedupe: Map<string, DedupeEntry>;
|
||||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
chatRunState: { abortedRuns: Map<string, number>; deltaLastBroadcastText: Map<string, string> };
|
||||
chatRunState: Pick<
|
||||
ChatRunState,
|
||||
"abortedRuns" | "deltaLastBroadcastText" | "agentDeltaSentAt" | "bufferedAgentEvents"
|
||||
>;
|
||||
chatRunBuffers: Map<string, string>;
|
||||
chatDeltaSentAt: Map<string, number>;
|
||||
chatDeltaLastBroadcastLen: Map<string, number>;
|
||||
@@ -127,6 +131,25 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
}
|
||||
}
|
||||
|
||||
const clearAgentThrottleStateForRun = (runId: string) => {
|
||||
params.chatRunState.agentDeltaSentAt.delete(runId);
|
||||
params.chatRunState.agentDeltaSentAt.delete(`${runId}:assistant`);
|
||||
params.chatRunState.agentDeltaSentAt.delete(`${runId}:thinking`);
|
||||
params.chatRunState.bufferedAgentEvents.delete(runId);
|
||||
params.chatRunState.bufferedAgentEvents.delete(`${runId}:assistant`);
|
||||
params.chatRunState.bufferedAgentEvents.delete(`${runId}:thinking`);
|
||||
};
|
||||
|
||||
const resolveAgentThrottleRunId = (key: string) => {
|
||||
if (key.endsWith(":assistant")) {
|
||||
return key.slice(0, -":assistant".length);
|
||||
}
|
||||
if (key.endsWith(":thinking")) {
|
||||
return key.slice(0, -":thinking".length);
|
||||
}
|
||||
return key;
|
||||
};
|
||||
|
||||
for (const [runId, entry] of params.chatAbortControllers) {
|
||||
if (now <= entry.expiresAtMs) {
|
||||
continue;
|
||||
@@ -138,6 +161,8 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
chatDeltaSentAt: params.chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen,
|
||||
chatDeltaLastBroadcastText: params.chatRunState.deltaLastBroadcastText,
|
||||
agentDeltaSentAt: params.chatRunState.agentDeltaSentAt,
|
||||
bufferedAgentEvents: params.chatRunState.bufferedAgentEvents,
|
||||
chatAbortedRuns: params.chatRunState.abortedRuns,
|
||||
removeChatRun: params.removeChatRun,
|
||||
agentRunSeq: params.agentRunSeq,
|
||||
@@ -158,6 +183,7 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
params.chatDeltaSentAt.delete(runId);
|
||||
params.chatDeltaLastBroadcastLen.delete(runId);
|
||||
params.chatRunState.deltaLastBroadcastText.delete(runId);
|
||||
clearAgentThrottleStateForRun(runId);
|
||||
}
|
||||
|
||||
// Prune expired control-plane rate-limit buckets to prevent unbounded
|
||||
@@ -181,6 +207,21 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
params.chatDeltaSentAt.delete(runId);
|
||||
params.chatDeltaLastBroadcastLen.delete(runId);
|
||||
params.chatRunState.deltaLastBroadcastText.delete(runId);
|
||||
clearAgentThrottleStateForRun(runId);
|
||||
}
|
||||
for (const [key, lastSentAt] of params.chatRunState.agentDeltaSentAt) {
|
||||
const runId = resolveAgentThrottleRunId(key);
|
||||
if (params.chatRunState.abortedRuns.has(runId)) {
|
||||
continue;
|
||||
}
|
||||
if (params.chatAbortControllers.has(runId)) {
|
||||
continue;
|
||||
}
|
||||
if (now - lastSentAt <= ABORTED_RUN_TTL_MS) {
|
||||
continue;
|
||||
}
|
||||
params.chatRunState.agentDeltaSentAt.delete(key);
|
||||
params.chatRunState.bufferedAgentEvents.delete(key);
|
||||
}
|
||||
// Sweep stale agent run contexts (orphaned when lifecycle end/error is missed).
|
||||
sweepStaleRunContexts();
|
||||
|
||||
@@ -157,6 +157,8 @@ const makeContext = (): GatewayRequestContext =>
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
chatDeltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt: new Map(),
|
||||
bufferedAgentEvents: new Map(),
|
||||
chatAbortedRuns: new Map(),
|
||||
agentRunSeq: new Map(),
|
||||
broadcast: vi.fn(),
|
||||
|
||||
@@ -100,6 +100,45 @@ describe("chat.abort authorization", () => {
|
||||
expect(context.chatAbortControllers.has("run-1")).toBe(false);
|
||||
});
|
||||
|
||||
it("clears agent text throttle state through the real abort caller", async () => {
|
||||
const context = createChatAbortContext({
|
||||
chatAbortControllers: new Map([
|
||||
["run-1", createActiveRun("main", { owner: { connId: "conn-owner", deviceId: "dev-1" } })],
|
||||
]),
|
||||
agentDeltaSentAt: new Map([["run-1:assistant", Date.now()]]),
|
||||
bufferedAgentEvents: new Map([
|
||||
[
|
||||
"run-1:assistant",
|
||||
{
|
||||
payload: {
|
||||
runId: "run-1",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "pending", delta: "pending" },
|
||||
},
|
||||
},
|
||||
],
|
||||
]),
|
||||
});
|
||||
|
||||
const respond = await invokeChatAbortHandler({
|
||||
handler: chatHandlers["chat.abort"],
|
||||
context,
|
||||
request: { sessionKey: "main", runId: "run-1" },
|
||||
client: {
|
||||
connId: "conn-owner",
|
||||
connect: { device: { id: "dev-1" }, scopes: ["operator.write"] },
|
||||
},
|
||||
});
|
||||
|
||||
const [ok, payload] = respond.mock.calls.at(-1) ?? [];
|
||||
expect(ok).toBe(true);
|
||||
expect(payload).toMatchObject({ aborted: true, runIds: ["run-1"] });
|
||||
expect(context.agentDeltaSentAt.has("run-1:assistant")).toBe(false);
|
||||
expect(context.bufferedAgentEvents.has("run-1:assistant")).toBe(false);
|
||||
});
|
||||
|
||||
it("only aborts session-scoped runs owned by the requester", async () => {
|
||||
const context = createChatAbortContext({
|
||||
chatAbortControllers: new Map([
|
||||
|
||||
@@ -27,6 +27,8 @@ type ChatAbortTestContext = Record<string, unknown> & {
|
||||
chatDeltaSentAt: Map<string, number>;
|
||||
chatDeltaLastBroadcastLen: Map<string, number>;
|
||||
chatDeltaLastBroadcastText: Map<string, string>;
|
||||
agentDeltaSentAt: Map<string, number>;
|
||||
bufferedAgentEvents: Map<string, unknown>;
|
||||
chatAbortedRuns: Map<string, number>;
|
||||
removeChatRun: (...args: unknown[]) => { sessionKey: string; clientRunId: string } | undefined;
|
||||
agentRunSeq: Map<string, number>;
|
||||
@@ -46,6 +48,8 @@ export function createChatAbortContext(
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
chatDeltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt: new Map(),
|
||||
bufferedAgentEvents: new Map(),
|
||||
chatAbortedRuns: new Map<string, number>(),
|
||||
removeChatRun: vi
|
||||
.fn()
|
||||
|
||||
@@ -503,6 +503,8 @@ function createChatContext(): Pick<
|
||||
| "chatDeltaSentAt"
|
||||
| "chatDeltaLastBroadcastLen"
|
||||
| "chatDeltaLastBroadcastText"
|
||||
| "agentDeltaSentAt"
|
||||
| "bufferedAgentEvents"
|
||||
| "chatAbortedRuns"
|
||||
| "addChatRun"
|
||||
| "removeChatRun"
|
||||
@@ -520,6 +522,8 @@ function createChatContext(): Pick<
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
chatDeltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt: new Map(),
|
||||
bufferedAgentEvents: new Map(),
|
||||
chatAbortedRuns: new Map(),
|
||||
addChatRun: vi.fn(),
|
||||
removeChatRun: vi.fn(),
|
||||
|
||||
@@ -1496,6 +1496,8 @@ function createChatAbortOps(context: GatewayRequestContext): ChatAbortOps {
|
||||
chatDeltaSentAt: context.chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen: context.chatDeltaLastBroadcastLen,
|
||||
chatDeltaLastBroadcastText: context.chatDeltaLastBroadcastText,
|
||||
agentDeltaSentAt: context.agentDeltaSentAt,
|
||||
bufferedAgentEvents: context.bufferedAgentEvents,
|
||||
chatAbortedRuns: context.chatAbortedRuns,
|
||||
removeChatRun: context.removeChatRun,
|
||||
agentRunSeq: context.agentRunSeq,
|
||||
|
||||
@@ -13,6 +13,7 @@ import type { PluginNodeCapabilitySurface } from "../plugin-node-capability.js";
|
||||
import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.js";
|
||||
import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast-types.js";
|
||||
import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js";
|
||||
import type { BufferedAgentEvent } from "../server-chat-state.js";
|
||||
import type { DedupeEntry } from "../server-shared.js";
|
||||
import type { GatewayEventLoopHealth } from "../server/event-loop-health.js";
|
||||
|
||||
@@ -76,6 +77,8 @@ export type GatewayRequestContext = {
|
||||
chatDeltaSentAt: Map<string, number>;
|
||||
chatDeltaLastBroadcastLen: Map<string, number>;
|
||||
chatDeltaLastBroadcastText: Map<string, string>;
|
||||
agentDeltaSentAt: Map<string, number>;
|
||||
bufferedAgentEvents: Map<string, BufferedAgentEvent>;
|
||||
addChatRun: (sessionId: string, entry: { sessionKey: string; clientRunId: string }) => void;
|
||||
removeChatRun: (
|
||||
sessionId: string,
|
||||
|
||||
@@ -44,6 +44,8 @@ describe("createGatewayRequestContext", () => {
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
chatDeltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt: new Map(),
|
||||
bufferedAgentEvents: new Map(),
|
||||
addChatRun: vi.fn(),
|
||||
removeChatRun: vi.fn(),
|
||||
subscribeSessionEvents: vi.fn(),
|
||||
|
||||
@@ -39,6 +39,8 @@ type GatewayRequestContextParams = {
|
||||
chatDeltaSentAt: GatewayRequestContext["chatDeltaSentAt"];
|
||||
chatDeltaLastBroadcastLen: GatewayRequestContext["chatDeltaLastBroadcastLen"];
|
||||
chatDeltaLastBroadcastText: GatewayRequestContext["chatDeltaLastBroadcastText"];
|
||||
agentDeltaSentAt: GatewayRequestContext["agentDeltaSentAt"];
|
||||
bufferedAgentEvents: GatewayRequestContext["bufferedAgentEvents"];
|
||||
addChatRun: GatewayRequestContext["addChatRun"];
|
||||
removeChatRun: GatewayRequestContext["removeChatRun"];
|
||||
subscribeSessionEvents: GatewayRequestContext["subscribeSessionEvents"];
|
||||
@@ -136,6 +138,8 @@ export function createGatewayRequestContext(
|
||||
chatDeltaSentAt: params.chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen,
|
||||
chatDeltaLastBroadcastText: params.chatDeltaLastBroadcastText,
|
||||
agentDeltaSentAt: params.agentDeltaSentAt,
|
||||
bufferedAgentEvents: params.bufferedAgentEvents,
|
||||
addChatRun: params.addChatRun,
|
||||
removeChatRun: params.removeChatRun,
|
||||
subscribeSessionEvents: params.subscribeSessionEvents,
|
||||
|
||||
@@ -48,7 +48,12 @@ describe("startGatewayEarlyRuntime", () => {
|
||||
logHealth: { error: () => {} },
|
||||
dedupe: new Map(),
|
||||
chatAbortControllers: new Map(),
|
||||
chatRunState: { abortedRuns: new Map(), deltaLastBroadcastText: new Map() },
|
||||
chatRunState: {
|
||||
abortedRuns: new Map(),
|
||||
deltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt: new Map(),
|
||||
bufferedAgentEvents: new Map(),
|
||||
},
|
||||
chatRunBuffers: new Map(),
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
|
||||
@@ -1270,6 +1270,8 @@ export async function startGatewayServer(
|
||||
chatDeltaSentAt: chatRunState.deltaSentAt,
|
||||
chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen,
|
||||
chatDeltaLastBroadcastText: chatRunState.deltaLastBroadcastText,
|
||||
agentDeltaSentAt: chatRunState.agentDeltaSentAt,
|
||||
bufferedAgentEvents: chatRunState.bufferedAgentEvents,
|
||||
addChatRun,
|
||||
removeChatRun,
|
||||
subscribeSessionEvents: sessionEventSubscribers.subscribe,
|
||||
|
||||
@@ -39,6 +39,21 @@ describe("talk realtime gateway relay", () => {
|
||||
const broadcast = vi.fn();
|
||||
const nodeSendToSession = vi.fn();
|
||||
const removeChatRun = vi.fn(() => ({ sessionKey: "main", clientRunId: "run-1" }));
|
||||
const agentDeltaSentAt = new Map([["run-1:assistant", Date.now()]]);
|
||||
const bufferedAgentEvents = new Map([
|
||||
[
|
||||
"run-1:assistant",
|
||||
{
|
||||
payload: {
|
||||
runId: "run-1",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "pending", delta: "pending" },
|
||||
},
|
||||
},
|
||||
],
|
||||
]);
|
||||
const context = {
|
||||
broadcastToConnIds: vi.fn(),
|
||||
broadcast,
|
||||
@@ -59,6 +74,8 @@ describe("talk realtime gateway relay", () => {
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
chatDeltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt,
|
||||
bufferedAgentEvents,
|
||||
chatAbortedRuns: new Map(),
|
||||
removeChatRun,
|
||||
agentRunSeq: new Map(),
|
||||
@@ -83,6 +100,8 @@ describe("talk realtime gateway relay", () => {
|
||||
broadcast,
|
||||
nodeSendToSession,
|
||||
removeChatRun,
|
||||
agentDeltaSentAt,
|
||||
bufferedAgentEvents,
|
||||
session,
|
||||
};
|
||||
}
|
||||
@@ -487,8 +506,15 @@ describe("talk realtime gateway relay", () => {
|
||||
});
|
||||
|
||||
it("aborts linked agent consult runs when the relay turn is cancelled", () => {
|
||||
const { abortController, broadcast, nodeSendToSession, removeChatRun, session } =
|
||||
createAbortableRelayRunFixture();
|
||||
const {
|
||||
abortController,
|
||||
broadcast,
|
||||
nodeSendToSession,
|
||||
removeChatRun,
|
||||
agentDeltaSentAt,
|
||||
bufferedAgentEvents,
|
||||
session,
|
||||
} = createAbortableRelayRunFixture();
|
||||
cancelTalkRealtimeRelayTurn({
|
||||
relaySessionId: session.relaySessionId,
|
||||
connId: "conn-1",
|
||||
@@ -497,16 +523,26 @@ describe("talk realtime gateway relay", () => {
|
||||
|
||||
expect(abortController.signal.aborted).toBe(true);
|
||||
expect(removeChatRun).toHaveBeenCalledWith("run-1", "run-1", "main");
|
||||
expect(agentDeltaSentAt.has("run-1:assistant")).toBe(false);
|
||||
expect(bufferedAgentEvents.has("run-1:assistant")).toBe(false);
|
||||
expectChatAbortPayload(broadcast, "barge-in");
|
||||
expectNodeAbortPayload(nodeSendToSession);
|
||||
});
|
||||
|
||||
it("aborts linked agent consult runs when the relay session closes", () => {
|
||||
const { abortController, broadcast, nodeSendToSession, session } =
|
||||
createAbortableRelayRunFixture();
|
||||
const {
|
||||
abortController,
|
||||
broadcast,
|
||||
nodeSendToSession,
|
||||
agentDeltaSentAt,
|
||||
bufferedAgentEvents,
|
||||
session,
|
||||
} = createAbortableRelayRunFixture();
|
||||
stopTalkRealtimeRelaySession({ relaySessionId: session.relaySessionId, connId: "conn-1" });
|
||||
|
||||
expect(abortController.signal.aborted).toBe(true);
|
||||
expect(agentDeltaSentAt.has("run-1:assistant")).toBe(false);
|
||||
expect(bufferedAgentEvents.has("run-1:assistant")).toBe(false);
|
||||
expectChatAbortPayload(broadcast, "relay-closed");
|
||||
expectNodeAbortPayload(nodeSendToSession);
|
||||
});
|
||||
@@ -517,6 +553,21 @@ describe("talk realtime gateway relay", () => {
|
||||
const broadcast = vi.fn();
|
||||
const nodeSendToSession = vi.fn();
|
||||
const removeChatRun = vi.fn(() => ({ sessionKey: "main", clientRunId: "run-1" }));
|
||||
const agentDeltaSentAt = new Map([["run-1:assistant", Date.now()]]);
|
||||
const bufferedAgentEvents = new Map([
|
||||
[
|
||||
"run-1:assistant",
|
||||
{
|
||||
payload: {
|
||||
runId: "run-1",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "pending", delta: "pending" },
|
||||
},
|
||||
},
|
||||
],
|
||||
]);
|
||||
const provider: RealtimeVoiceProviderPlugin = {
|
||||
id: "relay-test",
|
||||
label: "Relay Test",
|
||||
@@ -555,6 +606,8 @@ describe("talk realtime gateway relay", () => {
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
chatDeltaLastBroadcastText: new Map(),
|
||||
agentDeltaSentAt,
|
||||
bufferedAgentEvents,
|
||||
chatAbortedRuns: new Map(),
|
||||
removeChatRun,
|
||||
agentRunSeq: new Map(),
|
||||
@@ -577,6 +630,8 @@ describe("talk realtime gateway relay", () => {
|
||||
bridgeRequest?.onClose?.("error");
|
||||
|
||||
expect(abortController.signal.aborted).toBe(true);
|
||||
expect(agentDeltaSentAt.has("run-1:assistant")).toBe(false);
|
||||
expect(bufferedAgentEvents.has("run-1:assistant")).toBe(false);
|
||||
expectChatAbortPayload(broadcast, "relay-closed");
|
||||
expectNodeAbortPayload(nodeSendToSession);
|
||||
});
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { createPluginRecord } from "./loader-records.js";
|
||||
import { createPluginRegistry } from "./registry.js";
|
||||
import { getPluginRuntimeGatewayRequestScope } from "./runtime/gateway-request-scope.js";
|
||||
import { createPluginRuntime } from "./runtime/index.js";
|
||||
import type { PluginRuntime } from "./runtime/types.js";
|
||||
|
||||
function createTestRegistry(runtime: PluginRuntime) {
|
||||
@@ -19,33 +20,41 @@ function createTestRegistry(runtime: PluginRuntime) {
|
||||
}
|
||||
|
||||
describe("plugin registry runtime config scope", () => {
|
||||
it("runs deprecated config helpers with the owning plugin scope", async () => {
|
||||
let loadScope = getPluginRuntimeGatewayRequestScope();
|
||||
let writeScope = getPluginRuntimeGatewayRequestScope();
|
||||
it("runs config helpers with the owning plugin scope", async () => {
|
||||
let currentScope = getPluginRuntimeGatewayRequestScope();
|
||||
let mutateScope = getPluginRuntimeGatewayRequestScope();
|
||||
let replaceScope = getPluginRuntimeGatewayRequestScope();
|
||||
const config = {} as OpenClawConfig;
|
||||
const replaceResult = {
|
||||
previousHash: null,
|
||||
nextHash: "next",
|
||||
} as unknown as Awaited<ReturnType<PluginRuntime["config"]["replaceConfigFile"]>>;
|
||||
const mutateConfigFile: PluginRuntime["config"]["mutateConfigFile"] = async () => ({
|
||||
...replaceResult,
|
||||
result: undefined,
|
||||
});
|
||||
const mutateConfigFile: PluginRuntime["config"]["mutateConfigFile"] = async () => {
|
||||
mutateScope = getPluginRuntimeGatewayRequestScope();
|
||||
return {
|
||||
...replaceResult,
|
||||
result: undefined,
|
||||
};
|
||||
};
|
||||
const replaceConfigFile: PluginRuntime["config"]["replaceConfigFile"] = async () => {
|
||||
replaceScope = getPluginRuntimeGatewayRequestScope();
|
||||
return replaceResult;
|
||||
};
|
||||
const loadConfig: PluginRuntime["config"]["loadConfig"] = () => config;
|
||||
const writeConfigFile: PluginRuntime["config"]["writeConfigFile"] = async () => {};
|
||||
const configRuntime = {
|
||||
current: vi.fn(() => config),
|
||||
mutateConfigFile,
|
||||
replaceConfigFile: async () => replaceResult,
|
||||
loadConfig: vi.fn(() => {
|
||||
loadScope = getPluginRuntimeGatewayRequestScope();
|
||||
current: vi.fn(() => {
|
||||
currentScope = getPluginRuntimeGatewayRequestScope();
|
||||
return config;
|
||||
}),
|
||||
writeConfigFile: vi.fn(async () => {
|
||||
writeScope = getPluginRuntimeGatewayRequestScope();
|
||||
}),
|
||||
mutateConfigFile,
|
||||
replaceConfigFile,
|
||||
loadConfig,
|
||||
writeConfigFile,
|
||||
} satisfies PluginRuntime["config"];
|
||||
const pluginRegistry = createTestRegistry({
|
||||
config: configRuntime,
|
||||
} as unknown as PluginRuntime);
|
||||
const runtime = createPluginRuntime();
|
||||
runtime.config = configRuntime;
|
||||
const pluginRegistry = createTestRegistry(runtime);
|
||||
const record = createPluginRecord({
|
||||
id: "legacy-plugin",
|
||||
name: "Legacy Plugin",
|
||||
@@ -56,14 +65,25 @@ describe("plugin registry runtime config scope", () => {
|
||||
});
|
||||
const api = pluginRegistry.createApi(record, { config });
|
||||
|
||||
expect(api.runtime.config.loadConfig()).toBe(config);
|
||||
await api.runtime.config.writeConfigFile(config);
|
||||
expect(api.runtime.config.current()).toBe(config);
|
||||
await api.runtime.config.mutateConfigFile({
|
||||
afterWrite: { mode: "none", reason: "test" },
|
||||
mutate: () => undefined,
|
||||
});
|
||||
await api.runtime.config.replaceConfigFile({
|
||||
nextConfig: config,
|
||||
afterWrite: { mode: "none", reason: "test" },
|
||||
});
|
||||
|
||||
expect(loadScope).toMatchObject({
|
||||
expect(currentScope).toMatchObject({
|
||||
pluginId: "legacy-plugin",
|
||||
pluginSource: "/plugins/legacy-plugin/index.js",
|
||||
});
|
||||
expect(writeScope).toMatchObject({
|
||||
expect(mutateScope).toMatchObject({
|
||||
pluginId: "legacy-plugin",
|
||||
pluginSource: "/plugins/legacy-plugin/index.js",
|
||||
});
|
||||
expect(replaceScope).toMatchObject({
|
||||
pluginId: "legacy-plugin",
|
||||
pluginSource: "/plugins/legacy-plugin/index.js",
|
||||
});
|
||||
|
||||
@@ -2399,9 +2399,10 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
|
||||
const config = Reflect.get(target, prop, receiver);
|
||||
return {
|
||||
...config,
|
||||
loadConfig: () => runWithPluginScope(() => config.loadConfig()),
|
||||
writeConfigFile: (cfg, options) =>
|
||||
runWithPluginScope(() => config.writeConfigFile(cfg, options)),
|
||||
current: () => runWithPluginScope(() => config.current()),
|
||||
mutateConfigFile: (params) => runWithPluginScope(() => config.mutateConfigFile(params)),
|
||||
replaceConfigFile: (params) =>
|
||||
runWithPluginScope(() => config.replaceConfigFile(params)),
|
||||
} satisfies PluginRuntime["config"];
|
||||
}
|
||||
if (prop === "llm") {
|
||||
|
||||
Reference in New Issue
Block a user