fix: classify malformed streaming fragments at transport boundary

This commit is contained in:
Mason Huang
2026-04-29 18:18:37 +08:00
parent 36689751c3
commit 946f26f73b
6 changed files with 62 additions and 68 deletions

View File

@@ -41,6 +41,14 @@ function createStalledSseResponse(params: { onCancel: (reason: unknown) => void
params.onCancel(reason);
},
});
return new Response(body, {
status: 200,
headers: { "content-type": "text/event-stream" },
});
}
function createRawSseResponse(body: string): Response {
return new Response(body, {
status: 200,
headers: { "content-type": "text/event-stream" },
@@ -339,6 +347,23 @@ describe("anthropic transport stream", () => {
expect(guardedFetchMock).not.toHaveBeenCalled();
});
it("classifies malformed Anthropic SSE data as a stable transport error", async () => {
guardedFetchMock.mockResolvedValueOnce(createRawSseResponse('data: {"type":\n\n'));
const result = await runTransportStream(
makeAnthropicTransportModel(),
{
messages: [{ role: "user", content: "hello" }],
} as AnthropicStreamContext,
{
apiKey: "sk-ant-api",
} as AnthropicStreamOptions,
);
expect(result.stopReason).toBe("error");
expect(result.errorMessage).toBe("OpenClaw transport error: malformed_streaming_fragment");
});
it("preserves Anthropic OAuth identity and tool-name remapping with transport overrides", async () => {
guardedFetchMock.mockResolvedValueOnce(
createSseResponse([

View File

@@ -24,6 +24,7 @@ import {
createWritableTransportEventStream,
failTransportStream,
finalizeTransportStream,
MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE,
mergeTransportHeaders,
sanitizeNonEmptyTransportPayloadText,
sanitizeTransportPayloadText,
@@ -534,6 +535,17 @@ function readAnthropicSseChunk(
});
}
function parseAnthropicSseEventData(data: string): Record<string, unknown> {
try {
return JSON.parse(data) as Record<string, unknown>;
} catch (error) {
if (error instanceof SyntaxError) {
throw new Error(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE, { cause: error });
}
throw error;
}
}
async function* parseAnthropicSseBody(
body: ReadableStream<Uint8Array>,
signal?: AbortSignal,
@@ -558,7 +570,7 @@ async function* parseAnthropicSseBody(
.map((line) => line.slice(5).trimStart())
.join("\n");
if (data && data !== "[DONE]") {
yield JSON.parse(data) as Record<string, unknown>;
yield parseAnthropicSseEventData(data);
}
frameEnd = buffer.indexOf("\n\n");
}
@@ -571,7 +583,7 @@ async function* parseAnthropicSseBody(
.map((line) => line.slice(5).trimStart())
.join("\n");
if (data && data !== "[DONE]") {
yield JSON.parse(data) as Record<string, unknown>;
yield parseAnthropicSseEventData(data);
}
}
} finally {

View File

@@ -10,6 +10,7 @@ import {
sanitizeUserFacingText,
} from "./pi-embedded-helpers.js";
import { makeAssistantMessageFixture } from "./test-helpers/assistant-message-fixtures.js";
import { MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE } from "./transport-stream-shared.js";
describe("formatAssistantErrorText", () => {
const makeAssistantError = (errorMessage: string): AssistantMessage =>
@@ -351,28 +352,8 @@ describe("formatAssistantErrorText", () => {
);
});
it("sanitizes streaming JSON parse errors from Anthropic SDK (#59076)", () => {
const msg = makeAssistantError(
"Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)",
);
expect(formatAssistantErrorText(msg)).toBe(
"LLM streaming response contained a malformed fragment. Please try again.",
);
});
it("sanitizes 'Expected double-quoted property name' JSON parse errors (#59076)", () => {
const msg = makeAssistantError(
"Expected double-quoted property name in JSON at position 8912 (line 219 column 5)",
);
expect(formatAssistantErrorText(msg)).toBe(
"LLM streaming response contained a malformed fragment. Please try again.",
);
});
it("sanitizes context-proven streaming 'Unexpected token' JSON parse errors (#59076)", () => {
const msg = makeAssistantError(
'Could not parse Anthropic SSE event content_block_delta: Unexpected token } in JSON at position 14; data={"type":"content_block_delta","delta":{"type":"input_json_delta","partial_json":"}"},"index":1}',
);
it("sanitizes transport-classified malformed streaming fragments (#59076)", () => {
const msg = makeAssistantError(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE);
expect(formatAssistantErrorText(msg)).toBe(
"LLM streaming response contained a malformed fragment. Please try again.",
);
@@ -475,19 +456,10 @@ describe("raw API error payload helpers", () => {
});
describe("sanitizeUserFacingText — streaming JSON parse error (#59076)", () => {
it("rewrites JSON parse error in error context", () => {
const result = sanitizeUserFacingText(
"Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)",
{ errorContext: true },
);
expect(result).toBe("LLM streaming response contained a malformed fragment. Please try again.");
});
it.each([
"Unexpected end of JSON input",
"Unexpected non-whitespace character after JSON at position 4",
])("rewrites plain JSON.parse error variants in error context: %s", (text) => {
const result = sanitizeUserFacingText(text, { errorContext: true });
it("rewrites transport-classified malformed streaming fragments in error context", () => {
const result = sanitizeUserFacingText(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE, {
errorContext: true,
});
expect(result).toBe("LLM streaming response contained a malformed fragment. Please try again.");
});

View File

@@ -1,6 +1,7 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it } from "vitest";
import { makeAssistantMessageFixture } from "../test-helpers/assistant-message-fixtures.js";
import { MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE } from "../transport-stream-shared.js";
import { formatAssistantErrorText } from "./errors.js";
describe("formatAssistantErrorText streaming JSON parse classification", () => {
@@ -10,31 +11,19 @@ describe("formatAssistantErrorText streaming JSON parse classification", () => {
content: [{ type: "text", text: errorMessage }],
});
it("suppresses raw streaming tool-call fragment parse failures", () => {
it("suppresses transport-classified malformed streaming fragments", () => {
const msg = makeAssistantError(MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE);
expect(formatAssistantErrorText(msg)).toBe(
"LLM streaming response contained a malformed fragment. Please try again.",
);
});
it("does not suppress unclassified JSON.parse text", () => {
const msg = makeAssistantError(
"Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)",
);
expect(formatAssistantErrorText(msg)).toBe(
"LLM streaming response contained a malformed fragment. Please try again.",
);
});
it.each([
"Unexpected end of JSON input",
"Unexpected non-whitespace character after JSON at position 4",
])("suppresses plain JSON.parse streaming fragment failures: %s", (errorMessage) => {
const msg = makeAssistantError(errorMessage);
expect(formatAssistantErrorText(msg)).toBe(
"LLM streaming response contained a malformed fragment. Please try again.",
);
});
it("suppresses structured Anthropic tool-call delta parse failures", () => {
const msg = makeAssistantError(
'Could not parse Anthropic SSE event content_block_delta: Unexpected end of JSON input; data={"type":"content_block_delta","delta":{"type":"input_json_delta","partial_json":"{\\"path\\":"},"index":0}',
);
expect(formatAssistantErrorText(msg)).toBe(
"LLM streaming response contained a malformed fragment. Please try again.",
"Expected ',' or '}' after property value in JSON at position 334 (line 1 column 335)",
);
});

View File

@@ -14,6 +14,7 @@ import {
import { formatExecDeniedUserMessage } from "../exec-approval-result.js";
import { stripInternalRuntimeContext } from "../internal-runtime-context.js";
import { stableStringify } from "../stable-stringify.js";
import { MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE } from "../transport-stream-shared.js";
import {
isBillingErrorMessage,
isOverloadedErrorMessage,
@@ -214,18 +215,10 @@ export function isStreamingJsonParseError(raw: string): boolean {
return false;
}
const trimmed = raw.trim();
if (
/\bcould not parse anthropic sse event\b/i.test(trimmed) &&
/\b(?:content_block_delta|input_json_delta|partial_json|tool_use)\b/i.test(trimmed) &&
(/\b(?:expected|unexpected|unterminated)\b.+\bin json\b.+\bposition\b/i.test(trimmed) ||
/\bunexpected end of json input\b/i.test(trimmed))
) {
if (trimmed === MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE) {
return true;
}
return /^(?:Unexpected end of JSON input|Unexpected non-whitespace character after JSON at position \d+(?: \(line \d+ column \d+\))?|(?:Expected (?:',' or '\}' after property value|double-quoted property name|':' after property name|',' or '\]' after array element)|Unterminated string) in JSON at position \d+(?: \(line \d+ column \d+\))?)$/i.test(
trimmed,
);
return false;
}
function hasRateLimitTpmHint(raw: string): boolean {

View File

@@ -21,6 +21,9 @@ type TransportOutputShape = {
export const EMPTY_TOOL_RESULT_TEXT = "(no output)";
export const MALFORMED_STREAMING_FRAGMENT_ERROR_MESSAGE =
"OpenClaw transport error: malformed_streaming_fragment";
export function sanitizeTransportPayloadText(text: string): string {
return text.replace(
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/g,