diff --git a/CHANGELOG.md b/CHANGELOG.md index 94639e38dc2..1a554ba02ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ Docs: https://docs.openclaw.ai - CLI/image describe: pass `--prompt` and `--timeout-ms` through `infer image describe` and `describe-many`, so custom vision instructions and slow local model budgets reach media-understanding providers such as Ollama, OpenAI, Google, and OpenRouter. Refs #63700. Thanks @cedricjanssens. - Model selection: include the rejected provider/model ref and allowlist recovery hint when a stored session override is cleared, so local model selections such as Gemma GGUF variants do not fall back to the default with a generic message. Refs #71069. Thanks @CyberRaccoonTeam. - OpenAI-compatible providers: drop malformed event-only or blank-data SSE frames before the OpenAI SDK stream parser sees them, so proxies that split `event:` from `data:` no longer crash streaming runs with `Unexpected end of JSON input`. Fixes #52802. Thanks @LyHug. +- Gateway/OpenAI-compatible streaming: strip `` tags split across streamed model deltas before they reach SSE clients, so `/v1/chat/completions` no longer emits tag remnants or drops content when final-answer wrappers cross chunk boundaries. Fixes #63325. Thanks @tzwickl. - Local model prompt caching: keep stable Project Context above volatile channel/session prompt guidance and stop embedding current channel names in the message tool description, so Ollama, MLX, llama.cpp, and other prefix-cache backends avoid avoidable full prompt reprocessing across channel turns. Fixes #40256; supersedes #40296. Thanks @rhclaw and @sriram369. - Gateway/OpenAI-compatible API: guard provider policy lookup against runtime providers with non-array `models` values, so `/v1/chat/completions` no longer fails with `provider?.models?.some is not a function`. Fixes #66744; carries forward #66761. Thanks @MightyMoud, @MukundaKatta. - WhatsApp/Web: pass explicit Baileys socket timings into every WhatsApp Web socket and expose `web.whatsapp.*` keepalive, connect, and query timeout settings so unstable networks can avoid repeated 408 disconnect and opening-handshake timeout loops. Fixes #56365. (#73580) Thanks @velvet-shark. diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 0a318715929..994e3b1b258 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -531,15 +531,22 @@ export function handleMessageUpdate( (deliveryPhase === "final_answer" ? "" : ctx - .stripBlockTags(ctx.state.deltaBuffer, { - thinking: false, - final: false, - inlineCode: createInlineCodeState(), - }) + .stripBlockTags( + ctx.state.deltaBuffer, + { + thinking: false, + final: false, + inlineCode: createInlineCodeState(), + }, + { final: evtType === "text_end" }, + ) .trim()); if (next) { const wasThinking = ctx.state.partialBlockState.thinking; - const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : ""; + const visibleDelta = + chunk || evtType === "text_end" + ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState, { final: evtType === "text_end" }) + : ""; if (!wasThinking && ctx.state.partialBlockState.thinking) { openReasoningStream(ctx); } @@ -678,7 +685,7 @@ export function handleMessageEnd( warnIfAssistantEmittedToolText(ctx, assistantMessage); const text = resolveSilentReplyFallbackText({ - text: ctx.stripBlockTags(rawVisibleText, { thinking: false, final: false }), + text: ctx.stripBlockTags(rawVisibleText, { thinking: false, final: false }, { final: true }), messagingToolSentTexts: ctx.state.messagingToolSentTexts, }); const rawThinking = @@ -700,6 +707,8 @@ export function handleMessageEnd( ctx.state.blockState.thinking = false; ctx.state.blockState.final = false; ctx.state.blockState.inlineCode = createInlineCodeState(); + ctx.state.blockState.pendingTagFragment = undefined; + ctx.state.partialBlockState.pendingTagFragment = undefined; ctx.state.lastStreamedAssistant = undefined; ctx.state.lastStreamedAssistantCleaned = undefined; ctx.state.reasoningStreamOpen = false; diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index c1c394f9fe6..f2fd3674a8a 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -44,8 +44,18 @@ export type EmbeddedPiSubscribeState = { deltaBuffer: string; blockBuffer: string; - blockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState }; - partialBlockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState }; + blockState: { + thinking: boolean; + final: boolean; + inlineCode: InlineCodeState; + pendingTagFragment?: string; + }; + partialBlockState: { + thinking: boolean; + final: boolean; + inlineCode: InlineCodeState; + pendingTagFragment?: string; + }; lastStreamedAssistant?: string; lastStreamedAssistantCleaned?: string; emittedAssistantUpdate: boolean; @@ -112,7 +122,13 @@ export type EmbeddedPiSubscribeContext = { emitToolOutput: (toolName?: string, meta?: string, output?: string, result?: unknown) => void; stripBlockTags: ( text: string, - state: { thinking: boolean; final: boolean; inlineCode?: InlineCodeState }, + state: { + thinking: boolean; + final: boolean; + inlineCode?: InlineCodeState; + pendingTagFragment?: string; + }, + options?: { final?: boolean }, ) => string; emitBlockChunk: (text: string, options?: { assistantMessageIndex?: number }) => void; flushBlockReplyBuffer: (options?: { assistantMessageIndex?: number }) => void | Promise; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts index fbbbd964d94..caab1759257 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts @@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest"; import { createStubSessionHarness, emitAssistantTextDelta, + emitAssistantTextEnd, emitMessageStartAndEndForAssistantText, extractAgentEventPayloads, } from "./pi-embedded-subscribe.e2e-harness.js"; @@ -78,6 +79,108 @@ describe("subscribeEmbeddedPiSession", () => { expect(onPartialReply).toHaveBeenCalled(); expect(onPartialReply.mock.calls[0][0].text).toBe("Hello world"); }); + + it("strips final tags split across streamed deltas without emitting tag remnants", () => { + const { session, emit } = createStubSessionHarness(); + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + onAgentEvent, + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + for (const delta of ["<", "final>Title\n", "Line one\nLine two"]) { + emitAssistantTextDelta({ emit, delta }); + } + + const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls); + const streamedText = payloads.map((payload) => payload.delta).join(""); + expect(streamedText).toBe("Title\nLine one\nLine two"); + expect(streamedText).not.toContain("<"); + expect(streamedText).not.toContain("final>"); + expect(payloads.some((payload) => payload.replace)).toBe(false); + }); + + it("preserves final content when enforced final tags are split across streamed deltas", () => { + const { session, emit } = createStubSessionHarness(); + + const onPartialReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + enforceFinalTag: true, + onPartialReply, + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + for (const delta of ["Visible", " content"]) { + emitAssistantTextDelta({ emit, delta }); + } + + const streamedText = onPartialReply.mock.calls + .map((call) => (call[0] as { delta?: unknown }).delta) + .filter((delta): delta is string => typeof delta === "string") + .join(""); + expect(streamedText).toBe("Visible content"); + }); + + it("does not buffer ordinary trailing less-than text as a tag fragment", () => { + const { session, emit } = createStubSessionHarness(); + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + onAgentEvent, + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "1 < 2" }); + + const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls); + expect(payloads.map((payload) => payload.delta).join("")).toBe("1 < 2"); + }); + + it("flushes a literal trailing final-tag prefix when the text stream ends", () => { + const { session, emit } = createStubSessionHarness(); + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + onAgentEvent, + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "Answer ends with payload.delta).join("")).toBe("Answer ends with { + const { session, emit } = createStubSessionHarness(); + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + onAgentEvent, + }); + + emitMessageStartAndEndForAssistantText({ emit, text: "Answer ends with <" }); + + const payloads = extractAgentEventPayloads(onAgentEvent.mock.calls); + expect(payloads.map((payload) => payload.delta).join("")).toBe("Answer ends with <"); + }); it("does not require when enforcement is off", () => { const { session, emit } = createStubSessionHarness(); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 6b14571ed7d..95711bc533e 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -42,8 +42,51 @@ import { import { hasNonzeroUsage, normalizeUsage, type UsageLike } from "./usage.js"; const FINAL_TAG_SCAN_RE = /<\s*(\/?)\s*final\s*>/gi; +const STREAM_STRIPPED_BLOCK_TAG_NAMES = [ + "final", + "think", + "thinking", + "thought", + "antthinking", + "antml:think", + "antml:thinking", + "antml:thought", +] as const; const log = createSubsystemLogger("agent/embedded"); +function isPotentialTrailingBlockTagFragment(fragment: string): boolean { + if (!fragment.startsWith("<") || fragment.includes(">")) { + return false; + } + const normalized = fragment.toLowerCase().replace(/\s+/g, ""); + if (!normalized.startsWith("<")) { + return false; + } + const candidate = normalized.slice(1).replace(/^\//, ""); + if (!candidate) { + return true; + } + return STREAM_STRIPPED_BLOCK_TAG_NAMES.some((name) => name.startsWith(candidate)); +} + +function splitTrailingBlockTagFragment( + text: string, + isInsideCodeSpan: (index: number) => boolean, +): { text: string; pendingTagFragment?: string } { + const fragmentStart = text.lastIndexOf("<"); + if (fragmentStart === -1 || isInsideCodeSpan(fragmentStart)) { + return { text }; + } + const fragment = text.slice(fragmentStart); + if (!isPotentialTrailingBlockTagFragment(fragment)) { + return { text }; + } + return { + text: text.slice(0, fragmentStart), + pendingTagFragment: fragment, + }; +} + function collectPendingMediaFromInternalEvents( events: SubscribeEmbeddedPiSessionParams["internalEvents"], ): string[] { @@ -213,9 +256,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.blockState.thinking = false; state.blockState.final = false; state.blockState.inlineCode = createInlineCodeState(); + state.blockState.pendingTagFragment = undefined; state.partialBlockState.thinking = false; state.partialBlockState.final = false; state.partialBlockState.inlineCode = createInlineCodeState(); + state.partialBlockState.pendingTagFragment = undefined; state.lastStreamedAssistant = undefined; state.lastStreamedAssistantCleaned = undefined; state.emittedAssistantUpdate = false; @@ -521,20 +566,36 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const stripBlockTags = ( text: string, - state: { thinking: boolean; final: boolean; inlineCode?: InlineCodeState }, + state: { + thinking: boolean; + final: boolean; + inlineCode?: InlineCodeState; + pendingTagFragment?: string; + }, + options?: { final?: boolean }, ): string => { - if (!text) { + const input = `${state.pendingTagFragment ?? ""}${text}`; + state.pendingTagFragment = undefined; + if (!input) { return text; } const inlineStateStart = state.inlineCode ?? createInlineCodeState(); - const codeSpans = buildCodeSpanIndex(text, inlineStateStart); + const initialCodeSpans = buildCodeSpanIndex(input, inlineStateStart); + const { text: scanText, pendingTagFragment } = options?.final + ? { text: input, pendingTagFragment: undefined } + : splitTrailingBlockTagFragment(input, initialCodeSpans.isInside); + state.pendingTagFragment = pendingTagFragment; + if (!scanText) { + return ""; + } + const codeSpans = buildCodeSpanIndex(scanText, inlineStateStart); let processed = ""; THINKING_TAG_SCAN_RE.lastIndex = 0; let lastIndex = 0; let inThinking = state.thinking; - for (const match of text.matchAll(THINKING_TAG_SCAN_RE)) { + for (const match of scanText.matchAll(THINKING_TAG_SCAN_RE)) { const idx = match.index ?? 0; if (codeSpans.isInside(idx)) { continue; @@ -543,8 +604,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (!inThinking) { if (isClose) { const afterIndex = idx + match[0].length; - const before = text.slice(lastIndex, idx); - const after = text.slice(afterIndex); + const before = scanText.slice(lastIndex, idx); + const after = scanText.slice(afterIndex); if (hasOrphanReasoningCloseBoundary({ before, after })) { processed = ""; } else { @@ -553,13 +614,13 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar lastIndex = afterIndex; continue; } - processed += text.slice(lastIndex, idx); + processed += scanText.slice(lastIndex, idx); } inThinking = !isClose; lastIndex = idx + match[0].length; } if (!inThinking) { - processed += text.slice(lastIndex); + processed += scanText.slice(lastIndex); } state.thinking = inThinking; diff --git a/src/gateway/openai-http.test.ts b/src/gateway/openai-http.test.ts index b1a753abd50..51896e8c266 100644 --- a/src/gateway/openai-http.test.ts +++ b/src/gateway/openai-http.test.ts @@ -2,6 +2,11 @@ import fs from "node:fs/promises"; import http from "node:http"; import path from "node:path"; import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; +import { + createStubSessionHarness, + emitAssistantTextDelta, +} from "../agents/pi-embedded-subscribe.e2e-harness.js"; +import { subscribeEmbeddedPiSession } from "../agents/pi-embedded-subscribe.js"; import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js"; import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js"; import { emitAgentEvent } from "../infra/agent-events.js"; @@ -655,6 +660,40 @@ describe("OpenAI-compatible HTTP API (e2e)", () => { expect(msg.content).toBe("hello"); } + { + agentCommand.mockClear(); + agentCommand.mockImplementationOnce((async (opts: unknown) => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + const { session, emit } = createStubSessionHarness(); + subscribeEmbeddedPiSession({ session, runId }); + emit({ type: "message_start", message: { role: "assistant" } }); + for (const delta of ["<", "final>Title\n", "Line one\nLine two"]) { + emitAssistantTextDelta({ emit, delta }); + } + return { payloads: [{ text: "Title\nLine one\nLine two" }] }; + }) as never); + + const splitFinalRes = await postChatCompletions(port, { + stream: true, + model: "openclaw", + messages: [{ role: "user", content: "hi" }], + }); + expect(splitFinalRes.status).toBe(200); + const splitFinalText = await splitFinalRes.text(); + const splitFinalData = parseSseDataLines(splitFinalText); + const splitFinalChunks = splitFinalData + .filter((d) => d !== "[DONE]") + .map((d) => JSON.parse(d) as Record); + const splitFinalContent = splitFinalChunks + .flatMap((c) => (c.choices as Array> | undefined) ?? []) + .map((choice) => (choice.delta as Record | undefined)?.content) + .filter((v): v is string => typeof v === "string") + .join(""); + expect(splitFinalContent).toBe("Title\nLine one\nLine two"); + expect(splitFinalContent).not.toContain("<"); + expect(splitFinalContent).not.toContain("final>"); + } + { agentCommand.mockClear(); agentCommand.mockResolvedValueOnce({