From 930d81aa41d26ef72343c88c0acefa25596d900a Mon Sep 17 00:00:00 2001 From: Chunyue Wang <80630709+openperf@users.noreply.github.com> Date: Sun, 26 Apr 2026 02:04:40 +0800 Subject: [PATCH] fix(agents): prevent Bedrock replay death loop on empty assistant content (#71627) * fix(agents): prevent Bedrock replay death loop on empty assistant content Fixes #71572 * docs: document Bedrock replay repair (#71627) (thanks @openperf) * fix(diagnostics): share diagnostic event state across sdk graphs --------- Co-authored-by: Peter Steinberger --- CHANGELOG.md | 5 + docs/reference/transcript-hygiene.md | 21 ++- .../pi-embedded-runner/replay-history.test.ts | 115 ++++++++++++++ .../pi-embedded-runner/replay-history.ts | 78 ++++++++-- src/agents/session-file-repair.test.ts | 143 ++++++++++++++++++ src/agents/session-file-repair.ts | 86 ++++++++++- src/agents/stream-message-shared.test.ts | 44 ++++++ src/agents/stream-message-shared.ts | 19 ++- src/infra/diagnostic-events.ts | 24 ++- 9 files changed, 500 insertions(+), 35 deletions(-) create mode 100644 src/agents/pi-embedded-runner/replay-history.test.ts create mode 100644 src/agents/stream-message-shared.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 5662fc6b91e..0761d2755ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,11 @@ Docs: https://docs.openclaw.ai detection. Closes #33360. Thanks @smartchainark. - Subagents/browser: show an actionable `/tools` notice when browser automation is configured but filtered out by the active tool profile, and document that coding-profile agents should use `tools.alsoAllow: ["browser"]` rather than subagent allowlists alone. - Control UI/Quick Settings: persist the assistant avatar override to browser local storage (mirroring the user avatar) so uploaded image data URLs no longer fail config validation with "Too big: expected string to have <=200 characters". Also lift the gateway-side `ui.assistant.avatar` length cap to match the user avatar size budget for non-UI clients writing the field directly. Thanks @BunsDev. +- Plugin SDK: share diagnostic event subscriptions across duplicate source/dist + module graphs so legacy root SDK imports still receive runtime diagnostic events. +- Agents/Bedrock: prevent empty assistant stream-error turns from poisoning + Converse replay by persisting, repairing, and replaying a non-empty fallback + block. Fixes #71572. (#71627) Thanks @openperf. - Browser/CDP: make readiness diagnostics use the same discovery-first fallback as reachability for bare `ws://` Browserless and Browserbase CDP URLs. Fixes #69532. - Browser/CDP: explain that loopback Browserless or other externally managed CDP services need `attachOnly: true` and matching Browserless `EXTERNAL` endpoint when reporting local port ownership conflicts, and fall back to the configured bare WebSocket root when a discovered Browserless endpoint rejects CDP. Fixes #49815. - ACP/OpenCode: update the bundled acpx runtime to 0.6.0 and cover the OpenCode ACP bind path in Docker live tests. diff --git a/docs/reference/transcript-hygiene.md b/docs/reference/transcript-hygiene.md index 7d4bdbb069b..cc4b3ba751d 100644 --- a/docs/reference/transcript-hygiene.md +++ b/docs/reference/transcript-hygiene.md @@ -8,11 +8,12 @@ title: "Transcript hygiene" --- This document describes **provider-specific fixes** applied to transcripts before a run -(building model context). These are **in-memory** adjustments used to satisfy strict -provider requirements. These hygiene steps do **not** rewrite the stored JSONL transcript -on disk; however, a separate session-file repair pass may rewrite malformed JSONL files -by dropping invalid lines before the session is loaded. When a repair occurs, the original -file is backed up alongside the session file. +(building model context). Most of these are **in-memory** adjustments used to satisfy +strict provider requirements. A separate session-file repair pass may also rewrite +stored JSONL before the session is loaded, either by dropping malformed JSONL lines or +by repairing persisted turns that are syntactically valid but known to be rejected by a +provider during replay. When a repair occurs, the original file is backed up alongside +the session file. Scope includes: @@ -24,6 +25,7 @@ Scope includes: - Thought signature cleanup - Image payload sanitization - User-input provenance tagging (for inter-session routed prompts) +- Empty assistant error-turn repair for Bedrock Converse replay If you need transcript storage details, see: @@ -132,6 +134,15 @@ external end-user instructions. - Tool result pairing repair and synthetic tool results. - Turn validation (merge consecutive user turns to satisfy strict alternation). +**Amazon Bedrock (Converse API)** + +- Empty assistant stream-error turns are repaired to a non-empty fallback text block + before replay. Bedrock Converse rejects assistant messages with `content: []`, so + persisted assistant turns with `stopReason: "error"` and empty content are also + repaired on disk before load. +- Replay filters OpenClaw delivery-mirror and gateway-injected assistant turns. +- Image sanitization applies through the global rule. + **Mistral (including model-id based detection)** - Tool call id sanitization: strict9 (alphanumeric length 9). diff --git a/src/agents/pi-embedded-runner/replay-history.test.ts b/src/agents/pi-embedded-runner/replay-history.test.ts new file mode 100644 index 00000000000..a294cb060a2 --- /dev/null +++ b/src/agents/pi-embedded-runner/replay-history.test.ts @@ -0,0 +1,115 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { describe, expect, it } from "vitest"; +import { normalizeAssistantReplayContent } from "./replay-history.js"; + +const FALLBACK_TEXT = "[assistant turn failed before producing content]"; + +function bedrockAssistant( + content: unknown, + stopReason: "error" | "stop" | "toolUse" | "length" = "error", +): AgentMessage { + return { + role: "assistant", + content, + api: "bedrock-converse-stream", + provider: "amazon-bedrock", + model: "anthropic.claude-3-haiku-20240307-v1:0", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason, + timestamp: 0, + } as unknown as AgentMessage; +} + +function userMessage(text: string): AgentMessage { + return { role: "user", content: text, timestamp: 0 } as unknown as AgentMessage; +} + +function openclawTranscriptAssistant(model: "delivery-mirror" | "gateway-injected"): AgentMessage { + return { + role: "assistant", + content: [{ type: "text", text: "channel mirror" }], + api: "openai-responses", + provider: "openclaw", + model, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: 0, + } as unknown as AgentMessage; +} + +describe("normalizeAssistantReplayContent", () => { + it("converts assistant content: [] to a non-empty sentinel text block when stopReason is error", () => { + const messages = [userMessage("hello"), bedrockAssistant([], "error")]; + const out = normalizeAssistantReplayContent(messages); + expect(out).not.toBe(messages); + const repaired = out[1] as AgentMessage & { content: { type: string; text: string }[] }; + expect(repaired.content).toEqual([{ type: "text", text: FALLBACK_TEXT }]); + }); + + it("preserves silent-reply turns (stopReason=stop, content=[]) untouched", () => { + // run.empty-error-retry.test.ts treats `stopReason:"stop"` + `content:[]` + // as a legitimate NO_REPLY / silent-reply, NOT a crash. Substituting the + // failure sentinel here would inject a fabricated "[assistant turn failed + // before producing content]" into the next provider request and change + // model behavior even though no failure occurred. + const silentStop = bedrockAssistant([], "stop"); + const messages = [userMessage("hello"), silentStop]; + const out = normalizeAssistantReplayContent(messages); + expect(out).toBe(messages); + expect(out[1]).toBe(silentStop); + }); + + it("preserves empty content with non-error stopReasons (toolUse, length) untouched", () => { + // Boundary lock: only `stopReason:"error"` should trip the sentinel + // substitution. `toolUse` and `length` are reachable in practice when a + // provider terminates a turn before a content block is emitted, and + // rewriting them as a failure would lie about what happened. + const toolUse = bedrockAssistant([], "toolUse"); + const length = bedrockAssistant([], "length"); + const messages = [userMessage("hello"), toolUse, length]; + const out = normalizeAssistantReplayContent(messages); + expect(out).toBe(messages); + expect(out[1]).toBe(toolUse); + expect(out[2]).toBe(length); + }); + + it("wraps legacy string assistant content as a single text block (regression)", () => { + const messages = [userMessage("hi"), bedrockAssistant("plain string content")]; + const out = normalizeAssistantReplayContent(messages); + const wrapped = out[1] as AgentMessage & { content: { type: string; text: string }[] }; + expect(wrapped.content).toEqual([{ type: "text", text: "plain string content" }]); + }); + + it("filters openclaw delivery-mirror and gateway-injected assistant messages from replay", () => { + const messages = [ + userMessage("hello"), + openclawTranscriptAssistant("delivery-mirror"), + bedrockAssistant([{ type: "text", text: "real reply" }]), + openclawTranscriptAssistant("gateway-injected"), + ]; + const out = normalizeAssistantReplayContent(messages); + expect(out).toHaveLength(2); + expect((out[0] as { role: string }).role).toBe("user"); + expect((out[1] as { provider: string }).provider).toBe("amazon-bedrock"); + }); + + it("returns the original array reference when nothing needs to change", () => { + const messages = [userMessage("hello"), bedrockAssistant([{ type: "text", text: "fine" }])]; + const out = normalizeAssistantReplayContent(messages); + expect(out).toBe(messages); + }); +}); diff --git a/src/agents/pi-embedded-runner/replay-history.ts b/src/agents/pi-embedded-runner/replay-history.ts index b912b4b1234..a82fda16bf7 100644 --- a/src/agents/pi-embedded-runner/replay-history.ts +++ b/src/agents/pi-embedded-runner/replay-history.ts @@ -28,6 +28,7 @@ import { sanitizeToolUseResultPairing, stripToolResultDetails, } from "../session-transcript-repair.js"; +import { STREAM_ERROR_FALLBACK_TEXT } from "../stream-message-shared.js"; import { sanitizeToolCallIdsForCloudCodeAssist } from "../tool-call-id.js"; import type { TranscriptPolicy } from "../transcript-policy.js"; import { @@ -227,20 +228,77 @@ function stripStaleAssistantUsageBeforeLatestCompaction(messages: AgentMessage[] return touched ? out : messages; } +// `provider:"openclaw"` assistant entries written by the channel-delivery +// transcript mirror (`model:"delivery-mirror"`, see config/sessions/transcript.ts) +// and by the Gateway transcript-inject helper (`model:"gateway-injected"`, see +// gateway/server-methods/chat-transcript-inject.ts) are user-visible transcript +// records, not model output. Replaying them to the actual provider duplicates +// content and, on Bedrock or strict OpenAI-compatible providers, can also +// trigger turn-ordering rejections. +const TRANSCRIPT_ONLY_OPENCLAW_MODELS = new Set(["delivery-mirror", "gateway-injected"]); + +function isTranscriptOnlyOpenclawAssistant(message: AgentMessage): boolean { + if (!message || message.role !== "assistant") { + return false; + } + const provider = (message as { provider?: unknown }).provider; + const model = (message as { model?: unknown }).model; + return ( + provider === "openclaw" && + typeof model === "string" && + TRANSCRIPT_ONLY_OPENCLAW_MODELS.has(model) + ); +} + export function normalizeAssistantReplayContent(messages: AgentMessage[]): AgentMessage[] { let touched = false; - const out = [...messages]; - for (let i = 0; i < out.length; i += 1) { - const message = out[i]; - const replayContent = (message as { content?: unknown } | undefined)?.content; - if (!message || message.role !== "assistant" || typeof replayContent !== "string") { + const out: AgentMessage[] = []; + for (const message of messages) { + if (!message || message.role !== "assistant") { + out.push(message); continue; } - out[i] = { - ...message, - content: [{ type: "text", text: replayContent }], - }; - touched = true; + if (isTranscriptOnlyOpenclawAssistant(message)) { + // Drop from the in-memory replay copy; the persisted JSONL keeps the + // entry so user-facing transcript surfaces are unchanged. + touched = true; + continue; + } + const replayContent = (message as { content?: unknown }).content; + if (typeof replayContent === "string") { + out.push({ + ...message, + content: [{ type: "text", text: replayContent }], + }); + touched = true; + continue; + } + if (Array.isArray(replayContent) && replayContent.length === 0) { + // An assistant turn can legitimately end with `content: []` — for + // example the silent-reply / NO_REPLY path locked in by + // run.empty-error-retry.test.ts ("Clean stop with no output is a + // legitimate silent reply, not a crash"). We must NOT inject the + // failure sentinel into those turns: doing so would fabricate a + // failure statement in the next provider request and change model + // behavior even when no failure occurred. + // + // Only `stopReason: "error"` turns are the Bedrock-Converse replay + // poison this fix is scoped to: the provider rejects assistant + // messages with no ContentBlock, and the persisted error turn was + // never going to render anything useful to the model anyway. Leaving + // non-error empty-content turns untouched preserves silent-reply + // semantics on every other code path. + const stopReason = (message as { stopReason?: unknown }).stopReason; + if (stopReason === "error") { + out.push({ + ...message, + content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }], + }); + touched = true; + continue; + } + } + out.push(message); } return touched ? out : messages; } diff --git a/src/agents/session-file-repair.test.ts b/src/agents/session-file-repair.test.ts index a4ba5d398c0..53796f7459a 100644 --- a/src/agents/session-file-repair.test.ts +++ b/src/agents/session-file-repair.test.ts @@ -97,4 +97,147 @@ describe("repairSessionFileIfNeeded", () => { expect(result.reason).toContain("failed to read session file"); expect(warn).toHaveBeenCalledTimes(1); }); + + it("rewrites persisted assistant messages with empty content arrays", async () => { + const { file } = await createTempSessionPath(); + const { header } = buildSessionHeaderAndMessage(); + const poisonedAssistantEntry = { + type: "message", + id: "msg-2", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [], + api: "bedrock-converse-stream", + provider: "amazon-bedrock", + model: "anthropic.claude-3-haiku-20240307-v1:0", + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, + stopReason: "error", + errorMessage: "transient stream failure", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(poisonedAssistantEntry)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const warn = vi.fn(); + const result = await repairSessionFileIfNeeded({ sessionFile: file, warn }); + + expect(result.repaired).toBe(true); + expect(result.droppedLines).toBe(0); + expect(result.rewrittenAssistantMessages).toBe(1); + expect(result.backupPath).toBeTruthy(); + // Warn message must omit the "dropped 0 malformed line(s)" noise when + // nothing was dropped; only the rewrite count is reported. + expect(warn).toHaveBeenCalledTimes(1); + const warnMessage = warn.mock.calls[0]?.[0] as string; + expect(warnMessage).toContain("rewrote 1 assistant message(s)"); + expect(warnMessage).not.toContain("dropped"); + + const repaired = await fs.readFile(file, "utf-8"); + const repairedLines = repaired.trim().split("\n"); + expect(repairedLines).toHaveLength(2); + const repairedEntry: { message: { content: { type: string; text: string }[] } } = JSON.parse( + repairedLines[1], + ); + expect(repairedEntry.message.content).toEqual([ + { type: "text", text: "[assistant turn failed before producing content]" }, + ]); + }); + + it("reports both drops and rewrites in the warn message when both occur", async () => { + const { file } = await createTempSessionPath(); + const { header } = buildSessionHeaderAndMessage(); + const poisonedAssistantEntry = { + type: "message", + id: "msg-2", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [], + api: "bedrock-converse-stream", + provider: "amazon-bedrock", + model: "anthropic.claude-3-haiku-20240307-v1:0", + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, + stopReason: "error", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(poisonedAssistantEntry)}\n{"type":"message"`; + await fs.writeFile(file, original, "utf-8"); + + const warn = vi.fn(); + const result = await repairSessionFileIfNeeded({ sessionFile: file, warn }); + + expect(result.repaired).toBe(true); + expect(result.droppedLines).toBe(1); + expect(result.rewrittenAssistantMessages).toBe(1); + const warnMessage = warn.mock.calls[0]?.[0] as string; + expect(warnMessage).toContain("dropped 1 malformed line(s)"); + expect(warnMessage).toContain("rewrote 1 assistant message(s)"); + }); + + it("does not rewrite silent-reply turns (stopReason=stop, content=[]) on disk", async () => { + // Mirror of the in-memory replay-history test: a clean stop with no + // content is a legitimate silent reply (NO_REPLY token path). Repair + // must NOT permanently mutate it into a synthetic "[assistant turn + // failed before producing content]" entry — that would corrupt the + // historical transcript and replay fabricated failure text on every + // future provider request. + const { file } = await createTempSessionPath(); + const { header } = buildSessionHeaderAndMessage(); + const silentReplyEntry = { + type: "message", + id: "msg-2", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [], + api: "openai-responses", + provider: "ollama", + model: "glm-5.1:cloud", + usage: { input: 100, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 100 }, + stopReason: "stop", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(false); + expect(result.rewrittenAssistantMessages ?? 0).toBe(0); + const after = await fs.readFile(file, "utf-8"); + expect(after).toBe(original); + }); + + it("is a no-op on a session that was already repaired", async () => { + const { file } = await createTempSessionPath(); + const { header } = buildSessionHeaderAndMessage(); + const healedEntry = { + type: "message", + id: "msg-2", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "text", text: "[assistant turn failed before producing content]" }], + api: "bedrock-converse-stream", + provider: "amazon-bedrock", + model: "anthropic.claude-3-haiku-20240307-v1:0", + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, + stopReason: "error", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(false); + expect(result.rewrittenAssistantMessages ?? 0).toBe(0); + const after = await fs.readFile(file, "utf-8"); + expect(after).toBe(original); + }); }); diff --git a/src/agents/session-file-repair.ts b/src/agents/session-file-repair.ts index d073a50f501..7c465c176cd 100644 --- a/src/agents/session-file-repair.ts +++ b/src/agents/session-file-repair.ts @@ -1,13 +1,29 @@ import fs from "node:fs/promises"; import path from "node:path"; +import { STREAM_ERROR_FALLBACK_TEXT } from "./stream-message-shared.js"; type RepairReport = { repaired: boolean; droppedLines: number; + rewrittenAssistantMessages?: number; backupPath?: string; reason?: string; }; +// Persisted assistant entries with `content: []` (written by older builds when +// a stream/provider error fired before any block was produced) are valid JSON +// but not valid for AWS Bedrock Converse replay; rewriting them on disk lets a +// poisoned session recover across gateway restarts instead of needing a fresh +// session. The sentinel text is shared with stream-message-shared.ts and +// replay-history.ts so a session repaired offline reads byte-identically to a +// live stream-error turn — that byte-identity is what makes the repair pass +// idempotent (a healed entry is then indistinguishable from a fresh one). + +type SessionMessageEntry = { + type: "message"; + message: { role: "assistant"; content: unknown[] } & Record; +} & Record; + function isSessionHeader(entry: unknown): entry is { type: string; id: string } { if (!entry || typeof entry !== "object") { return false; @@ -16,6 +32,56 @@ function isSessionHeader(entry: unknown): entry is { type: string; id: string } return record.type === "session" && typeof record.id === "string" && record.id.length > 0; } +function isAssistantEntryWithEmptyContent(entry: unknown): entry is SessionMessageEntry { + if (!entry || typeof entry !== "object") { + return false; + } + const record = entry as { type?: unknown; message?: unknown }; + if (record.type !== "message" || !record.message || typeof record.message !== "object") { + return false; + } + const message = record.message as { + role?: unknown; + content?: unknown; + stopReason?: unknown; + }; + if (message.role !== "assistant") { + return false; + } + if (!Array.isArray(message.content) || message.content.length !== 0) { + return false; + } + // Only error turns are eligible for on-disk rewrite. A clean stop with + // empty content (silent-reply / NO_REPLY path documented in + // run.empty-error-retry.test.ts) is a valid historical assistant turn — + // mutating it into a synthetic failure message would permanently corrupt + // the transcript and replay fabricated failure text on future requests. + return message.stopReason === "error"; +} + +function rewriteAssistantEntryWithEmptyContent(entry: SessionMessageEntry): SessionMessageEntry { + return { + ...entry, + message: { + ...entry.message, + content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }], + }, + }; +} + +function buildRepairSummaryParts(droppedLines: number, rewrittenAssistantMessages: number): string { + const parts: string[] = []; + if (droppedLines > 0) { + parts.push(`dropped ${droppedLines} malformed line(s)`); + } + if (rewrittenAssistantMessages > 0) { + parts.push(`rewrote ${rewrittenAssistantMessages} assistant message(s)`); + } + // Caller only invokes this once at least one counter is non-zero, so the + // empty-array branch is unreachable in production. Kept for defensive output. + return parts.length > 0 ? parts.join(", ") : "no changes"; +} + export async function repairSessionFileIfNeeded(params: { sessionFile: string; warn?: (message: string) => void; @@ -41,13 +107,19 @@ export async function repairSessionFileIfNeeded(params: { const lines = content.split(/\r?\n/); const entries: unknown[] = []; let droppedLines = 0; + let rewrittenAssistantMessages = 0; for (const line of lines) { if (!line.trim()) { continue; } try { - const entry = JSON.parse(line); + const entry: unknown = JSON.parse(line); + if (isAssistantEntryWithEmptyContent(entry)) { + entries.push(rewriteAssistantEntryWithEmptyContent(entry)); + rewrittenAssistantMessages += 1; + continue; + } entries.push(entry); } catch { droppedLines += 1; @@ -65,7 +137,7 @@ export async function repairSessionFileIfNeeded(params: { return { repaired: false, droppedLines, reason: "invalid session header" }; } - if (droppedLines === 0) { + if (droppedLines === 0 && rewrittenAssistantMessages === 0) { return { repaired: false, droppedLines: 0 }; } @@ -96,14 +168,16 @@ export async function repairSessionFileIfNeeded(params: { return { repaired: false, droppedLines, + rewrittenAssistantMessages, reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`, }; } params.warn?.( - `session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename( - sessionFile, - )})`, + `session file repaired: ${buildRepairSummaryParts( + droppedLines, + rewrittenAssistantMessages, + )} (${path.basename(sessionFile)})`, ); - return { repaired: true, droppedLines, backupPath }; + return { repaired: true, droppedLines, rewrittenAssistantMessages, backupPath }; } diff --git a/src/agents/stream-message-shared.test.ts b/src/agents/stream-message-shared.test.ts new file mode 100644 index 00000000000..6339bf5d1f3 --- /dev/null +++ b/src/agents/stream-message-shared.test.ts @@ -0,0 +1,44 @@ +import { describe, expect, it } from "vitest"; +import { + STREAM_ERROR_FALLBACK_TEXT, + buildStreamErrorAssistantMessage, +} from "./stream-message-shared.js"; + +const model = { + api: "bedrock-converse-stream", + provider: "amazon-bedrock", + id: "anthropic.claude-3-haiku-20240307-v1:0", +}; + +describe("buildStreamErrorAssistantMessage", () => { + it("never returns an empty content array", () => { + const message = buildStreamErrorAssistantMessage({ + model, + errorMessage: "stream aborted by upstream host=internal.example.com", + }); + expect(Array.isArray(message.content)).toBe(true); + expect(message.content.length).toBeGreaterThan(0); + }); + + it("places only the sentinel in content and never echoes the raw error text", () => { + const message = buildStreamErrorAssistantMessage({ + model, + errorMessage: "stream aborted by upstream host=internal.example.com", + }); + // Replay-visible content must be the canonical sentinel — replaying raw + // provider error strings could leak hostnames/metadata to the model and + // turn them into a prompt-injection surface. + expect(message.content).toEqual([{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }]); + expect(JSON.stringify(message.content)).not.toContain("internal.example.com"); + // The detailed error remains available in the peer field for clients/UIs. + expect(message.errorMessage).toBe("stream aborted by upstream host=internal.example.com"); + expect(message.stopReason).toBe("error"); + }); + + it("uses the same sentinel when errorMessage is blank", () => { + const message = buildStreamErrorAssistantMessage({ model, errorMessage: " " }); + expect(message.content).toEqual([{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }]); + // Original errorMessage is preserved verbatim for clients that surface it. + expect(message.errorMessage).toBe(" "); + }); +}); diff --git a/src/agents/stream-message-shared.ts b/src/agents/stream-message-shared.ts index 5c3f0b0d995..3efdcaa9cef 100644 --- a/src/agents/stream-message-shared.ts +++ b/src/agents/stream-message-shared.ts @@ -72,6 +72,23 @@ export function buildAssistantMessageWithZeroUsage(params: { }); } +// Single canonical sentinel placed in the `content` array of any assistant turn +// that failed before the model produced its own content. AWS Bedrock Converse +// rejects assistant messages with `content: []` during replay ("The content +// field in the Message object at messages.N is empty."), which can persist into +// the session file and trap subsequent turns in a validation-failure loop. The +// raw provider error text is intentionally NOT placed in `content` because that +// array is replayed back to the model on the next turn — provider error strings +// can carry hostnames or upstream metadata, and replaying them as assistant +// content opens a prompt-injection surface (CWE-200). The detailed error stays +// in the peer `errorMessage` field, which clients/UIs read directly and +// providers do not include in their wire payloads. +// +// This constant is the single source of truth used by replay normalization and +// session-file repair as well, so a session repaired offline reads identically +// to a live stream-error turn (and the repair pass remains idempotent). +export const STREAM_ERROR_FALLBACK_TEXT = "[assistant turn failed before producing content]"; + export function buildStreamErrorAssistantMessage(params: { model: StreamModelDescriptor; errorMessage: string; @@ -80,7 +97,7 @@ export function buildStreamErrorAssistantMessage(params: { return { ...buildAssistantMessageWithZeroUsage({ model: params.model, - content: [], + content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }], stopReason: "error", timestamp: params.timestamp, }), diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 6513adabedf..2b770239dff 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -402,6 +402,7 @@ type QueuedDiagnosticEvent = { }; type DiagnosticEventsGlobalState = { + marker: symbol; enabled: boolean; seq: number; listeners: Set; @@ -411,6 +412,7 @@ type DiagnosticEventsGlobalState = { }; const MAX_ASYNC_DIAGNOSTIC_EVENTS = 10_000; +const DIAGNOSTIC_EVENTS_STATE_KEY = Symbol.for("openclaw.diagnosticEvents.state.v1"); const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set([ "tool.execution.started", "tool.execution.completed", @@ -426,10 +428,9 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set([ "log.record", ]); -const DIAGNOSTIC_EVENTS_STATE_KEY = Symbol.for("openclaw.diagnosticEvents.state.v1"); - function createDiagnosticEventsState(): DiagnosticEventsGlobalState { return { + marker: DIAGNOSTIC_EVENTS_STATE_KEY, enabled: true, seq: 0, listeners: new Set(), @@ -445,6 +446,7 @@ function isDiagnosticEventsState(value: unknown): value is DiagnosticEventsGloba } const candidate = value as Partial; return ( + candidate.marker === DIAGNOSTIC_EVENTS_STATE_KEY && typeof candidate.enabled === "boolean" && typeof candidate.seq === "number" && candidate.listeners instanceof Set && @@ -454,24 +456,20 @@ function isDiagnosticEventsState(value: unknown): value is DiagnosticEventsGloba ); } -const diagnosticEventsState: DiagnosticEventsGlobalState = (() => { - const globalStore = globalThis as Record; - const existing = globalStore[DIAGNOSTIC_EVENTS_STATE_KEY]; +function getDiagnosticEventsState(): DiagnosticEventsGlobalState { + const globalRecord = globalThis as Record; + const existing = globalRecord[DIAGNOSTIC_EVENTS_STATE_KEY]; if (isDiagnosticEventsState(existing)) { return existing; } - const created = createDiagnosticEventsState(); - Object.defineProperty(globalStore, DIAGNOSTIC_EVENTS_STATE_KEY, { + const state = createDiagnosticEventsState(); + Object.defineProperty(globalThis, DIAGNOSTIC_EVENTS_STATE_KEY, { configurable: true, enumerable: false, - value: created, + value: state, writable: false, }); - return created; -})(); - -function getDiagnosticEventsState(): DiagnosticEventsGlobalState { - return diagnosticEventsState; + return state; } export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean {