From 524528944f05b5c36d26e0cbf82bb5caf5d2aef4 Mon Sep 17 00:00:00 2001 From: Alex Knight Date: Fri, 1 May 2026 21:24:50 +1000 Subject: [PATCH] fix(agents): trim trailing assistant turns and rewrite blank user messages in session repair (#75606) * fix(agents): trim trailing assistant turns and rewrite blank user messages in session repair Session-file repair now: - Trims trailing assistant messages so the JSONL never ends on role=assistant, preventing the Anthropic 400 prefill-loop that fires when thinking is enabled. (#75271) - Rewrites blank-only user messages to a synthetic '(continue)' placeholder instead of dropping them, so strict providers (Qwen/mlx-vlm, Anthropic) no longer reject transcripts missing a user turn. (#75313) Closes #75271, closes #75313. * refactor: clean up comments in session-file repair * fix(agents): preserve trailing assistant tool-call turns during session trim Mirror the outbound guard (stripTrailingAssistantPrefillTurns): skip assistant entries containing toolCall/toolUse/functionCall blocks so transcript repair can synthesize missing tool results. Addresses PR review feedback from clawsweeper on #75606. --- CHANGELOG.md | 1 + src/agents/session-file-repair.test.ts | 281 +++++++++++++++++++++++-- src/agents/session-file-repair.ts | 99 +++++++-- 3 files changed, 343 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5e519e0662..05847e99499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai - Voice Call CLI: delegate operational `voicecall` commands to the running Gateway runtime and skip webhook startup during CLI-only plugin loading, preventing webhook port conflicts and `setup --json` hangs. Fixes #72345. Thanks @serrurco and @DougButdorf. - Agents/pi-embedded-runner: extract the `abortable` provider-call wrapper from `runEmbeddedAttempt` to module scope so its promise handlers no longer close over the run lexical context, releasing transcripts, tool buffers, and subscription callbacks when a provider call hangs past abort. (#74182) Thanks @cjboy007. - Docker: restore `python3` in the gateway runtime image after the slim-runtime switch. Fixes #75041. +- Agents/session-repair: fix resumed sessions failing with repeated 400 errors on Anthropic and strict OpenAI-compatible providers (Qwen, mlx-vlm) after an interrupted conversation or blank user input. Fixes #75271 and #75313. Thanks @amknight. - CLI/Voice Call: scope `voicecall` command activation to the Voice Call plugin so setup and smoke checks no longer broad-load unrelated plugin runtimes or hang after printing JSON. Thanks @vincentkoc. - Doctor/plugins: warn when restrictive `plugins.allow` is paired with wildcard or plugin-owned tool allowlists, making the exclusive plugin allowlist behavior visible before users hit empty callable-tool runs. Refs #58009 and #64982. Thanks @KR-Python and @BKF-Gitty. - Google Meet/Voice Call: keep Twilio Meet joins in conversation mode and reuse the realtime intro prompt when no voice-call-specific intro is configured, so answered phone bridge calls speak instead of joining silently. Refs #72478. Thanks @DougButdorf. diff --git a/src/agents/session-file-repair.test.ts b/src/agents/session-file-repair.test.ts index 07a37963322..a2f74be2932 100644 --- a/src/agents/session-file-repair.test.ts +++ b/src/agents/session-file-repair.test.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; -import { repairSessionFileIfNeeded } from "./session-file-repair.js"; +import { BLANK_USER_FALLBACK_TEXT, repairSessionFileIfNeeded } from "./session-file-repair.js"; function buildSessionHeaderAndMessage() { const header = { @@ -100,7 +100,7 @@ describe("repairSessionFileIfNeeded", () => { it("rewrites persisted assistant messages with empty content arrays", async () => { const { file } = await createTempSessionPath(); - const { header } = buildSessionHeaderAndMessage(); + const { header, message } = buildSessionHeaderAndMessage(); const poisonedAssistantEntry = { type: "message", id: "msg-2", @@ -117,7 +117,15 @@ describe("repairSessionFileIfNeeded", () => { errorMessage: "transient stream failure", }, }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(poisonedAssistantEntry)}\n`; + // Follow-up so the session doesn't end on assistant (trailing-trim is tested separately). + const followUp = { + type: "message", + id: "msg-3", + parentId: null, + timestamp: new Date().toISOString(), + message: { role: "user", content: "retry" }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(poisonedAssistantEntry)}\n${JSON.stringify(followUp)}\n`; await fs.writeFile(file, original, "utf-8"); const warn = vi.fn(); @@ -127,8 +135,6 @@ describe("repairSessionFileIfNeeded", () => { expect(result.droppedLines).toBe(0); expect(result.rewrittenAssistantMessages).toBe(1); expect(result.backupPath).toBeTruthy(); - // Warn message must omit the "dropped 0 malformed line(s)" noise when - // nothing was dropped; only the rewrite count is reported. expect(warn).toHaveBeenCalledTimes(1); const warnMessage = warn.mock.calls[0]?.[0] as string; expect(warnMessage).toContain("rewrote 1 assistant message(s)"); @@ -136,16 +142,16 @@ describe("repairSessionFileIfNeeded", () => { const repaired = await fs.readFile(file, "utf-8"); const repairedLines = repaired.trim().split("\n"); - expect(repairedLines).toHaveLength(2); + expect(repairedLines).toHaveLength(4); const repairedEntry: { message: { content: { type: string; text: string }[] } } = JSON.parse( - repairedLines[1], + repairedLines[2], ); expect(repairedEntry.message.content).toEqual([ { type: "text", text: "[assistant turn failed before producing content]" }, ]); }); - it("drops persisted blank user text messages", async () => { + it("rewrites blank-only user text messages to synthetic placeholder instead of dropping", async () => { const { file } = await createTempSessionPath(); const { header, message } = buildSessionHeaderAndMessage(); const blankUserEntry = { @@ -165,13 +171,46 @@ describe("repairSessionFileIfNeeded", () => { const result = await repairSessionFileIfNeeded({ sessionFile: file, warn }); expect(result.repaired).toBe(true); - expect(result.droppedBlankUserMessages).toBe(1); - expect(warn.mock.calls[0]?.[0]).toContain("dropped 1 blank user message(s)"); + expect(result.rewrittenUserMessages).toBe(1); + expect(result.droppedBlankUserMessages).toBe(0); + expect(warn.mock.calls[0]?.[0]).toContain("rewrote 1 user message(s)"); const repaired = await fs.readFile(file, "utf-8"); const repairedLines = repaired.trim().split("\n"); - expect(repairedLines).toHaveLength(2); - expect(JSON.parse(repairedLines[1])?.id).toBe("msg-1"); + expect(repairedLines).toHaveLength(3); + const rewrittenEntry = JSON.parse(repairedLines[1]); + expect(rewrittenEntry.id).toBe("msg-blank"); + expect(rewrittenEntry.message.content).toEqual([ + { type: "text", text: BLANK_USER_FALLBACK_TEXT }, + ]); + }); + + it("rewrites blank string-content user messages to placeholder", async () => { + const { file } = await createTempSessionPath(); + const { header, message } = buildSessionHeaderAndMessage(); + const blankStringUserEntry = { + type: "message", + id: "msg-blank-str", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "user", + content: " ", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(blankStringUserEntry)}\n${JSON.stringify(message)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(true); + expect(result.rewrittenUserMessages).toBe(1); + + const repaired = await fs.readFile(file, "utf-8"); + const repairedLines = repaired.trim().split("\n"); + expect(repairedLines).toHaveLength(3); + const rewrittenEntry = JSON.parse(repairedLines[1]); + expect(rewrittenEntry.message.content).toBe(BLANK_USER_FALLBACK_TEXT); }); it("removes blank user text blocks while preserving media blocks", async () => { @@ -237,12 +276,6 @@ describe("repairSessionFileIfNeeded", () => { }); it("does not rewrite silent-reply turns (stopReason=stop, content=[]) on disk", async () => { - // Mirror of the in-memory replay-history test: a clean stop with no - // content is a legitimate silent reply (NO_REPLY token path). Repair - // must NOT permanently mutate it into a synthetic "[assistant turn - // failed before producing content]" entry — that would corrupt the - // historical transcript and replay fabricated failure text on every - // future provider request. const { file } = await createTempSessionPath(); const { header } = buildSessionHeaderAndMessage(); const silentReplyEntry = { @@ -260,7 +293,15 @@ describe("repairSessionFileIfNeeded", () => { stopReason: "stop", }, }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n`; + // Follow-up so the session doesn't end on assistant (trailing-trim is tested separately). + const followUp = { + type: "message", + id: "msg-3", + parentId: null, + timestamp: new Date().toISOString(), + message: { role: "user", content: "follow up" }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n${JSON.stringify(followUp)}\n`; await fs.writeFile(file, original, "utf-8"); const result = await repairSessionFileIfNeeded({ sessionFile: file }); @@ -271,6 +312,198 @@ describe("repairSessionFileIfNeeded", () => { expect(after).toBe(original); }); + it("trims trailing assistant messages from the session file", async () => { + const { file } = await createTempSessionPath(); + const { header, message } = buildSessionHeaderAndMessage(); + const assistantEntry = { + type: "message", + id: "msg-asst", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "text", text: "stale answer" }], + stopReason: "stop", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const warn = vi.fn(); + const result = await repairSessionFileIfNeeded({ sessionFile: file, warn }); + + expect(result.repaired).toBe(true); + expect(result.trimmedTrailingAssistantMessages).toBe(1); + expect(warn.mock.calls[0]?.[0]).toContain("trimmed 1 trailing assistant message(s)"); + + const repaired = await fs.readFile(file, "utf-8"); + const repairedLines = repaired.trim().split("\n"); + expect(repairedLines).toHaveLength(2); + }); + + it("trims multiple consecutive trailing assistant messages", async () => { + const { file } = await createTempSessionPath(); + const { header, message } = buildSessionHeaderAndMessage(); + const assistantEntry1 = { + type: "message", + id: "msg-asst-1", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "text", text: "first" }], + stopReason: "stop", + }, + }; + const assistantEntry2 = { + type: "message", + id: "msg-asst-2", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "text", text: "second" }], + stopReason: "stop", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry1)}\n${JSON.stringify(assistantEntry2)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(true); + expect(result.trimmedTrailingAssistantMessages).toBe(2); + + const repaired = await fs.readFile(file, "utf-8"); + const repairedLines = repaired.trim().split("\n"); + expect(repairedLines).toHaveLength(2); + }); + + it("does not trim non-trailing assistant messages", async () => { + const { file } = await createTempSessionPath(); + const { header, message } = buildSessionHeaderAndMessage(); + const assistantEntry = { + type: "message", + id: "msg-asst", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "text", text: "answer" }], + stopReason: "stop", + }, + }; + const userFollowUp = { + type: "message", + id: "msg-user-2", + parentId: null, + timestamp: new Date().toISOString(), + message: { role: "user", content: "follow up" }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n${JSON.stringify(userFollowUp)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(false); + expect(result.trimmedTrailingAssistantMessages ?? 0).toBe(0); + }); + + it("preserves trailing assistant messages that contain tool calls", async () => { + const { file } = await createTempSessionPath(); + const { header, message } = buildSessionHeaderAndMessage(); + const toolCallAssistant = { + type: "message", + id: "msg-asst-tc", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [ + { type: "text", text: "Let me check that." }, + { type: "toolCall", id: "call_1", name: "read", input: { path: "/tmp/test" } }, + ], + stopReason: "toolUse", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(false); + expect(result.trimmedTrailingAssistantMessages ?? 0).toBe(0); + const after = await fs.readFile(file, "utf-8"); + expect(after).toBe(original); + }); + + it("trims non-tool-call assistant but stops at tool-call assistant", async () => { + const { file } = await createTempSessionPath(); + const { header, message } = buildSessionHeaderAndMessage(); + const toolCallAssistant = { + type: "message", + id: "msg-asst-tc", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "toolUse", id: "call_1", name: "read" }], + stopReason: "toolUse", + }, + }; + const plainAssistant = { + type: "message", + id: "msg-asst-plain", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "text", text: "stale" }], + stopReason: "stop", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n${JSON.stringify(plainAssistant)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(true); + expect(result.trimmedTrailingAssistantMessages).toBe(1); + + const repaired = await fs.readFile(file, "utf-8"); + const repairedLines = repaired.trim().split("\n"); + expect(repairedLines).toHaveLength(3); + expect(JSON.parse(repairedLines[2]).id).toBe("msg-asst-tc"); + }); + + it("never trims below the session header", async () => { + const { file } = await createTempSessionPath(); + const { header } = buildSessionHeaderAndMessage(); + const assistantEntry = { + type: "message", + id: "msg-asst", + parentId: null, + timestamp: new Date().toISOString(), + message: { + role: "assistant", + content: [{ type: "text", text: "orphan" }], + stopReason: "stop", + }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(assistantEntry)}\n`; + await fs.writeFile(file, original, "utf-8"); + + const result = await repairSessionFileIfNeeded({ sessionFile: file }); + + expect(result.repaired).toBe(true); + expect(result.trimmedTrailingAssistantMessages).toBe(1); + + const repaired = await fs.readFile(file, "utf-8"); + const repairedLines = repaired.trim().split("\n"); + expect(repairedLines).toHaveLength(1); + expect(JSON.parse(repairedLines[0]).type).toBe("session"); + }); + it("is a no-op on a session that was already repaired", async () => { const { file } = await createTempSessionPath(); const { header } = buildSessionHeaderAndMessage(); @@ -289,7 +522,15 @@ describe("repairSessionFileIfNeeded", () => { stopReason: "error", }, }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n`; + // Follow-up so the session doesn't end on assistant (trailing-trim is tested separately). + const followUp = { + type: "message", + id: "msg-3", + parentId: null, + timestamp: new Date().toISOString(), + message: { role: "user", content: "follow up" }, + }; + const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n${JSON.stringify(followUp)}\n`; await fs.writeFile(file, original, "utf-8"); const result = await repairSessionFileIfNeeded({ sessionFile: file }); diff --git a/src/agents/session-file-repair.ts b/src/agents/session-file-repair.ts index 1db80eb7a73..7bae100b5ad 100644 --- a/src/agents/session-file-repair.ts +++ b/src/agents/session-file-repair.ts @@ -2,24 +2,24 @@ import fs from "node:fs/promises"; import path from "node:path"; import { STREAM_ERROR_FALLBACK_TEXT } from "./stream-message-shared.js"; +/** Placeholder for blank user messages — preserves the user turn so strict + * providers that require at least one user message don't reject the transcript. */ +export const BLANK_USER_FALLBACK_TEXT = "(continue)"; + type RepairReport = { repaired: boolean; droppedLines: number; rewrittenAssistantMessages?: number; droppedBlankUserMessages?: number; rewrittenUserMessages?: number; + trimmedTrailingAssistantMessages?: number; backupPath?: string; reason?: string; }; -// Persisted assistant entries with `content: []` (written by older builds when -// a stream/provider error fired before any block was produced) are valid JSON -// but not valid for AWS Bedrock Converse replay; rewriting them on disk lets a -// poisoned session recover across gateway restarts instead of needing a fresh -// session. The sentinel text is shared with stream-message-shared.ts and -// replay-history.ts so a session repaired offline reads byte-identically to a -// live stream-error turn — that byte-identity is what makes the repair pass -// idempotent (a healed entry is then indistinguishable from a fresh one). +// The sentinel text is shared with stream-message-shared.ts and +// replay-history.ts so a repaired entry is byte-identical to a live +// stream-error turn, keeping the repair pass idempotent. type SessionMessageEntry = { type: "message"; @@ -53,11 +53,8 @@ function isAssistantEntryWithEmptyContent(entry: unknown): entry is SessionMessa if (!Array.isArray(message.content) || message.content.length !== 0) { return false; } - // Only error turns are eligible for on-disk rewrite. A clean stop with - // empty content (silent-reply / NO_REPLY path documented in - // run.empty-error-retry.test.ts) is a valid historical assistant turn — - // mutating it into a synthetic failure message would permanently corrupt - // the transcript and replay fabricated failure text on future requests. + // Only error stops — clean stops with empty content (NO_REPLY path) are + // valid silent replies that must not be overwritten with synthetic text. return message.stopReason === "error"; } @@ -79,7 +76,19 @@ type UserEntryRepair = function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEntryRepair { const content = entry.message.content; if (typeof content === "string") { - return content.trim() ? { kind: "keep" } : { kind: "drop" }; + if (content.trim()) { + return { kind: "keep" }; + } + return { + kind: "rewrite", + entry: { + ...entry, + message: { + ...entry.message, + content: BLANK_USER_FALLBACK_TEXT, + }, + }, + }; } if (!Array.isArray(content)) { return { kind: "keep" }; @@ -101,7 +110,16 @@ function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEn return false; }); if (nextContent.length === 0) { - return { kind: "drop" }; + return { + kind: "rewrite", + entry: { + ...entry, + message: { + ...entry.message, + content: [{ type: "text", text: BLANK_USER_FALLBACK_TEXT }], + }, + }, + }; } if (!touched) { return { kind: "keep" }; @@ -118,11 +136,42 @@ function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEn }; } +function isToolCallBlock(block: unknown): boolean { + if (!block || typeof block !== "object") { + return false; + } + const type = (block as { type?: unknown }).type; + return type === "toolCall" || type === "toolUse" || type === "functionCall"; +} + +/** Trailing assistant without tool calls — safe to trim from disk. + * Assistant turns with tool calls are kept so transcript repair can + * synthesize missing tool results (mirrors the outbound guard). */ +function isTrimmableTrailingAssistantEntry(entry: unknown): boolean { + if (!entry || typeof entry !== "object") { + return false; + } + const record = entry as { type?: unknown; message?: unknown }; + if (record.type !== "message" || !record.message || typeof record.message !== "object") { + return false; + } + const message = record.message as { role?: unknown; content?: unknown }; + if (message.role !== "assistant") { + return false; + } + const content = message.content; + if (Array.isArray(content) && content.some(isToolCallBlock)) { + return false; + } + return true; +} + function buildRepairSummaryParts(params: { droppedLines: number; rewrittenAssistantMessages: number; droppedBlankUserMessages: number; rewrittenUserMessages: number; + trimmedTrailingAssistantMessages: number; }): string { const parts: string[] = []; if (params.droppedLines > 0) { @@ -137,8 +186,9 @@ function buildRepairSummaryParts(params: { if (params.rewrittenUserMessages > 0) { parts.push(`rewrote ${params.rewrittenUserMessages} user message(s)`); } - // Caller only invokes this once at least one counter is non-zero, so the - // empty-array branch is unreachable in production. Kept for defensive output. + if (params.trimmedTrailingAssistantMessages > 0) { + parts.push(`trimmed ${params.trimmedTrailingAssistantMessages} trailing assistant message(s)`); + } return parts.length > 0 ? parts.join(", ") : "no changes"; } @@ -217,11 +267,21 @@ export async function repairSessionFileIfNeeded(params: { return { repaired: false, droppedLines, reason: "invalid session header" }; } + // Sessions ending on role=assistant cause Anthropic prefill 400s when + // thinking is enabled. The outbound path strips per-request, but leaving + // the file corrupted causes repeated reject cycles across restarts. + let trimmedTrailingAssistantMessages = 0; + while (entries.length > 1 && isTrimmableTrailingAssistantEntry(entries[entries.length - 1])) { + entries.pop(); + trimmedTrailingAssistantMessages += 1; + } + if ( droppedLines === 0 && rewrittenAssistantMessages === 0 && droppedBlankUserMessages === 0 && - rewrittenUserMessages === 0 + rewrittenUserMessages === 0 && + trimmedTrailingAssistantMessages === 0 ) { return { repaired: false, droppedLines: 0 }; } @@ -256,6 +316,7 @@ export async function repairSessionFileIfNeeded(params: { rewrittenAssistantMessages, droppedBlankUserMessages, rewrittenUserMessages, + trimmedTrailingAssistantMessages, reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`, }; } @@ -266,6 +327,7 @@ export async function repairSessionFileIfNeeded(params: { rewrittenAssistantMessages, droppedBlankUserMessages, rewrittenUserMessages, + trimmedTrailingAssistantMessages, })} (${path.basename(sessionFile)})`, ); return { @@ -274,6 +336,7 @@ export async function repairSessionFileIfNeeded(params: { rewrittenAssistantMessages, droppedBlankUserMessages, rewrittenUserMessages, + trimmedTrailingAssistantMessages, backupPath, }; }