mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:20:43 +00:00
fix(agents): prevent Bedrock replay death loop on empty assistant content (#71627)
* fix(agents): prevent Bedrock replay death loop on empty assistant content Fixes #71572 * docs: document Bedrock replay repair (#71627) (thanks @openperf) * fix(diagnostics): share diagnostic event state across sdk graphs --------- Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
@@ -56,6 +56,11 @@ Docs: https://docs.openclaw.ai
|
||||
detection. Closes #33360. Thanks @smartchainark.
|
||||
- Subagents/browser: show an actionable `/tools` notice when browser automation is configured but filtered out by the active tool profile, and document that coding-profile agents should use `tools.alsoAllow: ["browser"]` rather than subagent allowlists alone.
|
||||
- Control UI/Quick Settings: persist the assistant avatar override to browser local storage (mirroring the user avatar) so uploaded image data URLs no longer fail config validation with "Too big: expected string to have <=200 characters". Also lift the gateway-side `ui.assistant.avatar` length cap to match the user avatar size budget for non-UI clients writing the field directly. Thanks @BunsDev.
|
||||
- Plugin SDK: share diagnostic event subscriptions across duplicate source/dist
|
||||
module graphs so legacy root SDK imports still receive runtime diagnostic events.
|
||||
- Agents/Bedrock: prevent empty assistant stream-error turns from poisoning
|
||||
Converse replay by persisting, repairing, and replaying a non-empty fallback
|
||||
block. Fixes #71572. (#71627) Thanks @openperf.
|
||||
- Browser/CDP: make readiness diagnostics use the same discovery-first fallback as reachability for bare `ws://` Browserless and Browserbase CDP URLs. Fixes #69532.
|
||||
- Browser/CDP: explain that loopback Browserless or other externally managed CDP services need `attachOnly: true` and matching Browserless `EXTERNAL` endpoint when reporting local port ownership conflicts, and fall back to the configured bare WebSocket root when a discovered Browserless endpoint rejects CDP. Fixes #49815.
|
||||
- ACP/OpenCode: update the bundled acpx runtime to 0.6.0 and cover the OpenCode ACP bind path in Docker live tests.
|
||||
|
||||
@@ -8,11 +8,12 @@ title: "Transcript hygiene"
|
||||
---
|
||||
|
||||
This document describes **provider-specific fixes** applied to transcripts before a run
|
||||
(building model context). These are **in-memory** adjustments used to satisfy strict
|
||||
provider requirements. These hygiene steps do **not** rewrite the stored JSONL transcript
|
||||
on disk; however, a separate session-file repair pass may rewrite malformed JSONL files
|
||||
by dropping invalid lines before the session is loaded. When a repair occurs, the original
|
||||
file is backed up alongside the session file.
|
||||
(building model context). Most of these are **in-memory** adjustments used to satisfy
|
||||
strict provider requirements. A separate session-file repair pass may also rewrite
|
||||
stored JSONL before the session is loaded, either by dropping malformed JSONL lines or
|
||||
by repairing persisted turns that are syntactically valid but known to be rejected by a
|
||||
provider during replay. When a repair occurs, the original file is backed up alongside
|
||||
the session file.
|
||||
|
||||
Scope includes:
|
||||
|
||||
@@ -24,6 +25,7 @@ Scope includes:
|
||||
- Thought signature cleanup
|
||||
- Image payload sanitization
|
||||
- User-input provenance tagging (for inter-session routed prompts)
|
||||
- Empty assistant error-turn repair for Bedrock Converse replay
|
||||
|
||||
If you need transcript storage details, see:
|
||||
|
||||
@@ -132,6 +134,15 @@ external end-user instructions.
|
||||
- Tool result pairing repair and synthetic tool results.
|
||||
- Turn validation (merge consecutive user turns to satisfy strict alternation).
|
||||
|
||||
**Amazon Bedrock (Converse API)**
|
||||
|
||||
- Empty assistant stream-error turns are repaired to a non-empty fallback text block
|
||||
before replay. Bedrock Converse rejects assistant messages with `content: []`, so
|
||||
persisted assistant turns with `stopReason: "error"` and empty content are also
|
||||
repaired on disk before load.
|
||||
- Replay filters OpenClaw delivery-mirror and gateway-injected assistant turns.
|
||||
- Image sanitization applies through the global rule.
|
||||
|
||||
**Mistral (including model-id based detection)**
|
||||
|
||||
- Tool call id sanitization: strict9 (alphanumeric length 9).
|
||||
|
||||
115
src/agents/pi-embedded-runner/replay-history.test.ts
Normal file
115
src/agents/pi-embedded-runner/replay-history.test.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { normalizeAssistantReplayContent } from "./replay-history.js";
|
||||
|
||||
const FALLBACK_TEXT = "[assistant turn failed before producing content]";
|
||||
|
||||
function bedrockAssistant(
|
||||
content: unknown,
|
||||
stopReason: "error" | "stop" | "toolUse" | "length" = "error",
|
||||
): AgentMessage {
|
||||
return {
|
||||
role: "assistant",
|
||||
content,
|
||||
api: "bedrock-converse-stream",
|
||||
provider: "amazon-bedrock",
|
||||
model: "anthropic.claude-3-haiku-20240307-v1:0",
|
||||
usage: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
},
|
||||
stopReason,
|
||||
timestamp: 0,
|
||||
} as unknown as AgentMessage;
|
||||
}
|
||||
|
||||
function userMessage(text: string): AgentMessage {
|
||||
return { role: "user", content: text, timestamp: 0 } as unknown as AgentMessage;
|
||||
}
|
||||
|
||||
function openclawTranscriptAssistant(model: "delivery-mirror" | "gateway-injected"): AgentMessage {
|
||||
return {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "channel mirror" }],
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model,
|
||||
usage: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
totalTokens: 0,
|
||||
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||
},
|
||||
stopReason: "stop",
|
||||
timestamp: 0,
|
||||
} as unknown as AgentMessage;
|
||||
}
|
||||
|
||||
describe("normalizeAssistantReplayContent", () => {
|
||||
it("converts assistant content: [] to a non-empty sentinel text block when stopReason is error", () => {
|
||||
const messages = [userMessage("hello"), bedrockAssistant([], "error")];
|
||||
const out = normalizeAssistantReplayContent(messages);
|
||||
expect(out).not.toBe(messages);
|
||||
const repaired = out[1] as AgentMessage & { content: { type: string; text: string }[] };
|
||||
expect(repaired.content).toEqual([{ type: "text", text: FALLBACK_TEXT }]);
|
||||
});
|
||||
|
||||
it("preserves silent-reply turns (stopReason=stop, content=[]) untouched", () => {
|
||||
// run.empty-error-retry.test.ts treats `stopReason:"stop"` + `content:[]`
|
||||
// as a legitimate NO_REPLY / silent-reply, NOT a crash. Substituting the
|
||||
// failure sentinel here would inject a fabricated "[assistant turn failed
|
||||
// before producing content]" into the next provider request and change
|
||||
// model behavior even though no failure occurred.
|
||||
const silentStop = bedrockAssistant([], "stop");
|
||||
const messages = [userMessage("hello"), silentStop];
|
||||
const out = normalizeAssistantReplayContent(messages);
|
||||
expect(out).toBe(messages);
|
||||
expect(out[1]).toBe(silentStop);
|
||||
});
|
||||
|
||||
it("preserves empty content with non-error stopReasons (toolUse, length) untouched", () => {
|
||||
// Boundary lock: only `stopReason:"error"` should trip the sentinel
|
||||
// substitution. `toolUse` and `length` are reachable in practice when a
|
||||
// provider terminates a turn before a content block is emitted, and
|
||||
// rewriting them as a failure would lie about what happened.
|
||||
const toolUse = bedrockAssistant([], "toolUse");
|
||||
const length = bedrockAssistant([], "length");
|
||||
const messages = [userMessage("hello"), toolUse, length];
|
||||
const out = normalizeAssistantReplayContent(messages);
|
||||
expect(out).toBe(messages);
|
||||
expect(out[1]).toBe(toolUse);
|
||||
expect(out[2]).toBe(length);
|
||||
});
|
||||
|
||||
it("wraps legacy string assistant content as a single text block (regression)", () => {
|
||||
const messages = [userMessage("hi"), bedrockAssistant("plain string content")];
|
||||
const out = normalizeAssistantReplayContent(messages);
|
||||
const wrapped = out[1] as AgentMessage & { content: { type: string; text: string }[] };
|
||||
expect(wrapped.content).toEqual([{ type: "text", text: "plain string content" }]);
|
||||
});
|
||||
|
||||
it("filters openclaw delivery-mirror and gateway-injected assistant messages from replay", () => {
|
||||
const messages = [
|
||||
userMessage("hello"),
|
||||
openclawTranscriptAssistant("delivery-mirror"),
|
||||
bedrockAssistant([{ type: "text", text: "real reply" }]),
|
||||
openclawTranscriptAssistant("gateway-injected"),
|
||||
];
|
||||
const out = normalizeAssistantReplayContent(messages);
|
||||
expect(out).toHaveLength(2);
|
||||
expect((out[0] as { role: string }).role).toBe("user");
|
||||
expect((out[1] as { provider: string }).provider).toBe("amazon-bedrock");
|
||||
});
|
||||
|
||||
it("returns the original array reference when nothing needs to change", () => {
|
||||
const messages = [userMessage("hello"), bedrockAssistant([{ type: "text", text: "fine" }])];
|
||||
const out = normalizeAssistantReplayContent(messages);
|
||||
expect(out).toBe(messages);
|
||||
});
|
||||
});
|
||||
@@ -28,6 +28,7 @@ import {
|
||||
sanitizeToolUseResultPairing,
|
||||
stripToolResultDetails,
|
||||
} from "../session-transcript-repair.js";
|
||||
import { STREAM_ERROR_FALLBACK_TEXT } from "../stream-message-shared.js";
|
||||
import { sanitizeToolCallIdsForCloudCodeAssist } from "../tool-call-id.js";
|
||||
import type { TranscriptPolicy } from "../transcript-policy.js";
|
||||
import {
|
||||
@@ -227,20 +228,77 @@ function stripStaleAssistantUsageBeforeLatestCompaction(messages: AgentMessage[]
|
||||
return touched ? out : messages;
|
||||
}
|
||||
|
||||
// `provider:"openclaw"` assistant entries written by the channel-delivery
|
||||
// transcript mirror (`model:"delivery-mirror"`, see config/sessions/transcript.ts)
|
||||
// and by the Gateway transcript-inject helper (`model:"gateway-injected"`, see
|
||||
// gateway/server-methods/chat-transcript-inject.ts) are user-visible transcript
|
||||
// records, not model output. Replaying them to the actual provider duplicates
|
||||
// content and, on Bedrock or strict OpenAI-compatible providers, can also
|
||||
// trigger turn-ordering rejections.
|
||||
const TRANSCRIPT_ONLY_OPENCLAW_MODELS = new Set<string>(["delivery-mirror", "gateway-injected"]);
|
||||
|
||||
function isTranscriptOnlyOpenclawAssistant(message: AgentMessage): boolean {
|
||||
if (!message || message.role !== "assistant") {
|
||||
return false;
|
||||
}
|
||||
const provider = (message as { provider?: unknown }).provider;
|
||||
const model = (message as { model?: unknown }).model;
|
||||
return (
|
||||
provider === "openclaw" &&
|
||||
typeof model === "string" &&
|
||||
TRANSCRIPT_ONLY_OPENCLAW_MODELS.has(model)
|
||||
);
|
||||
}
|
||||
|
||||
export function normalizeAssistantReplayContent(messages: AgentMessage[]): AgentMessage[] {
|
||||
let touched = false;
|
||||
const out = [...messages];
|
||||
for (let i = 0; i < out.length; i += 1) {
|
||||
const message = out[i];
|
||||
const replayContent = (message as { content?: unknown } | undefined)?.content;
|
||||
if (!message || message.role !== "assistant" || typeof replayContent !== "string") {
|
||||
const out: AgentMessage[] = [];
|
||||
for (const message of messages) {
|
||||
if (!message || message.role !== "assistant") {
|
||||
out.push(message);
|
||||
continue;
|
||||
}
|
||||
out[i] = {
|
||||
...message,
|
||||
content: [{ type: "text", text: replayContent }],
|
||||
};
|
||||
touched = true;
|
||||
if (isTranscriptOnlyOpenclawAssistant(message)) {
|
||||
// Drop from the in-memory replay copy; the persisted JSONL keeps the
|
||||
// entry so user-facing transcript surfaces are unchanged.
|
||||
touched = true;
|
||||
continue;
|
||||
}
|
||||
const replayContent = (message as { content?: unknown }).content;
|
||||
if (typeof replayContent === "string") {
|
||||
out.push({
|
||||
...message,
|
||||
content: [{ type: "text", text: replayContent }],
|
||||
});
|
||||
touched = true;
|
||||
continue;
|
||||
}
|
||||
if (Array.isArray(replayContent) && replayContent.length === 0) {
|
||||
// An assistant turn can legitimately end with `content: []` — for
|
||||
// example the silent-reply / NO_REPLY path locked in by
|
||||
// run.empty-error-retry.test.ts ("Clean stop with no output is a
|
||||
// legitimate silent reply, not a crash"). We must NOT inject the
|
||||
// failure sentinel into those turns: doing so would fabricate a
|
||||
// failure statement in the next provider request and change model
|
||||
// behavior even when no failure occurred.
|
||||
//
|
||||
// Only `stopReason: "error"` turns are the Bedrock-Converse replay
|
||||
// poison this fix is scoped to: the provider rejects assistant
|
||||
// messages with no ContentBlock, and the persisted error turn was
|
||||
// never going to render anything useful to the model anyway. Leaving
|
||||
// non-error empty-content turns untouched preserves silent-reply
|
||||
// semantics on every other code path.
|
||||
const stopReason = (message as { stopReason?: unknown }).stopReason;
|
||||
if (stopReason === "error") {
|
||||
out.push({
|
||||
...message,
|
||||
content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }],
|
||||
});
|
||||
touched = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
out.push(message);
|
||||
}
|
||||
return touched ? out : messages;
|
||||
}
|
||||
|
||||
@@ -97,4 +97,147 @@ describe("repairSessionFileIfNeeded", () => {
|
||||
expect(result.reason).toContain("failed to read session file");
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("rewrites persisted assistant messages with empty content arrays", async () => {
|
||||
const { file } = await createTempSessionPath();
|
||||
const { header } = buildSessionHeaderAndMessage();
|
||||
const poisonedAssistantEntry = {
|
||||
type: "message",
|
||||
id: "msg-2",
|
||||
parentId: null,
|
||||
timestamp: new Date().toISOString(),
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [],
|
||||
api: "bedrock-converse-stream",
|
||||
provider: "amazon-bedrock",
|
||||
model: "anthropic.claude-3-haiku-20240307-v1:0",
|
||||
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
|
||||
stopReason: "error",
|
||||
errorMessage: "transient stream failure",
|
||||
},
|
||||
};
|
||||
const original = `${JSON.stringify(header)}\n${JSON.stringify(poisonedAssistantEntry)}\n`;
|
||||
await fs.writeFile(file, original, "utf-8");
|
||||
|
||||
const warn = vi.fn();
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: file, warn });
|
||||
|
||||
expect(result.repaired).toBe(true);
|
||||
expect(result.droppedLines).toBe(0);
|
||||
expect(result.rewrittenAssistantMessages).toBe(1);
|
||||
expect(result.backupPath).toBeTruthy();
|
||||
// Warn message must omit the "dropped 0 malformed line(s)" noise when
|
||||
// nothing was dropped; only the rewrite count is reported.
|
||||
expect(warn).toHaveBeenCalledTimes(1);
|
||||
const warnMessage = warn.mock.calls[0]?.[0] as string;
|
||||
expect(warnMessage).toContain("rewrote 1 assistant message(s)");
|
||||
expect(warnMessage).not.toContain("dropped");
|
||||
|
||||
const repaired = await fs.readFile(file, "utf-8");
|
||||
const repairedLines = repaired.trim().split("\n");
|
||||
expect(repairedLines).toHaveLength(2);
|
||||
const repairedEntry: { message: { content: { type: string; text: string }[] } } = JSON.parse(
|
||||
repairedLines[1],
|
||||
);
|
||||
expect(repairedEntry.message.content).toEqual([
|
||||
{ type: "text", text: "[assistant turn failed before producing content]" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("reports both drops and rewrites in the warn message when both occur", async () => {
|
||||
const { file } = await createTempSessionPath();
|
||||
const { header } = buildSessionHeaderAndMessage();
|
||||
const poisonedAssistantEntry = {
|
||||
type: "message",
|
||||
id: "msg-2",
|
||||
parentId: null,
|
||||
timestamp: new Date().toISOString(),
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [],
|
||||
api: "bedrock-converse-stream",
|
||||
provider: "amazon-bedrock",
|
||||
model: "anthropic.claude-3-haiku-20240307-v1:0",
|
||||
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
|
||||
stopReason: "error",
|
||||
},
|
||||
};
|
||||
const original = `${JSON.stringify(header)}\n${JSON.stringify(poisonedAssistantEntry)}\n{"type":"message"`;
|
||||
await fs.writeFile(file, original, "utf-8");
|
||||
|
||||
const warn = vi.fn();
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: file, warn });
|
||||
|
||||
expect(result.repaired).toBe(true);
|
||||
expect(result.droppedLines).toBe(1);
|
||||
expect(result.rewrittenAssistantMessages).toBe(1);
|
||||
const warnMessage = warn.mock.calls[0]?.[0] as string;
|
||||
expect(warnMessage).toContain("dropped 1 malformed line(s)");
|
||||
expect(warnMessage).toContain("rewrote 1 assistant message(s)");
|
||||
});
|
||||
|
||||
it("does not rewrite silent-reply turns (stopReason=stop, content=[]) on disk", async () => {
|
||||
// Mirror of the in-memory replay-history test: a clean stop with no
|
||||
// content is a legitimate silent reply (NO_REPLY token path). Repair
|
||||
// must NOT permanently mutate it into a synthetic "[assistant turn
|
||||
// failed before producing content]" entry — that would corrupt the
|
||||
// historical transcript and replay fabricated failure text on every
|
||||
// future provider request.
|
||||
const { file } = await createTempSessionPath();
|
||||
const { header } = buildSessionHeaderAndMessage();
|
||||
const silentReplyEntry = {
|
||||
type: "message",
|
||||
id: "msg-2",
|
||||
parentId: null,
|
||||
timestamp: new Date().toISOString(),
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [],
|
||||
api: "openai-responses",
|
||||
provider: "ollama",
|
||||
model: "glm-5.1:cloud",
|
||||
usage: { input: 100, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 100 },
|
||||
stopReason: "stop",
|
||||
},
|
||||
};
|
||||
const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n`;
|
||||
await fs.writeFile(file, original, "utf-8");
|
||||
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: file });
|
||||
|
||||
expect(result.repaired).toBe(false);
|
||||
expect(result.rewrittenAssistantMessages ?? 0).toBe(0);
|
||||
const after = await fs.readFile(file, "utf-8");
|
||||
expect(after).toBe(original);
|
||||
});
|
||||
|
||||
it("is a no-op on a session that was already repaired", async () => {
|
||||
const { file } = await createTempSessionPath();
|
||||
const { header } = buildSessionHeaderAndMessage();
|
||||
const healedEntry = {
|
||||
type: "message",
|
||||
id: "msg-2",
|
||||
parentId: null,
|
||||
timestamp: new Date().toISOString(),
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "[assistant turn failed before producing content]" }],
|
||||
api: "bedrock-converse-stream",
|
||||
provider: "amazon-bedrock",
|
||||
model: "anthropic.claude-3-haiku-20240307-v1:0",
|
||||
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
|
||||
stopReason: "error",
|
||||
},
|
||||
};
|
||||
const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n`;
|
||||
await fs.writeFile(file, original, "utf-8");
|
||||
|
||||
const result = await repairSessionFileIfNeeded({ sessionFile: file });
|
||||
|
||||
expect(result.repaired).toBe(false);
|
||||
expect(result.rewrittenAssistantMessages ?? 0).toBe(0);
|
||||
const after = await fs.readFile(file, "utf-8");
|
||||
expect(after).toBe(original);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,13 +1,29 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { STREAM_ERROR_FALLBACK_TEXT } from "./stream-message-shared.js";
|
||||
|
||||
type RepairReport = {
|
||||
repaired: boolean;
|
||||
droppedLines: number;
|
||||
rewrittenAssistantMessages?: number;
|
||||
backupPath?: string;
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
// Persisted assistant entries with `content: []` (written by older builds when
|
||||
// a stream/provider error fired before any block was produced) are valid JSON
|
||||
// but not valid for AWS Bedrock Converse replay; rewriting them on disk lets a
|
||||
// poisoned session recover across gateway restarts instead of needing a fresh
|
||||
// session. The sentinel text is shared with stream-message-shared.ts and
|
||||
// replay-history.ts so a session repaired offline reads byte-identically to a
|
||||
// live stream-error turn — that byte-identity is what makes the repair pass
|
||||
// idempotent (a healed entry is then indistinguishable from a fresh one).
|
||||
|
||||
type SessionMessageEntry = {
|
||||
type: "message";
|
||||
message: { role: "assistant"; content: unknown[] } & Record<string, unknown>;
|
||||
} & Record<string, unknown>;
|
||||
|
||||
function isSessionHeader(entry: unknown): entry is { type: string; id: string } {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
return false;
|
||||
@@ -16,6 +32,56 @@ function isSessionHeader(entry: unknown): entry is { type: string; id: string }
|
||||
return record.type === "session" && typeof record.id === "string" && record.id.length > 0;
|
||||
}
|
||||
|
||||
function isAssistantEntryWithEmptyContent(entry: unknown): entry is SessionMessageEntry {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
return false;
|
||||
}
|
||||
const record = entry as { type?: unknown; message?: unknown };
|
||||
if (record.type !== "message" || !record.message || typeof record.message !== "object") {
|
||||
return false;
|
||||
}
|
||||
const message = record.message as {
|
||||
role?: unknown;
|
||||
content?: unknown;
|
||||
stopReason?: unknown;
|
||||
};
|
||||
if (message.role !== "assistant") {
|
||||
return false;
|
||||
}
|
||||
if (!Array.isArray(message.content) || message.content.length !== 0) {
|
||||
return false;
|
||||
}
|
||||
// Only error turns are eligible for on-disk rewrite. A clean stop with
|
||||
// empty content (silent-reply / NO_REPLY path documented in
|
||||
// run.empty-error-retry.test.ts) is a valid historical assistant turn —
|
||||
// mutating it into a synthetic failure message would permanently corrupt
|
||||
// the transcript and replay fabricated failure text on future requests.
|
||||
return message.stopReason === "error";
|
||||
}
|
||||
|
||||
function rewriteAssistantEntryWithEmptyContent(entry: SessionMessageEntry): SessionMessageEntry {
|
||||
return {
|
||||
...entry,
|
||||
message: {
|
||||
...entry.message,
|
||||
content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function buildRepairSummaryParts(droppedLines: number, rewrittenAssistantMessages: number): string {
|
||||
const parts: string[] = [];
|
||||
if (droppedLines > 0) {
|
||||
parts.push(`dropped ${droppedLines} malformed line(s)`);
|
||||
}
|
||||
if (rewrittenAssistantMessages > 0) {
|
||||
parts.push(`rewrote ${rewrittenAssistantMessages} assistant message(s)`);
|
||||
}
|
||||
// Caller only invokes this once at least one counter is non-zero, so the
|
||||
// empty-array branch is unreachable in production. Kept for defensive output.
|
||||
return parts.length > 0 ? parts.join(", ") : "no changes";
|
||||
}
|
||||
|
||||
export async function repairSessionFileIfNeeded(params: {
|
||||
sessionFile: string;
|
||||
warn?: (message: string) => void;
|
||||
@@ -41,13 +107,19 @@ export async function repairSessionFileIfNeeded(params: {
|
||||
const lines = content.split(/\r?\n/);
|
||||
const entries: unknown[] = [];
|
||||
let droppedLines = 0;
|
||||
let rewrittenAssistantMessages = 0;
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
const entry: unknown = JSON.parse(line);
|
||||
if (isAssistantEntryWithEmptyContent(entry)) {
|
||||
entries.push(rewriteAssistantEntryWithEmptyContent(entry));
|
||||
rewrittenAssistantMessages += 1;
|
||||
continue;
|
||||
}
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
droppedLines += 1;
|
||||
@@ -65,7 +137,7 @@ export async function repairSessionFileIfNeeded(params: {
|
||||
return { repaired: false, droppedLines, reason: "invalid session header" };
|
||||
}
|
||||
|
||||
if (droppedLines === 0) {
|
||||
if (droppedLines === 0 && rewrittenAssistantMessages === 0) {
|
||||
return { repaired: false, droppedLines: 0 };
|
||||
}
|
||||
|
||||
@@ -96,14 +168,16 @@ export async function repairSessionFileIfNeeded(params: {
|
||||
return {
|
||||
repaired: false,
|
||||
droppedLines,
|
||||
rewrittenAssistantMessages,
|
||||
reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`,
|
||||
};
|
||||
}
|
||||
|
||||
params.warn?.(
|
||||
`session file repaired: dropped ${droppedLines} malformed line(s) (${path.basename(
|
||||
sessionFile,
|
||||
)})`,
|
||||
`session file repaired: ${buildRepairSummaryParts(
|
||||
droppedLines,
|
||||
rewrittenAssistantMessages,
|
||||
)} (${path.basename(sessionFile)})`,
|
||||
);
|
||||
return { repaired: true, droppedLines, backupPath };
|
||||
return { repaired: true, droppedLines, rewrittenAssistantMessages, backupPath };
|
||||
}
|
||||
|
||||
44
src/agents/stream-message-shared.test.ts
Normal file
44
src/agents/stream-message-shared.test.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
STREAM_ERROR_FALLBACK_TEXT,
|
||||
buildStreamErrorAssistantMessage,
|
||||
} from "./stream-message-shared.js";
|
||||
|
||||
const model = {
|
||||
api: "bedrock-converse-stream",
|
||||
provider: "amazon-bedrock",
|
||||
id: "anthropic.claude-3-haiku-20240307-v1:0",
|
||||
};
|
||||
|
||||
describe("buildStreamErrorAssistantMessage", () => {
|
||||
it("never returns an empty content array", () => {
|
||||
const message = buildStreamErrorAssistantMessage({
|
||||
model,
|
||||
errorMessage: "stream aborted by upstream host=internal.example.com",
|
||||
});
|
||||
expect(Array.isArray(message.content)).toBe(true);
|
||||
expect(message.content.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it("places only the sentinel in content and never echoes the raw error text", () => {
|
||||
const message = buildStreamErrorAssistantMessage({
|
||||
model,
|
||||
errorMessage: "stream aborted by upstream host=internal.example.com",
|
||||
});
|
||||
// Replay-visible content must be the canonical sentinel — replaying raw
|
||||
// provider error strings could leak hostnames/metadata to the model and
|
||||
// turn them into a prompt-injection surface.
|
||||
expect(message.content).toEqual([{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }]);
|
||||
expect(JSON.stringify(message.content)).not.toContain("internal.example.com");
|
||||
// The detailed error remains available in the peer field for clients/UIs.
|
||||
expect(message.errorMessage).toBe("stream aborted by upstream host=internal.example.com");
|
||||
expect(message.stopReason).toBe("error");
|
||||
});
|
||||
|
||||
it("uses the same sentinel when errorMessage is blank", () => {
|
||||
const message = buildStreamErrorAssistantMessage({ model, errorMessage: " " });
|
||||
expect(message.content).toEqual([{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }]);
|
||||
// Original errorMessage is preserved verbatim for clients that surface it.
|
||||
expect(message.errorMessage).toBe(" ");
|
||||
});
|
||||
});
|
||||
@@ -72,6 +72,23 @@ export function buildAssistantMessageWithZeroUsage(params: {
|
||||
});
|
||||
}
|
||||
|
||||
// Single canonical sentinel placed in the `content` array of any assistant turn
|
||||
// that failed before the model produced its own content. AWS Bedrock Converse
|
||||
// rejects assistant messages with `content: []` during replay ("The content
|
||||
// field in the Message object at messages.N is empty."), which can persist into
|
||||
// the session file and trap subsequent turns in a validation-failure loop. The
|
||||
// raw provider error text is intentionally NOT placed in `content` because that
|
||||
// array is replayed back to the model on the next turn — provider error strings
|
||||
// can carry hostnames or upstream metadata, and replaying them as assistant
|
||||
// content opens a prompt-injection surface (CWE-200). The detailed error stays
|
||||
// in the peer `errorMessage` field, which clients/UIs read directly and
|
||||
// providers do not include in their wire payloads.
|
||||
//
|
||||
// This constant is the single source of truth used by replay normalization and
|
||||
// session-file repair as well, so a session repaired offline reads identically
|
||||
// to a live stream-error turn (and the repair pass remains idempotent).
|
||||
export const STREAM_ERROR_FALLBACK_TEXT = "[assistant turn failed before producing content]";
|
||||
|
||||
export function buildStreamErrorAssistantMessage(params: {
|
||||
model: StreamModelDescriptor;
|
||||
errorMessage: string;
|
||||
@@ -80,7 +97,7 @@ export function buildStreamErrorAssistantMessage(params: {
|
||||
return {
|
||||
...buildAssistantMessageWithZeroUsage({
|
||||
model: params.model,
|
||||
content: [],
|
||||
content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }],
|
||||
stopReason: "error",
|
||||
timestamp: params.timestamp,
|
||||
}),
|
||||
|
||||
@@ -402,6 +402,7 @@ type QueuedDiagnosticEvent = {
|
||||
};
|
||||
|
||||
type DiagnosticEventsGlobalState = {
|
||||
marker: symbol;
|
||||
enabled: boolean;
|
||||
seq: number;
|
||||
listeners: Set<DiagnosticEventListener>;
|
||||
@@ -411,6 +412,7 @@ type DiagnosticEventsGlobalState = {
|
||||
};
|
||||
|
||||
const MAX_ASYNC_DIAGNOSTIC_EVENTS = 10_000;
|
||||
const DIAGNOSTIC_EVENTS_STATE_KEY = Symbol.for("openclaw.diagnosticEvents.state.v1");
|
||||
const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set<DiagnosticEventPayload["type"]>([
|
||||
"tool.execution.started",
|
||||
"tool.execution.completed",
|
||||
@@ -426,10 +428,9 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set<DiagnosticEventPayload["type"]>([
|
||||
"log.record",
|
||||
]);
|
||||
|
||||
const DIAGNOSTIC_EVENTS_STATE_KEY = Symbol.for("openclaw.diagnosticEvents.state.v1");
|
||||
|
||||
function createDiagnosticEventsState(): DiagnosticEventsGlobalState {
|
||||
return {
|
||||
marker: DIAGNOSTIC_EVENTS_STATE_KEY,
|
||||
enabled: true,
|
||||
seq: 0,
|
||||
listeners: new Set<DiagnosticEventListener>(),
|
||||
@@ -445,6 +446,7 @@ function isDiagnosticEventsState(value: unknown): value is DiagnosticEventsGloba
|
||||
}
|
||||
const candidate = value as Partial<DiagnosticEventsGlobalState>;
|
||||
return (
|
||||
candidate.marker === DIAGNOSTIC_EVENTS_STATE_KEY &&
|
||||
typeof candidate.enabled === "boolean" &&
|
||||
typeof candidate.seq === "number" &&
|
||||
candidate.listeners instanceof Set &&
|
||||
@@ -454,24 +456,20 @@ function isDiagnosticEventsState(value: unknown): value is DiagnosticEventsGloba
|
||||
);
|
||||
}
|
||||
|
||||
const diagnosticEventsState: DiagnosticEventsGlobalState = (() => {
|
||||
const globalStore = globalThis as Record<PropertyKey, unknown>;
|
||||
const existing = globalStore[DIAGNOSTIC_EVENTS_STATE_KEY];
|
||||
function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
|
||||
const globalRecord = globalThis as Record<PropertyKey, unknown>;
|
||||
const existing = globalRecord[DIAGNOSTIC_EVENTS_STATE_KEY];
|
||||
if (isDiagnosticEventsState(existing)) {
|
||||
return existing;
|
||||
}
|
||||
const created = createDiagnosticEventsState();
|
||||
Object.defineProperty(globalStore, DIAGNOSTIC_EVENTS_STATE_KEY, {
|
||||
const state = createDiagnosticEventsState();
|
||||
Object.defineProperty(globalThis, DIAGNOSTIC_EVENTS_STATE_KEY, {
|
||||
configurable: true,
|
||||
enumerable: false,
|
||||
value: created,
|
||||
value: state,
|
||||
writable: false,
|
||||
});
|
||||
return created;
|
||||
})();
|
||||
|
||||
function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
|
||||
return diagnosticEventsState;
|
||||
return state;
|
||||
}
|
||||
|
||||
export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean {
|
||||
|
||||
Reference in New Issue
Block a user