From 946f26f73b4afef469dde737461f530ccff00c1c Mon Sep 17 00:00:00 2001 From: Mason Huang Date: Wed, 29 Apr 2026 18:18:37 +0800 Subject: [PATCH] fix: classify malformed streaming fragments at transport boundary --- src/agents/anthropic-transport-stream.test.ts | 25 +++++++++++ src/agents/anthropic-transport-stream.ts | 16 ++++++- ...d-helpers.formatassistanterrortext.test.ts | 42 ++++--------------- src/agents/pi-embedded-helpers/errors.test.ts | 31 +++++--------- .../sanitize-user-facing-text.ts | 13 ++---- src/agents/transport-stream-shared.ts | 3 ++ 6 files changed, 62 insertions(+), 68 deletions(-) diff --git a/src/agents/anthropic-transport-stream.test.ts b/src/agents/anthropic-transport-stream.test.ts index 5fd7e259209..0c094315036 100644 --- a/src/agents/anthropic-transport-stream.test.ts +++ b/src/agents/anthropic-transport-stream.test.ts @@ -41,6 +41,14 @@ function createStalledSseResponse(params: { onCancel: (reason: unknown) => void params.onCancel(reason); }, }); + + return new Response(body, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function createRawSseResponse(body: string): Response { return new Response(body, { status: 200, headers: { "content-type": "text/event-stream" }, @@ -339,6 +347,23 @@ describe("anthropic transport stream", () => { expect(guardedFetchMock).not.toHaveBeenCalled(); }); + it("classifies malformed Anthropic SSE data as a stable transport error", async () => { + guardedFetchMock.mockResolvedValueOnce(createRawSseResponse('data: {"type":\n\n')); + + const result = await runTransportStream( + makeAnthropicTransportModel(), + { + messages: [{ role: "user", content: "hello" }], + } as AnthropicStreamContext, + { + apiKey: "sk-ant-api", + } as AnthropicStreamOptions, + ); + + expect(result.stopReason).toBe("error"); + expect(result.errorMessage).toBe("OpenClaw transport error: malformed_streaming_fragment"); + }); + it("preserves Anthropic OAuth identity and tool-name remapping with transport overrides", async () => { guardedFetchMock.mockResolvedValueOnce( createSseResponse([ diff --git a/src/agents/anthropic-transport-stream.ts b/src/agents/anthropic-transport-stream.ts index 115b6626a31..1ee3cfc759b 100644 --- a/src/agents/anthropic-transport-stream.ts +++ b/src/agents/anthropic-transport-stream.ts @@ -24,6 +24,7 @@ import { createWritableTransportEventStream, failTransportStream, finalizeTransportStream, + MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE, mergeTransportHeaders, sanitizeNonEmptyTransportPayloadText, sanitizeTransportPayloadText, @@ -534,6 +535,17 @@ function readAnthropicSseChunk( }); } +function parseAnthropicSseEventData(data: string): Record { + try { + return JSON.parse(data) as Record; + } catch (error) { + if (error instanceof SyntaxError) { + throw new Error(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE, { cause: error }); + } + throw error; + } +} + async function* parseAnthropicSseBody( body: ReadableStream, signal?: AbortSignal, @@ -558,7 +570,7 @@ async function* parseAnthropicSseBody( .map((line) => line.slice(5).trimStart()) .join("\n"); if (data && data !== "[DONE]") { - yield JSON.parse(data) as Record; + yield parseAnthropicSseEventData(data); } frameEnd = buffer.indexOf("\n\n"); } @@ -571,7 +583,7 @@ async function* parseAnthropicSseBody( .map((line) => line.slice(5).trimStart()) .join("\n"); if (data && data !== "[DONE]") { - yield JSON.parse(data) as Record; + yield parseAnthropicSseEventData(data); } } } finally { diff --git a/src/agents/pi-embedded-helpers.formatassistanterrortext.test.ts b/src/agents/pi-embedded-helpers.formatassistanterrortext.test.ts index 78b6091c96a..78405eed667 100644 --- a/src/agents/pi-embedded-helpers.formatassistanterrortext.test.ts +++ b/src/agents/pi-embedded-helpers.formatassistanterrortext.test.ts @@ -10,6 +10,7 @@ import { sanitizeUserFacingText, } from "./pi-embedded-helpers.js"; import { makeAssistantMessageFixture } from "./test-helpers/assistant-message-fixtures.js"; +import { MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE } from "./transport-stream-shared.js"; describe("formatAssistantErrorText", () => { const makeAssistantError = (errorMessage: string): AssistantMessage => @@ -351,28 +352,8 @@ describe("formatAssistantErrorText", () => { ); }); - it("sanitizes streaming JSON parse errors from Anthropic SDK (#59076)", () => { - const msg = makeAssistantError( - "Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)", - ); - expect(formatAssistantErrorText(msg)).toBe( - "LLM streaming response contained a malformed fragment. Please try again.", - ); - }); - - it("sanitizes 'Expected double-quoted property name' JSON parse errors (#59076)", () => { - const msg = makeAssistantError( - "Expected double-quoted property name in JSON at position 8912 (line 219 column 5)", - ); - expect(formatAssistantErrorText(msg)).toBe( - "LLM streaming response contained a malformed fragment. Please try again.", - ); - }); - - it("sanitizes context-proven streaming 'Unexpected token' JSON parse errors (#59076)", () => { - const msg = makeAssistantError( - 'Could not parse Anthropic SSE event content_block_delta: Unexpected token } in JSON at position 14; data={"type":"content_block_delta","delta":{"type":"input_json_delta","partial_json":"}"},"index":1}', - ); + it("sanitizes transport-classified malformed streaming fragments (#59076)", () => { + const msg = makeAssistantError(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE); expect(formatAssistantErrorText(msg)).toBe( "LLM streaming response contained a malformed fragment. Please try again.", ); @@ -475,19 +456,10 @@ describe("raw API error payload helpers", () => { }); describe("sanitizeUserFacingText — streaming JSON parse error (#59076)", () => { - it("rewrites JSON parse error in error context", () => { - const result = sanitizeUserFacingText( - "Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)", - { errorContext: true }, - ); - expect(result).toBe("LLM streaming response contained a malformed fragment. Please try again."); - }); - - it.each([ - "Unexpected end of JSON input", - "Unexpected non-whitespace character after JSON at position 4", - ])("rewrites plain JSON.parse error variants in error context: %s", (text) => { - const result = sanitizeUserFacingText(text, { errorContext: true }); + it("rewrites transport-classified malformed streaming fragments in error context", () => { + const result = sanitizeUserFacingText(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE, { + errorContext: true, + }); expect(result).toBe("LLM streaming response contained a malformed fragment. Please try again."); }); diff --git a/src/agents/pi-embedded-helpers/errors.test.ts b/src/agents/pi-embedded-helpers/errors.test.ts index e8cd0d9ffd8..ad8fab14805 100644 --- a/src/agents/pi-embedded-helpers/errors.test.ts +++ b/src/agents/pi-embedded-helpers/errors.test.ts @@ -1,6 +1,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it } from "vitest"; import { makeAssistantMessageFixture } from "../test-helpers/assistant-message-fixtures.js"; +import { MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE } from "../transport-stream-shared.js"; import { formatAssistantErrorText } from "./errors.js"; describe("formatAssistantErrorText streaming JSON parse classification", () => { @@ -10,31 +11,19 @@ describe("formatAssistantErrorText streaming JSON parse classification", () => { content: [{ type: "text", text: errorMessage }], }); - it("suppresses raw streaming tool-call fragment parse failures", () => { + it("suppresses transport-classified malformed streaming fragments", () => { + const msg = makeAssistantError(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE); + expect(formatAssistantErrorText(msg)).toBe( + "LLM streaming response contained a malformed fragment. Please try again.", + ); + }); + + it("does not suppress unclassified JSON.parse text", () => { const msg = makeAssistantError( "Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)", ); expect(formatAssistantErrorText(msg)).toBe( - "LLM streaming response contained a malformed fragment. Please try again.", - ); - }); - - it.each([ - "Unexpected end of JSON input", - "Unexpected non-whitespace character after JSON at position 4", - ])("suppresses plain JSON.parse streaming fragment failures: %s", (errorMessage) => { - const msg = makeAssistantError(errorMessage); - expect(formatAssistantErrorText(msg)).toBe( - "LLM streaming response contained a malformed fragment. Please try again.", - ); - }); - - it("suppresses structured Anthropic tool-call delta parse failures", () => { - const msg = makeAssistantError( - 'Could not parse Anthropic SSE event content_block_delta: Unexpected end of JSON input; data={"type":"content_block_delta","delta":{"type":"input_json_delta","partial_json":"{\\"path\\":"},"index":0}', - ); - expect(formatAssistantErrorText(msg)).toBe( - "LLM streaming response contained a malformed fragment. Please try again.", + "Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)", ); }); diff --git a/src/agents/pi-embedded-helpers/sanitize-user-facing-text.ts b/src/agents/pi-embedded-helpers/sanitize-user-facing-text.ts index 1091b5c2984..c7aa101fe79 100644 --- a/src/agents/pi-embedded-helpers/sanitize-user-facing-text.ts +++ b/src/agents/pi-embedded-helpers/sanitize-user-facing-text.ts @@ -14,6 +14,7 @@ import { import { formatExecDeniedUserMessage } from "../exec-approval-result.js"; import { stripInternalRuntimeContext } from "../internal-runtime-context.js"; import { stableStringify } from "../stable-stringify.js"; +import { MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE } from "../transport-stream-shared.js"; import { isBillingErrorMessage, isOverloadedErrorMessage, @@ -214,18 +215,10 @@ export function isStreamingJsonParseError(raw: string): boolean { return false; } const trimmed = raw.trim(); - if ( - /\bcould not parse anthropic sse event\b/i.test(trimmed) && - /\b(?:content_block_delta|input_json_delta|partial_json|tool_use)\b/i.test(trimmed) && - (/\b(?:expected|unexpected|unterminated)\b.+\bin json\b.+\bposition\b/i.test(trimmed) || - /\bunexpected end of json input\b/i.test(trimmed)) - ) { + if (trimmed === MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE) { return true; } - - return /^(?:Unexpected end of JSON input|Unexpected non-whitespace character after JSON at position \d+(?: \(line \d+ column \d+\))?|(?:Expected (?:',' or '\}' after property value|double-quoted property name|':' after property name|',' or '\]' after array element)|Unterminated string) in JSON at position \d+(?: \(line \d+ column \d+\))?)$/i.test( - trimmed, - ); + return false; } function hasRateLimitTpmHint(raw: string): boolean { diff --git a/src/agents/transport-stream-shared.ts b/src/agents/transport-stream-shared.ts index e0b87d2bc17..2bb7ac00d91 100644 --- a/src/agents/transport-stream-shared.ts +++ b/src/agents/transport-stream-shared.ts @@ -21,6 +21,9 @@ type TransportOutputShape = { export const EMPTY_TOOL_RESULT_TEXT = "(no output)"; +export const MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE = + "OpenClaw transport error: malformed_streaming_fragment"; + export function sanitizeTransportPayloadText(text: string): string { return text.replace( /[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?