fix(streaming): preserve split final tags

This commit is contained in:
Peter Steinberger
2026-04-29 04:07:31 +01:00
parent 68ef37011e
commit 5435591f6a
6 changed files with 247 additions and 18 deletions

View File

@@ -73,6 +73,7 @@ Docs: https://docs.openclaw.ai
- CLI/image describe: pass `--prompt` and `--timeout-ms` through `infer image describe` and `describe-many`, so custom vision instructions and slow local model budgets reach media-understanding providers such as Ollama, OpenAI, Google, and OpenRouter. Refs #63700. Thanks @cedricjanssens.
- Model selection: include the rejected provider/model ref and allowlist recovery hint when a stored session override is cleared, so local model selections such as Gemma GGUF variants do not fall back to the default with a generic message. Refs #71069. Thanks @CyberRaccoonTeam.
- OpenAI-compatible providers: drop malformed event-only or blank-data SSE frames before the OpenAI SDK stream parser sees them, so proxies that split `event:` from `data:` no longer crash streaming runs with `Unexpected end of JSON input`. Fixes #52802. Thanks @LyHug.
- Gateway/OpenAI-compatible streaming: strip `<final>` tags split across streamed model deltas before they reach SSE clients, so `/v1/chat/completions` no longer emits tag remnants or drops content when final-answer wrappers cross chunk boundaries. Fixes #63325. Thanks @tzwickl.
- Local model prompt caching: keep stable Project Context above volatile channel/session prompt guidance and stop embedding current channel names in the message tool description, so Ollama, MLX, llama.cpp, and other prefix-cache backends avoid unnecessary full prompt reprocessing across channel turns. Fixes #40256; supersedes #40296. Thanks @rhclaw and @sriram369.
- Gateway/OpenAI-compatible API: guard provider policy lookup against runtime providers with non-array `models` values, so `/v1/chat/completions` no longer fails with `provider?.models?.some is not a function`. Fixes #66744; carries forward #66761. Thanks @MightyMoud, @MukundaKatta.
- WhatsApp/Web: pass explicit Baileys socket timings into every WhatsApp Web socket and expose `web.whatsapp.*` keepalive, connect, and query timeout settings so unstable networks can avoid repeated 408 disconnect and opening-handshake timeout loops. Fixes #56365. (#73580) Thanks @velvet-shark.

View File

@@ -531,15 +531,22 @@ export function handleMessageUpdate(
(deliveryPhase === "final_answer"
? ""
: ctx
.stripBlockTags(ctx.state.deltaBuffer, {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
})
.stripBlockTags(
ctx.state.deltaBuffer,
{
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
{ final: evtType === "text_end" },
)
.trim());
if (next) {
const wasThinking = ctx.state.partialBlockState.thinking;
const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : "";
const visibleDelta =
chunk || evtType === "text_end"
? ctx.stripBlockTags(chunk, ctx.state.partialBlockState, { final: evtType === "text_end" })
: "";
if (!wasThinking && ctx.state.partialBlockState.thinking) {
openReasoningStream(ctx);
}
@@ -678,7 +685,7 @@ export function handleMessageEnd(
warnIfAssistantEmittedToolText(ctx, assistantMessage);
const text = resolveSilentReplyFallbackText({
text: ctx.stripBlockTags(rawVisibleText, { thinking: false, final: false }),
text: ctx.stripBlockTags(rawVisibleText, { thinking: false, final: false }, { final: true }),
messagingToolSentTexts: ctx.state.messagingToolSentTexts,
});
const rawThinking =
@@ -700,6 +707,8 @@ export function handleMessageEnd(
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.blockState.pendingTagFragment = undefined;
ctx.state.partialBlockState.pendingTagFragment = undefined;
ctx.state.lastStreamedAssistant = undefined;
ctx.state.lastStreamedAssistantCleaned = undefined;
ctx.state.reasoningStreamOpen = false;

View File

@@ -44,8 +44,18 @@ export type EmbeddedPiSubscribeState = {
deltaBuffer: string;
blockBuffer: string;
blockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState };
partialBlockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState };
blockState: {
thinking: boolean;
final: boolean;
inlineCode: InlineCodeState;
pendingTagFragment?: string;
};
partialBlockState: {
thinking: boolean;
final: boolean;
inlineCode: InlineCodeState;
pendingTagFragment?: string;
};
lastStreamedAssistant?: string;
lastStreamedAssistantCleaned?: string;
emittedAssistantUpdate: boolean;
@@ -112,7 +122,13 @@ export type EmbeddedPiSubscribeContext = {
emitToolOutput: (toolName?: string, meta?: string, output?: string, result?: unknown) => void;
stripBlockTags: (
text: string,
state: { thinking: boolean; final: boolean; inlineCode?: InlineCodeState },
state: {
thinking: boolean;
final: boolean;
inlineCode?: InlineCodeState;
pendingTagFragment?: string;
},
options?: { final?: boolean },
) => string;
emitBlockChunk: (text: string, options?: { assistantMessageIndex?: number }) => void;
flushBlockReplyBuffer: (options?: { assistantMessageIndex?: number }) => void | Promise<void>;

View File

@@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest";
import {
createStubSessionHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
emitMessageStartAndEndForAssistantText,
extractAgentEventPayloads,
} from "./pi-embedded-subscribe.e2e-harness.js";
@@ -78,6 +79,108 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onPartialReply).toHaveBeenCalled();
expect(onPartialReply.mock.calls[0][0].text).toBe("Hello world");
});
it("strips final tags split across streamed deltas without emitting tag remnants", () => {
  // Feed a <final>…</final> wrapper whose tags straddle chunk boundaries.
  const { session, emit } = createStubSessionHarness();
  const onAgentEvent = vi.fn();
  subscribeEmbeddedPiSession({ session, runId: "run", onAgentEvent });
  emit({ type: "message_start", message: { role: "assistant" } });
  const deltas = ["<", "final>Title\n", "Line one\nLine two</", "final>"];
  deltas.forEach((delta) => emitAssistantTextDelta({ emit, delta }));
  const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls);
  const streamedText = payloads.map((payload) => payload.delta).join("");
  // Only the wrapped content should surface — no tag remnants, no rewrites.
  expect(streamedText).toBe("Title\nLine one\nLine two");
  expect(streamedText).not.toContain("<");
  expect(streamedText).not.toContain("final>");
  expect(payloads.some((payload) => payload.replace)).toBe(false);
});
it("preserves final content when enforced final tags are split across streamed deltas", () => {
  // With enforceFinalTag on, only <final>-wrapped text reaches partial replies;
  // splitting the tag name itself across deltas must not drop content.
  const { session, emit } = createStubSessionHarness();
  const onPartialReply = vi.fn();
  subscribeEmbeddedPiSession({ session, runId: "run", enforceFinalTag: true, onPartialReply });
  emit({ type: "message_start", message: { role: "assistant" } });
  ["<fi", "nal>Visible", " content</fi", "nal>"].forEach((delta) => {
    emitAssistantTextDelta({ emit, delta });
  });
  const pieces: string[] = [];
  for (const call of onPartialReply.mock.calls) {
    const { delta } = call[0] as { delta?: unknown };
    if (typeof delta === "string") {
      pieces.push(delta);
    }
  }
  expect(pieces.join("")).toBe("Visible content");
});
it("does not buffer ordinary trailing less-than text as a tag fragment", () => {
  // "1 < 2" ends with text after "<" that cannot grow into a stripped tag,
  // so it must stream through untouched.
  const { session, emit } = createStubSessionHarness();
  const onAgentEvent = vi.fn();
  subscribeEmbeddedPiSession({ session, runId: "run", onAgentEvent });
  emit({ type: "message_start", message: { role: "assistant" } });
  emitAssistantTextDelta({ emit, delta: "1 < 2" });
  const streamed = extractAgentEventPayloads(onAgentEvent.mock.calls)
    .map((payload) => payload.delta)
    .join("");
  expect(streamed).toBe("1 < 2");
});
it("flushes a literal trailing final-tag prefix when the text stream ends", () => {
  // A dangling "<fi" is buffered mid-stream, but once text_end arrives it can
  // no longer complete a tag and must be emitted literally.
  const { session, emit } = createStubSessionHarness();
  const onAgentEvent = vi.fn();
  subscribeEmbeddedPiSession({ session, runId: "run", onAgentEvent });
  emit({ type: "message_start", message: { role: "assistant" } });
  emitAssistantTextDelta({ emit, delta: "Answer ends with <fi" });
  emitAssistantTextEnd({ emit });
  const streamed = extractAgentEventPayloads(onAgentEvent.mock.calls)
    .map((payload) => payload.delta)
    .join("");
  expect(streamed).toBe("Answer ends with <fi");
});
it("preserves literal trailing tag-prefix text from message end fallback", () => {
  // The message-end fallback path (no streamed deltas) must also treat a bare
  // trailing "<" as literal text rather than a pending tag fragment.
  const { session, emit } = createStubSessionHarness();
  const onAgentEvent = vi.fn();
  subscribeEmbeddedPiSession({ session, runId: "run", onAgentEvent });
  emitMessageStartAndEndForAssistantText({ emit, text: "Answer ends with <" });
  const streamed = extractAgentEventPayloads(onAgentEvent.mock.calls)
    .map((payload) => payload.delta)
    .join("");
  expect(streamed).toBe("Answer ends with <");
});
it("does not require <final> when enforcement is off", () => {
const { session, emit } = createStubSessionHarness();

View File

@@ -42,8 +42,51 @@ import {
import { hasNonzeroUsage, normalizeUsage, type UsageLike } from "./usage.js";
// Matches an opening or closing <final> tag, tolerating internal whitespace
// and any letter case (e.g. "< /FINAL >").
const FINAL_TAG_SCAN_RE = /<\s*(\/?)\s*final\s*>/gi;
// Tag names whose wrapper blocks are stripped from streamed model output.
// A trailing "<…" fragment at the end of a delta is only buffered when it
// could still grow into one of these tags on the next chunk.
const STREAM_STRIPPED_BLOCK_TAG_NAMES = [
"final",
"think",
"thinking",
"thought",
"antthinking",
"antml:think",
"antml:thinking",
"antml:thought",
] as const;
const log = createSubsystemLogger("agent/embedded");
/**
 * Returns true when `fragment` could be the beginning of a stripped block tag
 * that was split across streamed deltas (e.g. "<fi" on its way to "<final>").
 *
 * A candidate must start with "<" and must not already contain ">" (a ">"
 * means the tag is complete and should be handled by the normal tag scan).
 * After lowercasing, removing whitespace, and dropping an optional leading
 * "/" (closing tags), the remaining name must be a prefix of one of
 * STREAM_STRIPPED_BLOCK_TAG_NAMES. A bare "<" (empty candidate) is treated
 * as potentially a fragment, since any tag could follow.
 */
function isPotentialTrailingBlockTagFragment(fragment: string): boolean {
  if (!fragment.startsWith("<") || fragment.includes(">")) {
    return false;
  }
  // Tag matching is case-insensitive and tolerates whitespace inside the tag
  // (e.g. "< /final"). Since `fragment` starts with "<" and whitespace
  // stripping cannot remove it, no re-check of the leading "<" is needed.
  const candidate = fragment
    .toLowerCase()
    .replace(/\s+/g, "")
    .slice(1)
    .replace(/^\//, "");
  if (!candidate) {
    return true;
  }
  return STREAM_STRIPPED_BLOCK_TAG_NAMES.some((name) => name.startsWith(candidate));
}
/**
 * Splits a possible partial block tag off the end of `text` so it can be
 * buffered until the next streamed delta arrives.
 *
 * Only the last "<" in `text` can start a trailing fragment. If there is no
 * "<", or it falls inside an inline code span (per `isInsideCodeSpan`), or
 * the tail cannot grow into a stripped block tag, the text is returned
 * unchanged with no pending fragment.
 */
function splitTrailingBlockTagFragment(
  text: string,
  isInsideCodeSpan: (index: number) => boolean,
): { text: string; pendingTagFragment?: string } {
  const start = text.lastIndexOf("<");
  // No "<" at all, or it sits inside inline code: nothing to buffer.
  if (start < 0 || isInsideCodeSpan(start)) {
    return { text };
  }
  const tail = text.slice(start);
  return isPotentialTrailingBlockTagFragment(tail)
    ? { text: text.slice(0, start), pendingTagFragment: tail }
    : { text };
}
function collectPendingMediaFromInternalEvents(
events: SubscribeEmbeddedPiSessionParams["internalEvents"],
): string[] {
@@ -213,9 +256,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.blockState.thinking = false;
state.blockState.final = false;
state.blockState.inlineCode = createInlineCodeState();
state.blockState.pendingTagFragment = undefined;
state.partialBlockState.thinking = false;
state.partialBlockState.final = false;
state.partialBlockState.inlineCode = createInlineCodeState();
state.partialBlockState.pendingTagFragment = undefined;
state.lastStreamedAssistant = undefined;
state.lastStreamedAssistantCleaned = undefined;
state.emittedAssistantUpdate = false;
@@ -521,20 +566,36 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const stripBlockTags = (
text: string,
state: { thinking: boolean; final: boolean; inlineCode?: InlineCodeState },
state: {
thinking: boolean;
final: boolean;
inlineCode?: InlineCodeState;
pendingTagFragment?: string;
},
options?: { final?: boolean },
): string => {
if (!text) {
const input = `${state.pendingTagFragment ?? ""}${text}`;
state.pendingTagFragment = undefined;
if (!input) {
return text;
}
const inlineStateStart = state.inlineCode ?? createInlineCodeState();
const codeSpans = buildCodeSpanIndex(text, inlineStateStart);
const initialCodeSpans = buildCodeSpanIndex(input, inlineStateStart);
const { text: scanText, pendingTagFragment } = options?.final
? { text: input, pendingTagFragment: undefined }
: splitTrailingBlockTagFragment(input, initialCodeSpans.isInside);
state.pendingTagFragment = pendingTagFragment;
if (!scanText) {
return "";
}
const codeSpans = buildCodeSpanIndex(scanText, inlineStateStart);
let processed = "";
THINKING_TAG_SCAN_RE.lastIndex = 0;
let lastIndex = 0;
let inThinking = state.thinking;
for (const match of text.matchAll(THINKING_TAG_SCAN_RE)) {
for (const match of scanText.matchAll(THINKING_TAG_SCAN_RE)) {
const idx = match.index ?? 0;
if (codeSpans.isInside(idx)) {
continue;
@@ -543,8 +604,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (!inThinking) {
if (isClose) {
const afterIndex = idx + match[0].length;
const before = text.slice(lastIndex, idx);
const after = text.slice(afterIndex);
const before = scanText.slice(lastIndex, idx);
const after = scanText.slice(afterIndex);
if (hasOrphanReasoningCloseBoundary({ before, after })) {
processed = "";
} else {
@@ -553,13 +614,13 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
lastIndex = afterIndex;
continue;
}
processed += text.slice(lastIndex, idx);
processed += scanText.slice(lastIndex, idx);
}
inThinking = !isClose;
lastIndex = idx + match[0].length;
}
if (!inThinking) {
processed += text.slice(lastIndex);
processed += scanText.slice(lastIndex);
}
state.thinking = inThinking;

View File

@@ -2,6 +2,11 @@ import fs from "node:fs/promises";
import http from "node:http";
import path from "node:path";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import {
createStubSessionHarness,
emitAssistantTextDelta,
} from "../agents/pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "../agents/pi-embedded-subscribe.js";
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
import { emitAgentEvent } from "../infra/agent-events.js";
@@ -655,6 +660,40 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
expect(msg.content).toBe("hello");
}
{
// Drive the gateway with an agent run whose <final> tags are split across
// streamed deltas, and assert the SSE output contains only clean content.
agentCommand.mockClear();
agentCommand.mockImplementationOnce((async (opts: unknown) => {
const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
const { session, emit } = createStubSessionHarness();
subscribeEmbeddedPiSession({ session, runId });
emit({ type: "message_start", message: { role: "assistant" } });
// Tag fragments deliberately straddle chunk boundaries ("<" | "final>" …).
for (const delta of ["<", "final>Title\n", "Line one\nLine two</", "final>"]) {
emitAssistantTextDelta({ emit, delta });
}
return { payloads: [{ text: "Title\nLine one\nLine two" }] };
}) as never);
const splitFinalRes = await postChatCompletions(port, {
stream: true,
model: "openclaw",
messages: [{ role: "user", content: "hi" }],
});
expect(splitFinalRes.status).toBe(200);
const splitFinalText = await splitFinalRes.text();
// Parse the SSE body and reassemble the streamed delta content.
const splitFinalData = parseSseDataLines(splitFinalText);
const splitFinalChunks = splitFinalData
.filter((d) => d !== "[DONE]")
.map((d) => JSON.parse(d) as Record<string, unknown>);
const splitFinalContent = splitFinalChunks
.flatMap((c) => (c.choices as Array<Record<string, unknown>> | undefined) ?? [])
.map((choice) => (choice.delta as Record<string, unknown> | undefined)?.content)
.filter((v): v is string => typeof v === "string")
.join("");
// No tag remnants may leak to SSE clients.
expect(splitFinalContent).toBe("Title\nLine one\nLine two");
expect(splitFinalContent).not.toContain("<");
expect(splitFinalContent).not.toContain("final>");
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({