fix: harden phase-aware history sanitization

This commit is contained in:
Eva
2026-04-06 05:30:29 +07:00
committed by Peter Steinberger
parent 4bded29f2a
commit 029ed5d32a
5 changed files with 260 additions and 76 deletions

View File

@@ -12,7 +12,7 @@ function asMessages(messages: unknown[]): AgentMessage[] {
function makeDualToolUseAssistantContent() {
return [
{ type: "toolUse", id: "tool-1", name: "test1", input: {} },
{ type: "toolUse", id: "tool-1", name: "test1", arguments: {} },
{ type: "toolUse", id: "tool-2", name: "test2", input: {} },
{ type: "text", text: "Done" },
];
@@ -123,7 +123,7 @@ describe("validateGeminiTurns", () => {
{ role: "user", content: "Use tool" },
{
role: "assistant",
content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }],
content: [{ type: "toolUse", id: "tool-1", name: "test", arguments: {} }],
},
{
role: "toolResult",
@@ -368,7 +368,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
{
role: "assistant",
content: [
{ type: "toolUse", id: "tool-1", name: "test", input: {} },
{ type: "toolUse", id: "tool-1", name: "test", arguments: {} },
{ type: "text", text: "I'll check that" },
],
},
@@ -389,7 +389,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
{
role: "assistant",
content: [
{ type: "toolUse", id: "tool-1", name: "test", input: {} },
{ type: "toolUse", id: "tool-1", name: "test", arguments: {} },
{ type: "text", text: "Here's result" },
],
},
@@ -408,7 +408,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
// tool_use should be preserved because matching tool_result exists
const assistantContent = (result[1] as { content?: unknown[] }).content;
expect(assistantContent).toEqual([
{ type: "toolUse", id: "tool-1", name: "test", input: {} },
{ type: "toolUse", id: "tool-1", name: "test", arguments: {} },
{ type: "text", text: "Here's result" },
]);
});
@@ -418,7 +418,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
{
role: "assistant",
content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }],
content: [{ type: "toolUse", id: "tool-1", name: "test", arguments: {} }],
},
{ role: "user", content: [{ type: "text", text: "Hello" }] },
]);
@@ -458,7 +458,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
// tool-1 should be preserved (has matching tool_result), tool-2 stripped, text preserved
const assistantContent = (result[1] as { content?: unknown[] }).content;
expect(assistantContent).toEqual([
{ type: "toolUse", id: "tool-1", name: "test1", input: {} },
{ type: "toolUse", id: "tool-1", name: "test1", arguments: {} },
{ type: "text", text: "Done" },
]);
});
@@ -468,7 +468,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
{
role: "assistant",
content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }],
content: [{ type: "toolUse", id: "tool-1", name: "test", arguments: {} }],
},
// Next is assistant, not user - should not strip
{ role: "assistant", content: [{ type: "text", text: "Continue" }] },
@@ -479,7 +479,7 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
expect(result).toHaveLength(3);
// Original tool_use should be preserved
const assistantContent = (result[1] as { content?: unknown[] }).content;
expect(assistantContent).toEqual([{ type: "toolUse", id: "tool-1", name: "test", input: {} }]);
expect(assistantContent).toEqual([{ type: "toolUse", id: "tool-1", name: "test", arguments: {} }]);
});
it("is replay-safe across repeated validation passes", () => {
@@ -510,5 +510,47 @@ describe("validateAnthropicTurns strips dangling tool_use blocks", () => {
expect(() => validateAnthropicTurns(msgs)).not.toThrow();
const result = validateAnthropicTurns(msgs);
expect(result).toHaveLength(3);
expect(result[1]).toMatchObject({ content: "legacy-content" });
});
// Exercises result matching when the tool result is NOT in the message directly
// after the assistant turn: a plain user message comes first, and the result
// arrives as a standalone `toolResult` message that carries the id on the
// message itself under the snake_case `tool_call_id` key.
it("matches tool results from standalone toolResult/tool turns and stops at the next assistant", () => {
const msgs = asMessages([
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
{
role: "assistant",
content: [{ type: "toolCall", id: "tool-1", name: "test", arguments: {} }],
},
// Intermediate user text between the tool call and its result.
{ role: "user", content: [{ type: "text", text: "intermediate" }] },
// Standalone result message; matches "tool-1" via `tool_call_id`.
{ role: "toolResult", tool_call_id: "tool-1", content: [{ type: "text", text: "Result" }] },
{
role: "assistant",
content: [{ type: "text", text: "New assistant boundary" }],
},
// Same id again, but positioned after a later assistant message.
{ role: "tool", toolUseId: "tool-1", content: [{ type: "text", text: "Late result" }] },
] as unknown as AgentMessage[]);
const result = validateAnthropicTurns(msgs);
// The toolCall block on the first assistant turn must be kept unchanged.
// NOTE(review): the earlier `toolResult` already matches "tool-1", so this
// assertion alone does not show that the late `tool` result was ignored.
expect((result[1] as { content?: unknown[] }).content).toEqual([
{ type: "toolCall", id: "tool-1", name: "test", arguments: {} },
]);
});
// An assistant turn with stopReason "aborted" contains only a tool call, and no
// tool result follows it. The dangling call is expected to be removed, leaving
// the content empty rather than replaced by placeholder text.
it("does not synthesize fallback text for aborted mid-transcript tool-only turns", () => {
const msgs = asMessages([
{ role: "user", content: [{ type: "text", text: "Use tool" }] },
{
role: "assistant",
stopReason: "aborted",
content: [{ type: "toolCall", id: "tool-1", name: "test", arguments: {} }],
},
// The follow-up user turn carries no tool result for "tool-1".
{ role: "user", content: [{ type: "text", text: "Retry" }] },
]);
const result = validateAnthropicTurns(msgs);
// stopReason is preserved and content is an empty array.
expect(result[1]).toMatchObject({ stopReason: "aborted", content: [] });
});
});

View File

@@ -1,17 +1,36 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
// Extend this union when pi-agent-core adds new tool-call block types
type AnthropicContentBlock = {
type: "text" | "toolUse" | "toolResult";
type: "text" | "toolUse" | "toolResult" | "toolCall" | "functionCall";
text?: string;
id?: string;
name?: string;
toolUseId?: string;
toolCallId?: string;
tool_use_id?: string;
tool_call_id?: string;
};
/**
 * Tells whether a content block's `type` tag marks a tool invocation.
 *
 * Three spellings are accepted: "toolUse", "toolCall", and "functionCall".
 * Any other value, including `undefined`, is not a tool call.
 *
 * @param type - The `type` field read from a content block, if present.
 * @returns `true` only for one of the three tool-call tags.
 */
function isToolCallBlock(type: string | undefined): boolean {
  switch (type) {
    case "toolUse":
    case "toolCall":
    case "functionCall":
      return true;
    default:
      return false;
  }
}
/**
 * Tells whether a message ended abnormally, judged by its `stopReason` field.
 *
 * @param msg - The message to inspect; non-object and falsy values are tolerated.
 * @returns `true` when `stopReason` is exactly "aborted" or "error", otherwise `false`.
 */
function isAbortedAssistantTurn(msg: AgentMessage): boolean {
  if (msg && typeof msg === "object") {
    // `stopReason` is read through a loose shape and compared by strict equality only.
    const { stopReason } = msg as { stopReason?: unknown };
    return stopReason === "aborted" || stopReason === "error";
  }
  return false;
}
/**
* Strips dangling tool_use blocks from assistant messages when the immediately
* following user message does not contain a matching tool_result block.
* This fixes the "tool_use ids found without tool_result blocks" error from Anthropic.
* Strips dangling assistant tool-call blocks (toolUse/toolCall/functionCall)
* when no later message in the same assistant span contains a matching
* tool_result block. This fixes the "tool_use ids found without tool_result
* blocks" error from Anthropic. Aborted/error turns are still filtered for
* dangling tool calls, but they do not receive synthesized fallback text.
*/
function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[] {
const result: AgentMessage[] = [];
@@ -33,47 +52,76 @@ function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[
content?: AnthropicContentBlock[];
};
// Get the next message to check for tool_result blocks
const nextMsg = messages[i + 1];
const nextMsgRole =
nextMsg && typeof nextMsg === "object"
? ((nextMsg as { role?: unknown }).role as string | undefined)
: undefined;
const isAbortedTurn = isAbortedAssistantTurn(msg);
// If next message is not user, keep the assistant message as-is
if (nextMsgRole !== "user") {
if (!Array.isArray(assistantMsg.content)) {
result.push(msg);
continue;
}
// Collect tool_use_ids from the next user message's tool_result blocks
const nextUserMsg = nextMsg as {
content?: AnthropicContentBlock[];
};
// Scan ALL subsequent messages in this assistant span for matching tool_result blocks.
// OpenAI-compatible transcripts can have assistant(toolCall) → user(text) → toolResult
// ordering, so we must look beyond the immediate next message.
// TODO: optimize to single-pass suffix set if this helper becomes hot.
const validToolUseIds = new Set<string>();
if (Array.isArray(nextUserMsg.content)) {
for (const block of nextUserMsg.content) {
if (block && block.type === "toolResult" && block.toolUseId) {
validToolUseIds.add(block.toolUseId);
for (let j = i + 1; j < messages.length; j++) {
const futureMsg = messages[j];
if (!futureMsg || typeof futureMsg !== "object") {
continue;
}
const futureRole = (futureMsg as { role?: unknown }).role as string | undefined;
if (futureRole === "assistant") {
break;
}
if (futureRole !== "user" && futureRole !== "toolResult" && futureRole !== "tool") {
continue;
}
const futureUserMsg = futureMsg as {
content?: AnthropicContentBlock[];
toolUseId?: string;
toolCallId?: string;
tool_use_id?: string;
tool_call_id?: string;
};
if (futureRole === "toolResult" || futureRole === "tool") {
const directToolResultId =
futureUserMsg.toolUseId ??
futureUserMsg.toolCallId ??
futureUserMsg.tool_use_id ??
futureUserMsg.tool_call_id;
if (directToolResultId) {
validToolUseIds.add(directToolResultId);
}
}
if (!Array.isArray(futureUserMsg.content)) {
continue;
}
for (const block of futureUserMsg.content) {
if (block && block.type === "toolResult") {
const toolResultId =
block.toolUseId ?? block.toolCallId ?? block.tool_use_id ?? block.tool_call_id;
if (toolResultId) {
validToolUseIds.add(toolResultId);
}
}
}
}
// Filter out tool_use blocks that don't have matching tool_result
const originalContent = Array.isArray(assistantMsg.content) ? assistantMsg.content : [];
// Filter out tool-call blocks that don't have matching tool_result
const originalContent = assistantMsg.content;
const filteredContent = originalContent.filter((block) => {
if (!block) {
return false;
}
if (block.type !== "toolUse") {
if (!isToolCallBlock(block.type)) {
return true;
}
// Keep tool_use if its id is in the valid set
// Keep tool call if its id is in the valid set
return validToolUseIds.has(block.id || "");
});
// If all content would be removed, insert a minimal fallback text block
if (originalContent.length > 0 && filteredContent.length === 0) {
// If all content would be removed, insert a minimal fallback text block for non-aborted turns.
if (originalContent.length > 0 && filteredContent.length === 0 && !isAbortedTurn) {
result.push({
...assistantMsg,
content: [{ type: "text", text: "[tool calls omitted]" }],
@@ -86,6 +134,33 @@ function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[
}
}
// See also: main loop tool_use stripping above
// Handle end-of-conversation orphans: if the last message is assistant with
// tool-call blocks and no following user message, strip those blocks.
if (result.length > 0) {
const lastMsg = result[result.length - 1];
const lastRole =
lastMsg && typeof lastMsg === "object"
? ((lastMsg as { role?: unknown }).role as string | undefined)
: undefined;
if (lastRole === "assistant") {
const lastAssistant = lastMsg as { content?: AnthropicContentBlock[] };
if (Array.isArray(lastAssistant.content)) {
const hasToolUse = lastAssistant.content.some((b) => b && isToolCallBlock(b.type));
if (hasToolUse) {
const filtered = lastAssistant.content.filter((b) => b && !isToolCallBlock(b.type));
result[result.length - 1] =
filtered.length > 0 || isAbortedAssistantTurn(lastMsg)
? ({ ...lastAssistant, content: filtered } as AgentMessage)
: ({
...lastAssistant,
content: [{ type: "text" as const, text: "[tool calls omitted]" }],
} as AgentMessage);
}
}
}
}
return result;
}
@@ -193,6 +268,7 @@ export function validateAnthropicTurns(messages: AgentMessage[]): AgentMessage[]
// First, strip dangling tool_use blocks from assistant messages
const stripped = stripDanglingAnthropicToolUses(messages);
// Then merge consecutive user messages
return validateTurnsWithConsecutiveMerge({
messages: stripped,
role: "user",

View File

@@ -1,6 +1,7 @@
import { extractTextFromChatContent } from "../../shared/chat-content.js";
import { sanitizeUserFacingText } from "../pi-embedded-helpers.js";
import {
extractAssistantVisibleText,
stripDowngradedToolCallText,
stripMinimaxToolCallXml,
stripModelSpecialTokens,
@@ -41,16 +42,29 @@ export function extractAssistantText(message: unknown): string | undefined {
if (!Array.isArray(content)) {
return undefined;
}
const joined =
extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
joinWith: "",
normalizeText: (text) => text.trim(),
}) ?? "";
const hasPhaseMetadata = content.some(
(block) =>
block && typeof block === "object" && typeof (block as { textSignature?: unknown }).textSignature === "string",
);
const joined = hasPhaseMetadata
? (extractAssistantVisibleText(message as Parameters<typeof extractAssistantVisibleText>[0]) ?? "")
: (
extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
joinWith: "",
normalizeText: (text) => text.trim(),
}) ?? ""
);
if (!joined.trim()) {
return undefined;
}
if (hasPhaseMetadata) {
return joined;
}
const stopReason = (message as { stopReason?: unknown }).stopReason;
// Gate on stopReason only — a non-error response with a stale/background errorMessage
// should not have its content rewritten with error templates (#13935).
const errorContext = stopReason === "error";
return joined ? sanitizeUserFacingText(joined, { errorContext }) : undefined;
return sanitizeUserFacingText(joined, { errorContext });
}

View File

@@ -465,6 +465,56 @@ describe("session history HTTP endpoints", () => {
});
});
// Streams session history over SSE, appends a "NO_REPLY" assistant message
// followed by a visible one, and checks that the next streamed `message` event
// is the visible one and that it carries sequence number 3.
test("suppresses NO_REPLY-only SSE fast-path updates while preserving raw sequence numbering", async () => {
// The seeded transcript starts with a single message.
const { storePath } = await seedSession({ text: "first message" });
await withGatewayHarness(async (harness) => {
// Request the history endpoint as an event stream.
const res = await fetchSessionHistory(harness.port, "agent:main:main", {
headers: { Accept: "text/event-stream" },
});
expect(res.status).toBe(200);
const reader = res.body?.getReader();
expect(reader).toBeTruthy();
const streamState = { buffer: "" };
// Consume and discard the first event on the stream
// (presumably the initial history snapshot — confirm against the handler).
await readSseEvent(reader!, streamState);
// Second transcript entry: text is exactly "NO_REPLY".
const silent = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "NO_REPLY",
storePath,
});
expect(silent.ok).toBe(true);
// Third transcript entry: ordinary visible text.
const visible = await appendAssistantMessageToSessionTranscript({
sessionKey: "agent:main:main",
text: "third visible message",
storePath,
});
expect(visible.ok).toBe(true);
// The next event read from the stream must already be the visible message,
// i.e. no `message` event was emitted for the "NO_REPLY" entry.
const messageEvent = await readSseEvent(reader!, streamState);
expect(messageEvent.event).toBe("message");
expect(
(messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message
?.content?.[0]?.text,
).toBe("third visible message");
// Sequence is 3, not 2: the suppressed entry still occupies a sequence slot.
expect((messageEvent.data as { messageSeq?: number }).messageSeq).toBe(3);
// The metadata attached to the message must agree on both id and seq.
expect(
(
messageEvent.data as {
message?: { __openclaw?: { id?: string; seq?: number } };
}
).message?.__openclaw,
).toMatchObject({
id: visible.ok ? visible.messageId : undefined,
seq: 3,
});
// Close the stream so the harness can shut down.
await reader?.cancel();
});
});
test("rejects session history when operator.read is not requested", async () => {
await seedSession({ text: "scope-guarded history" });

View File

@@ -25,7 +25,10 @@ import {
resolveGatewaySessionStoreTarget,
resolveSessionTranscriptCandidates,
} from "./session-utils.js";
import { DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS, sanitizeChatHistoryMessages } from "./server-methods/chat.js";
import {
DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS,
sanitizeChatHistoryMessages,
} from "./server-methods/chat.js";
const MAX_SESSION_HISTORY_LIMIT = 1000;
@@ -103,17 +106,21 @@ function paginateSessionMessages(
cursor: string | undefined,
): PaginatedSessionHistory {
const cursorSeq = resolveCursorSeq(cursor);
const endExclusive =
typeof cursorSeq === "number"
? messages.findIndex((message) => {
const seq = resolveMessageSeq(message);
return typeof seq === "number" && seq >= cursorSeq;
})
: -1;
const boundedEndExclusive = endExclusive >= 0 ? endExclusive : messages.length;
const start =
typeof limit === "number" && limit > 0 ? Math.max(0, boundedEndExclusive - limit) : 0;
const items = messages.slice(start, boundedEndExclusive);
let endExclusive = messages.length;
if (typeof cursorSeq === "number") {
endExclusive = messages.findIndex((message, index) => {
const seq = resolveMessageSeq(message);
if (typeof seq === "number") {
return seq >= cursorSeq;
}
return index + 1 >= cursorSeq;
});
if (endExclusive < 0) {
endExclusive = messages.length;
}
}
const start = typeof limit === "number" && limit > 0 ? Math.max(0, endExclusive - limit) : 0;
const items = messages.slice(start, endExclusive);
const firstSeq = resolveMessageSeq(items[0]);
return {
items,
@@ -211,16 +218,13 @@ export async function handleSessionHistoryHttpRequest(
typeof cfg.gateway?.webchat?.chatHistoryMaxChars === "number"
? cfg.gateway.webchat.chatHistoryMaxChars
: DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS;
const history = paginateSessionMessages(
sanitizeChatHistoryMessages(
entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
: [],
effectiveMaxChars,
),
limit,
cursor,
const sanitizedMessages = sanitizeChatHistoryMessages(
entry?.sessionId
? readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile)
: [],
effectiveMaxChars,
);
const history = paginateSessionMessages(sanitizedMessages, limit, cursor);
if (!shouldStreamSse(req)) {
sendJson(res, 200, {
@@ -244,6 +248,7 @@ export async function handleSessionHistoryHttpRequest(
: new Set<string>();
let sentHistory = history;
let rawTranscriptSeq = resolveMessageSeq(sentHistory.items.at(-1)) ?? 0;
setSseHeaders(res);
res.write("retry: 1000\n\n");
sseWrite(res, "history", {
@@ -266,30 +271,27 @@ export async function handleSessionHistoryHttpRequest(
return;
}
if (update.message !== undefined) {
const previousSeq = resolveMessageSeq(sentHistory.items.at(-1));
rawTranscriptSeq += 1;
const nextMessage = attachOpenClawTranscriptMeta(update.message, {
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
seq:
typeof previousSeq === "number"
? previousSeq + 1
: readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile).length,
seq: rawTranscriptSeq,
});
if (limit === undefined && cursor === undefined) {
sentHistory = {
items: [...sentHistory.items, nextMessage],
messages: [...sentHistory.items, nextMessage],
hasMore: false,
};
const sanitized = sanitizeChatHistoryMessages([nextMessage], effectiveMaxChars);
if (sanitized.length === 0) {
return;
}
const sanitizedMessage = sanitized[0];
const sanitizedMsg = sanitized[0];
sentHistory = {
items: [...sentHistory.items, sanitizedMsg],
messages: [...sentHistory.items, sanitizedMsg],
hasMore: false,
};
sseWrite(res, "message", {
sessionKey: target.canonicalKey,
message: sanitizedMessage,
message: sanitizedMsg,
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
messageSeq: resolveMessageSeq(sanitizedMessage),
messageSeq: resolveMessageSeq(sanitizedMsg),
});
return;
}