diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 15f0cc53284..39b2abe4da7 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -1080,6 +1080,250 @@ describe("wrapStreamFnSanitizeMalformedToolCalls", () => { }, ]); }); + it("drops replayed tool names that are no longer allowlisted", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolUse", id: "call_1", name: "unknown_tool", input: { path: "." } }], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "unknown_tool", + content: [{ type: "text", text: "stale result" }], + isError: false, + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toEqual([]); + }); + + it("drops ambiguous mangled replay names instead of guessing a tool", async () => { + const messages = [ + { + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "functions.exec2", arguments: {} }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls( + baseFn as never, + new Set(["exec", "exec2"]), + ); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toEqual([]); + }); + + it("preserves matching tool results for retained errored assistant turns", async () => { + const messages = [ + { + role: "assistant", + stopReason: "error", + content: [ + { type: "toolCall", id: "call_1", name: "read", arguments: {} }, + { type: "toolCall", name: "read", arguments: {} }, + ], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "kept result" }], + isError: false, + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"])); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { messages: unknown[] }; + expect(seenContext.messages).toEqual([ + { + role: "assistant", + stopReason: "error", + content: [{ type: "toolCall", id: "call_1", name: "read", arguments: {} }], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "read", + content: [{ type: "text", text: "kept result" }], + isError: false, + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]); + }); + + it("revalidates turn ordering after dropping an assistant replay turn", async () => { + const messages = [ + { + role: "user", + content: [{ type: "text", text: "first" }], + }, + { + role: "assistant", + stopReason: "error", + content: [{ type: "toolCall", name: "read", arguments: {} }], + }, + { + role: "user", + content: [{ type: "text", text: "second" }], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"]), { + validateGeminiTurns: false, + validateAnthropicTurns: true, + }); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string; content?: unknown[] }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "user", + content: [ + { type: "text", text: "first" }, + { type: "text", text: "second" }, + ], + }, + ]); + }); + + it("drops orphaned Anthropic user tool_result blocks after replay sanitization", async () => { + const messages = [ + { + role: "assistant", + content: [ + { type: "text", text: "partial response" }, + { type: "toolUse", name: "read", input: { path: "." } }, + ], + }, + { + role: "user", + content: [ + { type: "toolResult", toolUseId: "call_1", content: [{ type: "text", text: "stale" }] }, + { type: "text", text: "retry" }, + ], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"]), { + validateGeminiTurns: false, + validateAnthropicTurns: true, + }); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string; content?: unknown[] }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "assistant", + content: [{ type: "text", text: "partial response" }], + }, + { + role: "user", + content: [{ type: "text", text: "retry" }], + }, + ]); + }); + + it("drops orphaned Anthropic user tool_result blocks after dropping an assistant replay turn", async () => { + const messages = [ + { + role: "user", + content: [{ type: "text", text: "first" }], + }, + { + role: "assistant", + stopReason: "error", + content: [{ type: "toolUse", name: "read", input: { path: "." } }], + }, + { + role: "user", + content: [ + { type: "toolResult", toolUseId: "call_1", content: [{ type: "text", text: "stale" }] }, + { type: "text", text: "second" }, + ], + }, + ]; + const baseFn = vi.fn((_model, _context) => + createFakeStream({ events: [], resultMessage: { role: "assistant", content: [] } }), + ); + + const wrapped = wrapStreamFnSanitizeMalformedToolCalls(baseFn as never, new Set(["read"]), { + validateGeminiTurns: false, + validateAnthropicTurns: true, + }); + const stream = wrapped({} as never, { messages } as never, {} as never) as + | FakeWrappedStream + | Promise; + await Promise.resolve(stream); + + expect(baseFn).toHaveBeenCalledTimes(1); + const seenContext = baseFn.mock.calls[0]?.[1] as { + messages: Array<{ role?: string; content?: unknown[] }>; + }; + expect(seenContext.messages).toEqual([ + { + role: "user", + content: [ + { type: "text", text: "first" }, + { type: "text", text: "second" }, + ], + }, + ]); + }); }); describe("wrapStreamFnRepairMalformedToolCallArguments", () => { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 9b76c6be1a9..fce8565fb5e 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -658,6 +658,16 @@ type ReplayToolCallBlock = { arguments?: unknown; }; +type ReplayToolCallSanitizeReport = { + messages: AgentMessage[]; + droppedAssistantMessages: number; +}; + +type AnthropicToolResultContentBlock = { + type?: unknown; + toolUseId?: unknown; +}; + function isReplayToolCallBlock(block: unknown): block is ReplayToolCallBlock { if (!block || typeof block !== "object") { return false; @@ -692,17 +702,15 @@ function resolveReplayToolCallName( if (!allowedToolNames || allowedToolNames.size === 0) { return trimmed; } - return ( - resolveExactAllowedToolName(trimmed, allowedToolNames) ?? - resolveStructuredAllowedToolName(trimmed, allowedToolNames) - ); + return resolveExactAllowedToolName(trimmed, allowedToolNames); } function sanitizeReplayToolCallInputs( messages: AgentMessage[], allowedToolNames?: Set, -): AgentMessage[] { +): ReplayToolCallSanitizeReport { let changed = false; + let droppedAssistantMessages = 0; const out: AgentMessage[] = []; for (const message of messages) { @@ -752,6 +760,8 @@ function sanitizeReplayToolCallInputs( changed = true; if (nextContent.length > 0) { out.push({ ...message, content: nextContent }); + } else { + droppedAssistantMessages += 1; } continue; } @@ -759,6 +769,76 @@ function sanitizeReplayToolCallInputs( out.push(message); } + return { + messages: changed ? out : messages, + droppedAssistantMessages, + }; +} + +function sanitizeAnthropicReplayToolResults(messages: AgentMessage[]): AgentMessage[] { + let changed = false; + const out: AgentMessage[] = []; + + for (let index = 0; index < messages.length; index += 1) { + const message = messages[index]; + if (!message || typeof message !== "object" || message.role !== "user") { + out.push(message); + continue; + } + if (!Array.isArray(message.content)) { + out.push(message); + continue; + } + + const previous = messages[index - 1]; + const validToolUseIds = new Set(); + if (previous && typeof previous === "object" && previous.role === "assistant") { + const previousContent = (previous as { content?: unknown }).content; + if (Array.isArray(previousContent)) { + for (const block of previousContent) { + if (!block || typeof block !== "object") { + continue; + } + const typedBlock = block as { type?: unknown; id?: unknown }; + if (typedBlock.type !== "toolUse" || typeof typedBlock.id !== "string") { + continue; + } + const trimmedId = typedBlock.id.trim(); + if (trimmedId) { + validToolUseIds.add(trimmedId); + } + } + } + } + + const nextContent = message.content.filter((block) => { + if (!block || typeof block !== "object") { + return true; + } + const typedBlock = block as AnthropicToolResultContentBlock; + if (typedBlock.type !== "toolResult" || typeof typedBlock.toolUseId !== "string") { + return true; + } + return validToolUseIds.size > 0 && validToolUseIds.has(typedBlock.toolUseId); + }); + + if (nextContent.length === message.content.length) { + out.push(message); + continue; + } + + changed = true; + if (nextContent.length > 0) { + out.push({ ...message, content: nextContent }); + continue; + } + + out.push({ + ...message, + content: [{ type: "text", text: "[tool results omitted]" }], + } as AgentMessage); + } + return changed ? out : messages; } @@ -913,6 +993,7 @@ export function wrapStreamFnTrimToolCallNames( export function wrapStreamFnSanitizeMalformedToolCalls( baseFn: StreamFn, allowedToolNames?: Set, + transcriptPolicy?: Pick, ): StreamFn { return (model, context, options) => { const ctx = context as unknown as { messages?: unknown }; @@ -921,13 +1002,26 @@ export function wrapStreamFnSanitizeMalformedToolCalls( return baseFn(model, context, options); } const sanitized = sanitizeReplayToolCallInputs(messages as AgentMessage[], allowedToolNames); - if (sanitized === messages) { + if (sanitized.messages === messages) { return baseFn(model, context, options); } - const paired = sanitizeToolUseResultPairing(sanitized); + let nextMessages = sanitizeToolUseResultPairing(sanitized.messages, { + preserveErroredAssistantResults: true, + }); + if (transcriptPolicy?.validateAnthropicTurns) { + nextMessages = sanitizeAnthropicReplayToolResults(nextMessages); + } + if (sanitized.droppedAssistantMessages > 0 || transcriptPolicy?.validateAnthropicTurns) { + if (transcriptPolicy?.validateGeminiTurns) { + nextMessages = validateGeminiTurns(nextMessages); + } + if (transcriptPolicy?.validateAnthropicTurns) { + nextMessages = validateAnthropicTurns(nextMessages); + } + } const nextContext = { ...(context as unknown as Record), - messages: paired, + messages: nextMessages, } as unknown; return baseFn(model, nextContext as typeof context, options); }; diff --git a/src/agents/session-transcript-repair.ts b/src/agents/session-transcript-repair.ts index e7ab7db94b3..9455837d930 100644 --- a/src/agents/session-transcript-repair.ts +++ b/src/agents/session-transcript-repair.ts @@ -195,6 +195,10 @@ export type ToolCallInputRepairOptions = { allowedToolNames?: Iterable; }; +export type ToolUseResultPairingOptions = { + preserveErroredAssistantResults?: boolean; +}; + export function stripToolResultDetails(messages: AgentMessage[]): AgentMessage[] { let touched = false; const out: AgentMessage[] = []; @@ -327,8 +331,11 @@ export function sanitizeToolCallInputs( return repairToolCallInputs(messages, options).messages; } -export function sanitizeToolUseResultPairing(messages: AgentMessage[]): AgentMessage[] { - return repairToolUseResultPairing(messages).messages; +export function sanitizeToolUseResultPairing( + messages: AgentMessage[], + options?: ToolUseResultPairingOptions, +): AgentMessage[] { + return repairToolUseResultPairing(messages, options).messages; } export type ToolUseRepairReport = { @@ -339,7 +346,10 @@ export type ToolUseRepairReport = { moved: boolean; }; -export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRepairReport { +export function repairToolUseResultPairing( + messages: AgentMessage[], + options?: ToolUseResultPairingOptions, +): ToolUseRepairReport { // Anthropic (and Cloud Code Assist) reject transcripts where assistant tool calls are not // immediately followed by matching tool results. Session files can end up with results // displaced (e.g. after user turns) or duplicated. Repair by: @@ -390,18 +400,6 @@ export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRep const assistant = msg as Extract; - // Skip tool call extraction for aborted or errored assistant messages. - // When stopReason is "error" or "aborted", the tool_use blocks may be incomplete - // (e.g., partialJson: true) and should not have synthetic tool_results created. - // Creating synthetic results for incomplete tool calls causes API 400 errors: - // "unexpected tool_use_id found in tool_result blocks" - // See: https://github.com/openclaw/openclaw/issues/4597 - const stopReason = (assistant as { stopReason?: string }).stopReason; - if (stopReason === "error" || stopReason === "aborted") { - out.push(msg); - continue; - } - const toolCalls = extractToolCallsFromAssistant(assistant); if (toolCalls.length === 0) { out.push(msg); @@ -459,6 +457,28 @@ export function repairToolUseResultPairing(messages: AgentMessage[]): ToolUseRep } } + // Aborted/errored assistant turns should never synthesize missing tool results, but + // the replay sanitizer can still legitimately retain real tool results for surviving + // tool calls in the same turn after malformed siblings are dropped. + const stopReason = (assistant as { stopReason?: string }).stopReason; + if (stopReason === "error" || stopReason === "aborted") { + out.push(msg); + if (options?.preserveErroredAssistantResults) { + for (const toolCall of toolCalls) { + const result = spanResultsById.get(toolCall.id); + if (!result) { + continue; + } + pushToolResult(result); + } + } + for (const rem of remainder) { + out.push(rem); + } + i = j - 1; + continue; + } + out.push(msg); if (spanResultsById.size > 0 && remainder.length > 0) {