mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
fix(auto-reply): strip leading NO_REPLY tokens to prevent silent-reply leak (#63068)
* fix(auto-reply): strip leading NO_REPLY tokens to prevent silent-reply leak * fix(auto-reply): preserve substantive NO_REPLY leading text * fix(agents): preserve ACP silent-prefix cumulative deltas * fix(auto-reply): harden silent-token streaming paths * fix(auto-reply): normalize glued silent tokens consistently --------- Co-authored-by: termtek <termtek@ubuntu.tail2b72cd.ts.net>
This commit is contained in:
@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
|
||||
- npm packaging: derive required root runtime mirrors from bundled plugin manifests and built root chunks, then install packed release tarballs without the repo `node_modules` so release checks catch missing plugin deps before publish.
|
||||
- Reply/doctor: resolve reply-run SecretRefs before preflight helpers touch config, surface gateway OAuth reauth failures to users, and make `openclaw doctor` call out exact reauth commands.
|
||||
- Android/pairing: clear stale setup-code auth on new QR scans, bootstrap operator and node sessions from fresh pairing, prefer stored device tokens after bootstrap handoff, and pause pairing auto-retry while the app is backgrounded so scan-once Android pairing recovers reliably again. (#63199) Thanks @obviyus.
|
||||
- Auto-reply/NO_REPLY: strip glued leading `NO_REPLY` tokens before reply normalization and ACP-visible streaming so silent sentinel text no longer leaks into user-visible replies while preserving substantive `NO_REPLY ...` text. Thanks @frankekn.
|
||||
|
||||
## 2026.4.8
|
||||
|
||||
|
||||
@@ -2,7 +2,11 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { resolveFallbackRetryPrompt, sessionFileHasContent } from "./attempt-execution.js";
|
||||
import {
|
||||
createAcpVisibleTextAccumulator,
|
||||
resolveFallbackRetryPrompt,
|
||||
sessionFileHasContent,
|
||||
} from "./attempt-execution.js";
|
||||
|
||||
describe("resolveFallbackRetryPrompt", () => {
|
||||
const originalBody = "Summarize the quarterly earnings report and highlight key trends.";
|
||||
@@ -157,3 +161,47 @@ describe("sessionFileHasContent", () => {
|
||||
expect(await sessionFileHasContent(link)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createAcpVisibleTextAccumulator", () => {
|
||||
it("preserves cumulative raw snapshots after stripping a glued NO_REPLY prefix", () => {
|
||||
const acc = createAcpVisibleTextAccumulator();
|
||||
|
||||
expect(acc.consume("NO_REPLYThe user")).toEqual({
|
||||
text: "The user",
|
||||
delta: "The user",
|
||||
});
|
||||
|
||||
expect(acc.consume("NO_REPLYThe user is saying")).toEqual({
|
||||
text: "The user is saying",
|
||||
delta: " is saying",
|
||||
});
|
||||
|
||||
expect(acc.finalize()).toBe("The user is saying");
|
||||
expect(acc.finalizeRaw()).toBe("The user is saying");
|
||||
});
|
||||
|
||||
it("keeps append-only deltas working after stripping a glued NO_REPLY prefix", () => {
|
||||
const acc = createAcpVisibleTextAccumulator();
|
||||
|
||||
expect(acc.consume("NO_REPLYThe user")).toEqual({
|
||||
text: "The user",
|
||||
delta: "The user",
|
||||
});
|
||||
|
||||
expect(acc.consume(" is saying")).toEqual({
|
||||
text: "The user is saying",
|
||||
delta: " is saying",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves punctuation-start text that begins with NO_REPLY-like content", () => {
|
||||
const acc = createAcpVisibleTextAccumulator();
|
||||
|
||||
expect(acc.consume("NO_REPLY: explanation")).toEqual({
|
||||
text: "NO_REPLY: explanation",
|
||||
delta: "NO_REPLY: explanation",
|
||||
});
|
||||
|
||||
expect(acc.finalize()).toBe("NO_REPLY: explanation");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -7,6 +7,8 @@ import {
|
||||
isSilentReplyPrefixText,
|
||||
isSilentReplyText,
|
||||
SILENT_REPLY_TOKEN,
|
||||
startsWithSilentToken,
|
||||
stripLeadingSilentToken,
|
||||
} from "../../auto-reply/tokens.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { mergeSessionEntry, type SessionEntry, updateSessionStore } from "../../config/sessions.js";
|
||||
@@ -150,6 +152,7 @@ export function prependInternalEventContext(
|
||||
export function createAcpVisibleTextAccumulator() {
|
||||
let pendingSilentPrefix = "";
|
||||
let visibleText = "";
|
||||
let rawVisibleText = "";
|
||||
const startsWithWordChar = (chunk: string): boolean => /^[\p{L}\p{N}]/u.test(chunk);
|
||||
|
||||
const resolveNextCandidate = (base: string, chunk: string): string => {
|
||||
@@ -169,16 +172,16 @@ export function createAcpVisibleTextAccumulator() {
|
||||
return `${base}${chunk}`;
|
||||
};
|
||||
|
||||
const mergeVisibleChunk = (base: string, chunk: string): { text: string; delta: string } => {
|
||||
const mergeVisibleChunk = (base: string, chunk: string): { rawText: string; delta: string } => {
|
||||
if (!base) {
|
||||
return { text: chunk, delta: chunk };
|
||||
return { rawText: chunk, delta: chunk };
|
||||
}
|
||||
if (chunk.startsWith(base) && chunk.length > base.length) {
|
||||
const delta = chunk.slice(base.length);
|
||||
return { text: chunk, delta };
|
||||
return { rawText: chunk, delta };
|
||||
}
|
||||
return {
|
||||
text: `${base}${chunk}`,
|
||||
rawText: `${base}${chunk}`,
|
||||
delta: chunk,
|
||||
};
|
||||
};
|
||||
@@ -199,8 +202,22 @@ export function createAcpVisibleTextAccumulator() {
|
||||
pendingSilentPrefix = leadCandidate;
|
||||
return null;
|
||||
}
|
||||
// Strip leading NO_REPLY token when it is glued to visible text
|
||||
// (e.g. "NO_REPLYThe user is saying...") so the token never leaks.
|
||||
if (startsWithSilentToken(trimmedLeadCandidate, SILENT_REPLY_TOKEN)) {
|
||||
const stripped = stripLeadingSilentToken(leadCandidate, SILENT_REPLY_TOKEN);
|
||||
if (stripped) {
|
||||
pendingSilentPrefix = "";
|
||||
rawVisibleText = leadCandidate;
|
||||
visibleText = stripped;
|
||||
return { text: stripped, delta: stripped };
|
||||
}
|
||||
pendingSilentPrefix = leadCandidate;
|
||||
return null;
|
||||
}
|
||||
if (pendingSilentPrefix) {
|
||||
pendingSilentPrefix = "";
|
||||
rawVisibleText = leadCandidate;
|
||||
visibleText = leadCandidate;
|
||||
return {
|
||||
text: visibleText,
|
||||
@@ -209,9 +226,13 @@ export function createAcpVisibleTextAccumulator() {
|
||||
}
|
||||
}
|
||||
|
||||
const nextVisible = mergeVisibleChunk(visibleText, chunk);
|
||||
visibleText = nextVisible.text;
|
||||
return nextVisible.delta ? nextVisible : null;
|
||||
const nextVisible = mergeVisibleChunk(rawVisibleText, chunk);
|
||||
rawVisibleText = nextVisible.rawText;
|
||||
if (!nextVisible.delta) {
|
||||
return null;
|
||||
}
|
||||
visibleText = `${visibleText}${nextVisible.delta}`;
|
||||
return { text: visibleText, delta: nextVisible.delta };
|
||||
},
|
||||
finalize(): string {
|
||||
return visibleText.trim();
|
||||
|
||||
@@ -290,6 +290,49 @@ describe("runAgentTurnWithFallback", () => {
|
||||
expect(onToolResult.mock.calls[0]?.[0]?.text).toBeUndefined();
|
||||
});
|
||||
|
||||
it("strips a glued leading NO_REPLY token from streamed tool results", async () => {
|
||||
const onToolResult = vi.fn();
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
|
||||
await params.onToolResult?.({ text: "NO_REPLYThe user is saying hello" });
|
||||
return { payloads: [{ text: "final" }], meta: {} };
|
||||
});
|
||||
|
||||
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
|
||||
const pendingToolTasks = new Set<Promise<void>>();
|
||||
const typingSignals = createMockTypingSignaler();
|
||||
const result = await runAgentTurnWithFallback({
|
||||
commandBody: "hello",
|
||||
followupRun: createFollowupRun(),
|
||||
sessionCtx: {
|
||||
Provider: "whatsapp",
|
||||
MessageSid: "msg",
|
||||
} as unknown as TemplateContext,
|
||||
opts: {
|
||||
onToolResult,
|
||||
} satisfies GetReplyOptions,
|
||||
typingSignals,
|
||||
blockReplyPipeline: null,
|
||||
blockStreamingEnabled: false,
|
||||
resolvedBlockStreamingBreak: "message_end",
|
||||
applyReplyToMode: (payload) => payload,
|
||||
shouldEmitToolResult: () => true,
|
||||
shouldEmitToolOutput: () => false,
|
||||
pendingToolTasks,
|
||||
resetSessionAfterCompactionFailure: async () => false,
|
||||
resetSessionAfterRoleOrderingConflict: async () => false,
|
||||
isHeartbeat: false,
|
||||
sessionKey: "main",
|
||||
getActiveSessionEntry: () => undefined,
|
||||
resolvedVerboseLevel: "off",
|
||||
});
|
||||
|
||||
await Promise.all(pendingToolTasks);
|
||||
|
||||
expect(result.kind).toBe("success");
|
||||
expect(typingSignals.signalTextDelta).toHaveBeenCalledWith("The user is saying hello");
|
||||
expect(onToolResult).toHaveBeenCalledWith({ text: "The user is saying hello" });
|
||||
});
|
||||
|
||||
it("forwards item lifecycle events to reply options", async () => {
|
||||
const onItemEvent = vi.fn();
|
||||
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
|
||||
|
||||
@@ -58,6 +58,8 @@ import {
|
||||
isSilentReplyPrefixText,
|
||||
isSilentReplyText,
|
||||
SILENT_REPLY_TOKEN,
|
||||
startsWithSilentToken,
|
||||
stripLeadingSilentToken,
|
||||
} from "../tokens.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { resolveRunAuthProfile } from "./agent-runner-auth-profile.js";
|
||||
@@ -681,6 +683,9 @@ export async function runAgentTurnWithFallback(params: {
|
||||
) {
|
||||
return { skip: true };
|
||||
}
|
||||
if (text && startsWithSilentToken(text, SILENT_REPLY_TOKEN)) {
|
||||
text = stripLeadingSilentToken(text, SILENT_REPLY_TOKEN);
|
||||
}
|
||||
if (!text) {
|
||||
// Allow media-only payloads (e.g. tool result screenshots) through.
|
||||
if (reply.hasMedia) {
|
||||
|
||||
@@ -452,6 +452,12 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
||||
expectedForwarded: ["No", "No, that is valid"],
|
||||
shouldType: true,
|
||||
},
|
||||
{
|
||||
partials: ["NO_REPLYThe user is saying hello"],
|
||||
finalText: "NO_REPLYThe user is saying hello",
|
||||
expectedForwarded: ["The user is saying hello"],
|
||||
shouldType: true,
|
||||
},
|
||||
] as const;
|
||||
|
||||
for (const testCase of cases) {
|
||||
|
||||
@@ -7,6 +7,8 @@ import {
|
||||
isSilentReplyPayloadText,
|
||||
isSilentReplyText,
|
||||
SILENT_REPLY_TOKEN,
|
||||
startsWithSilentToken,
|
||||
stripLeadingSilentToken,
|
||||
stripSilentToken,
|
||||
} from "../tokens.js";
|
||||
import type { ReplyPayload } from "../types.js";
|
||||
@@ -62,11 +64,17 @@ export function normalizeReplyPayload(
|
||||
// Strip NO_REPLY from mixed-content messages (e.g. "😄 NO_REPLY") so the
|
||||
// token never leaks to end users. If stripping leaves nothing, treat it as
|
||||
// silent just like the exact-match path above. (#30916, #30955)
|
||||
if (text && text.includes(silentToken) && !isSilentReplyText(text, silentToken)) {
|
||||
text = stripSilentToken(text, silentToken);
|
||||
if (!hasContent(text)) {
|
||||
opts.onSkip?.("silent");
|
||||
return null;
|
||||
if (text && !isSilentReplyText(text, silentToken)) {
|
||||
const hasLeadingSilentToken = startsWithSilentToken(text, silentToken);
|
||||
if (hasLeadingSilentToken) {
|
||||
text = stripLeadingSilentToken(text, silentToken);
|
||||
}
|
||||
if (hasLeadingSilentToken || text.includes(silentToken)) {
|
||||
text = stripSilentToken(text, silentToken);
|
||||
if (!hasContent(text)) {
|
||||
opts.onSkip?.("silent");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (text && !trimmed) {
|
||||
|
||||
@@ -125,12 +125,38 @@ describe("normalizeReplyPayload", () => {
|
||||
expect(result!.text).not.toContain("NO_REPLY");
|
||||
});
|
||||
|
||||
it("strips glued leading NO_REPLY text without leaking the token", () => {
|
||||
const result = normalizeReplyPayload({
|
||||
text: "NO_REPLYThe user is saying hello",
|
||||
});
|
||||
expect(result).not.toBeNull();
|
||||
expect(result!.text).toBe("The user is saying hello");
|
||||
});
|
||||
|
||||
it("strips glued leading NO_REPLY text case-insensitively", () => {
|
||||
const result = normalizeReplyPayload({
|
||||
text: "no_replyThe user is saying hello",
|
||||
});
|
||||
expect(result).not.toBeNull();
|
||||
expect(result!.text).toBe("The user is saying hello");
|
||||
});
|
||||
|
||||
it("keeps NO_REPLY when used as leading substantive text", () => {
|
||||
const result = normalizeReplyPayload({ text: "NO_REPLY -- nope" });
|
||||
expect(result).not.toBeNull();
|
||||
expect(result!.text).toBe("NO_REPLY -- nope");
|
||||
});
|
||||
|
||||
it("keeps punctuation-start content after a leading NO_REPLY token", () => {
|
||||
const colonResult = normalizeReplyPayload({ text: "NO_REPLY: explanation" });
|
||||
expect(colonResult).not.toBeNull();
|
||||
expect(colonResult!.text).toBe("NO_REPLY: explanation");
|
||||
|
||||
const dashResult = normalizeReplyPayload({ text: "NO_REPLY—note" });
|
||||
expect(dashResult).not.toBeNull();
|
||||
expect(dashResult!.text).toBe("NO_REPLY—note");
|
||||
});
|
||||
|
||||
it("suppresses message when stripping NO_REPLY leaves nothing", () => {
|
||||
const reasons: string[] = [];
|
||||
const result = normalizeReplyPayload(
|
||||
@@ -968,6 +994,22 @@ describe("createStreamingDirectiveAccumulator", () => {
|
||||
expect(afterReset?.replyToTag).toBe(false);
|
||||
expect(afterReset?.replyToId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("strips a glued leading NO_REPLY token from streamed text", () => {
|
||||
const accumulator = createStreamingDirectiveAccumulator();
|
||||
|
||||
const result = accumulator.consume("NO_REPLYThe user is saying hello");
|
||||
|
||||
expect(result?.text).toBe("The user is saying hello");
|
||||
});
|
||||
|
||||
it("keeps punctuation-start text after a leading NO_REPLY token", () => {
|
||||
const accumulator = createStreamingDirectiveAccumulator();
|
||||
|
||||
const result = accumulator.consume("NO_REPLY: explanation");
|
||||
|
||||
expect(result?.text).toBe("NO_REPLY: explanation");
|
||||
});
|
||||
});
|
||||
|
||||
describe("extractShortModelName", () => {
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import { hasOutboundReplyContent } from "openclaw/plugin-sdk/reply-payload";
|
||||
import { splitMediaFromOutput } from "../../media/parse.js";
|
||||
import { parseInlineDirectives } from "../../utils/directive-tags.js";
|
||||
import { isSilentReplyPrefixText, isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
|
||||
import {
|
||||
isSilentReplyPrefixText,
|
||||
isSilentReplyText,
|
||||
SILENT_REPLY_TOKEN,
|
||||
startsWithSilentToken,
|
||||
stripLeadingSilentToken,
|
||||
} from "../tokens.js";
|
||||
import type { ReplyDirectiveParseResult } from "./reply-directives.js";
|
||||
|
||||
type PendingReplyState = {
|
||||
@@ -52,6 +58,8 @@ const parseChunk = (raw: string, options?: { silentToken?: string }): ParsedChun
|
||||
isSilentReplyText(text, silentToken) || isSilentReplyPrefixText(text, silentToken);
|
||||
if (isSilent) {
|
||||
text = "";
|
||||
} else if (startsWithSilentToken(text, silentToken)) {
|
||||
text = stripLeadingSilentToken(text, silentToken);
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { isSilentReplyPrefixText, isSilentReplyText, stripSilentToken } from "./tokens.js";
|
||||
import {
|
||||
isSilentReplyPrefixText,
|
||||
isSilentReplyText,
|
||||
startsWithSilentToken,
|
||||
stripLeadingSilentToken,
|
||||
stripSilentToken,
|
||||
} from "./tokens.js";
|
||||
|
||||
describe("isSilentReplyText", () => {
|
||||
it("returns true for exact token", () => {
|
||||
@@ -78,6 +84,28 @@ describe("stripSilentToken", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("stripLeadingSilentToken", () => {
|
||||
it("strips glued leading token text", () => {
|
||||
expect(stripLeadingSilentToken("NO_REPLYThe user is saying")).toBe("The user is saying");
|
||||
});
|
||||
});
|
||||
|
||||
describe("startsWithSilentToken", () => {
|
||||
it("matches leading glued silent tokens case-insensitively", () => {
|
||||
expect(startsWithSilentToken("NO_REPLYThe user is saying")).toBe(true);
|
||||
expect(startsWithSilentToken("No_RePlYThe user is saying")).toBe(true);
|
||||
expect(startsWithSilentToken("no_replyThe user is saying")).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects separated substantive prefixes and exact-token-only text", () => {
|
||||
expect(startsWithSilentToken("NO_REPLY -- nope")).toBe(false);
|
||||
expect(startsWithSilentToken("NO_REPLY: explanation")).toBe(false);
|
||||
expect(startsWithSilentToken("NO_REPLY—note")).toBe(false);
|
||||
expect(startsWithSilentToken("NO_REPLY")).toBe(false);
|
||||
expect(startsWithSilentToken(" NO_REPLY ")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isSilentReplyPrefixText", () => {
|
||||
it("matches uppercase token lead fragments", () => {
|
||||
expect(isSilentReplyPrefixText("NO")).toBe(true);
|
||||
|
||||
@@ -5,6 +5,7 @@ export const SILENT_REPLY_TOKEN = "NO_REPLY";
|
||||
|
||||
const silentExactRegexByToken = new Map<string, RegExp>();
|
||||
const silentTrailingRegexByToken = new Map<string, RegExp>();
|
||||
const silentLeadingAttachedRegexByToken = new Map<string, RegExp>();
|
||||
|
||||
function getSilentExactRegex(token: string): RegExp {
|
||||
const cached = silentExactRegexByToken.get(token);
|
||||
@@ -86,6 +87,58 @@ export function stripSilentToken(text: string, token: string = SILENT_REPLY_TOKE
|
||||
return text.replace(getSilentTrailingRegex(token), "").trim();
|
||||
}
|
||||
|
||||
const silentLeadingRegexByToken = new Map<string, RegExp>();
|
||||
|
||||
function getSilentLeadingAttachedRegex(token: string): RegExp {
|
||||
const cached = silentLeadingAttachedRegexByToken.get(token);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
const escaped = escapeRegExp(token);
|
||||
// Match one or more leading occurrences of the token where the final token
|
||||
// is glued directly to visible word-start content (for example
|
||||
// `NO_REPLYhello`), without treating punctuation-start text like
|
||||
// `NO_REPLY: explanation` as a silent prefix.
|
||||
const regex = new RegExp(`^\\s*(?:${escaped}\\s+)*${escaped}(?=[\\p{L}\\p{N}])`, "iu");
|
||||
silentLeadingAttachedRegexByToken.set(token, regex);
|
||||
return regex;
|
||||
}
|
||||
|
||||
function getSilentLeadingRegex(token: string): RegExp {
|
||||
const cached = silentLeadingRegexByToken.get(token);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
const escaped = escapeRegExp(token);
|
||||
// Match one or more leading occurrences of the token, each optionally followed by whitespace
|
||||
const regex = new RegExp(`^(?:\\s*${escaped})+\\s*`, "i");
|
||||
silentLeadingRegexByToken.set(token, regex);
|
||||
return regex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip leading silent reply tokens from text.
|
||||
* Handles cases like "NO_REPLYThe user is saying..." where the token
|
||||
* is not separated from the following text.
|
||||
*/
|
||||
export function stripLeadingSilentToken(text: string, token: string = SILENT_REPLY_TOKEN): string {
|
||||
return text.replace(getSilentLeadingRegex(token), "").trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether text starts with one or more leading silent reply tokens where
|
||||
* the final token is glued directly to visible content.
|
||||
*/
|
||||
export function startsWithSilentToken(
|
||||
text: string | undefined,
|
||||
token: string = SILENT_REPLY_TOKEN,
|
||||
): boolean {
|
||||
if (!text) {
|
||||
return false;
|
||||
}
|
||||
return getSilentLeadingAttachedRegex(token).test(text);
|
||||
}
|
||||
|
||||
export function isSilentReplyPrefixText(
|
||||
text: string | undefined,
|
||||
token: string = SILENT_REPLY_TOKEN,
|
||||
|
||||
@@ -292,6 +292,46 @@ describe("agent event handler", () => {
|
||||
nowSpy?.mockRestore();
|
||||
});
|
||||
|
||||
it("strips a glued leading NO_REPLY token from cumulative chat snapshots", () => {
|
||||
const { broadcast, nodeSendToSession, chatRunState, handler, nowSpy } = createHarness({
|
||||
now: 2_250,
|
||||
});
|
||||
chatRunState.registry.add("run-4b", { sessionKey: "session-4b", clientRunId: "client-4b" });
|
||||
|
||||
handler({
|
||||
runId: "run-4b",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "NO_REPLYThe user" },
|
||||
});
|
||||
handler({
|
||||
runId: "run-4b",
|
||||
seq: 2,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "NO_REPLYThe user is saying hello" },
|
||||
});
|
||||
emitLifecycleEnd(handler, "run-4b");
|
||||
|
||||
const chatCalls = chatBroadcastCalls(broadcast);
|
||||
const finalPayload = chatCalls.at(-1)?.[1] as {
|
||||
message?: { content?: Array<{ text?: string }> };
|
||||
state?: string;
|
||||
};
|
||||
expect(finalPayload.state).toBe("final");
|
||||
expect(finalPayload.message?.content?.[0]?.text).toBe("The user is saying hello");
|
||||
expect(
|
||||
chatCalls.every(([, payload]) => {
|
||||
const text = (payload as { message?: { content?: Array<{ text?: string }> } }).message
|
||||
?.content?.[0]?.text;
|
||||
return !text || !text.includes("NO_REPLY");
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(chatCalls.length);
|
||||
nowSpy?.mockRestore();
|
||||
});
|
||||
|
||||
it("flushes buffered text as delta before final when throttle suppresses the latest chunk", () => {
|
||||
let now = 10_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS, stripHeartbeatToken } from "../auto-reply/heartbeat.js";
|
||||
import { normalizeVerboseLevel } from "../auto-reply/thinking.js";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import {
|
||||
isSilentReplyText,
|
||||
SILENT_REPLY_TOKEN,
|
||||
startsWithSilentToken,
|
||||
stripLeadingSilentToken,
|
||||
} from "../auto-reply/tokens.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { type AgentEventPayload, getAgentRunContext } from "../infra/agent-events.js";
|
||||
import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js";
|
||||
@@ -201,6 +206,7 @@ export function createChatRunRegistry(): ChatRunRegistry {
|
||||
|
||||
export type ChatRunState = {
|
||||
registry: ChatRunRegistry;
|
||||
rawBuffers: Map<string, string>;
|
||||
buffers: Map<string, string>;
|
||||
deltaSentAt: Map<string, number>;
|
||||
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
|
||||
@@ -211,6 +217,7 @@ export type ChatRunState = {
|
||||
|
||||
export function createChatRunState(): ChatRunState {
|
||||
const registry = createChatRunRegistry();
|
||||
const rawBuffers = new Map<string, string>();
|
||||
const buffers = new Map<string, string>();
|
||||
const deltaSentAt = new Map<string, number>();
|
||||
const deltaLastBroadcastLen = new Map<string, number>();
|
||||
@@ -218,6 +225,7 @@ export function createChatRunState(): ChatRunState {
|
||||
|
||||
const clear = () => {
|
||||
registry.clear();
|
||||
rawBuffers.clear();
|
||||
buffers.clear();
|
||||
deltaSentAt.clear();
|
||||
deltaLastBroadcastLen.clear();
|
||||
@@ -226,6 +234,7 @@ export function createChatRunState(): ChatRunState {
|
||||
|
||||
return {
|
||||
registry,
|
||||
rawBuffers,
|
||||
buffers,
|
||||
deltaSentAt,
|
||||
deltaLastBroadcastLen,
|
||||
@@ -474,6 +483,7 @@ export function createAgentEventHandler({
|
||||
const pendingTerminalLifecycleErrors = new Map<string, NodeJS.Timeout>();
|
||||
|
||||
const clearBufferedChatState = (clientRunId: string) => {
|
||||
chatRunState.rawBuffers.delete(clientRunId);
|
||||
chatRunState.buffers.delete(clientRunId);
|
||||
chatRunState.deltaSentAt.delete(clientRunId);
|
||||
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
|
||||
@@ -672,22 +682,28 @@ export function createAgentEventHandler({
|
||||
const cleanedText = stripInlineDirectiveTagsForDisplay(text).text;
|
||||
const cleanedDelta =
|
||||
typeof delta === "string" ? stripInlineDirectiveTagsForDisplay(delta).text : "";
|
||||
const previousText = chatRunState.buffers.get(clientRunId) ?? "";
|
||||
const mergedText = resolveMergedAssistantText({
|
||||
previousText,
|
||||
const previousRawText = chatRunState.rawBuffers.get(clientRunId) ?? "";
|
||||
const mergedRawText = resolveMergedAssistantText({
|
||||
previousText: previousRawText,
|
||||
nextText: cleanedText,
|
||||
nextDelta: cleanedDelta,
|
||||
});
|
||||
if (!mergedText) {
|
||||
if (!mergedRawText) {
|
||||
return;
|
||||
}
|
||||
chatRunState.rawBuffers.set(clientRunId, mergedRawText);
|
||||
if (isSilentReplyText(mergedRawText, SILENT_REPLY_TOKEN)) {
|
||||
chatRunState.buffers.set(clientRunId, "");
|
||||
return;
|
||||
}
|
||||
if (isSilentReplyLeadFragment(mergedRawText)) {
|
||||
chatRunState.buffers.set(clientRunId, mergedRawText);
|
||||
return;
|
||||
}
|
||||
const mergedText = startsWithSilentToken(mergedRawText, SILENT_REPLY_TOKEN)
|
||||
? stripLeadingSilentToken(mergedRawText, SILENT_REPLY_TOKEN)
|
||||
: mergedRawText;
|
||||
chatRunState.buffers.set(clientRunId, mergedText);
|
||||
if (isSilentReplyText(mergedText, SILENT_REPLY_TOKEN)) {
|
||||
return;
|
||||
}
|
||||
if (isSilentReplyLeadFragment(mergedText)) {
|
||||
return;
|
||||
}
|
||||
if (shouldHideHeartbeatChatOutput(clientRunId, sourceRunId)) {
|
||||
return;
|
||||
}
|
||||
@@ -788,6 +804,7 @@ export function createAgentEventHandler({
|
||||
// Only flush if the buffer has grown since the last broadcast to avoid duplicates.
|
||||
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, sourceRunId, seq);
|
||||
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
|
||||
chatRunState.rawBuffers.delete(clientRunId);
|
||||
chatRunState.buffers.delete(clientRunId);
|
||||
chatRunState.deltaSentAt.delete(clientRunId);
|
||||
if (jobState === "done") {
|
||||
|
||||
Reference in New Issue
Block a user