fix(agents): trim trailing assistant turns and rewrite blank user messages in session repair (#75606)

* fix(agents): trim trailing assistant turns and rewrite blank user messages in session repair

Session-file repair now:
- Trims trailing assistant messages so the JSONL never ends on
  role=assistant, preventing the Anthropic 400 prefill-loop that
  fires when thinking is enabled. (#75271)
- Rewrites blank-only user messages to a synthetic '(continue)'
  placeholder instead of dropping them, so strict providers
  (Qwen/mlx-vlm, Anthropic) no longer reject transcripts missing
  a user turn. (#75313)

Closes #75271, closes #75313.

* refactor: clean up comments in session-file repair

* fix(agents): preserve trailing assistant tool-call turns during session trim

Mirror the outbound guard (stripTrailingAssistantPrefillTurns):
skip assistant entries containing toolCall/toolUse/functionCall
blocks so transcript repair can synthesize missing tool results.

Addresses PR review feedback from clawsweeper on #75606.
This commit is contained in:
Alex Knight
2026-05-01 21:24:50 +10:00
committed by GitHub
parent 5fbf406beb
commit 524528944f
3 changed files with 343 additions and 38 deletions

View File

@@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai
- Voice Call CLI: delegate operational `voicecall` commands to the running Gateway runtime and skip webhook startup during CLI-only plugin loading, preventing webhook port conflicts and `setup --json` hangs. Fixes #72345. Thanks @serrurco and @DougButdorf.
- Agents/pi-embedded-runner: extract the `abortable` provider-call wrapper from `runEmbeddedAttempt` to module scope so its promise handlers no longer close over the run lexical context, releasing transcripts, tool buffers, and subscription callbacks when a provider call hangs past abort. (#74182) Thanks @cjboy007.
- Docker: restore `python3` in the gateway runtime image after the slim-runtime switch. Fixes #75041.
- Agents/session-repair: fix resumed sessions failing with repeated 400 errors on Anthropic and strict OpenAI-compatible providers (Qwen, mlx-vlm) after an interrupted conversation or blank user input. Fixes #75271 and #75313. Thanks @amknight.
- CLI/Voice Call: scope `voicecall` command activation to the Voice Call plugin so setup and smoke checks no longer broad-load unrelated plugin runtimes or hang after printing JSON. Thanks @vincentkoc.
- Doctor/plugins: warn when restrictive `plugins.allow` is paired with wildcard or plugin-owned tool allowlists, making the exclusive plugin allowlist behavior visible before users hit empty callable-tool runs. Refs #58009 and #64982. Thanks @KR-Python and @BKF-Gitty.
- Google Meet/Voice Call: keep Twilio Meet joins in conversation mode and reuse the realtime intro prompt when no voice-call-specific intro is configured, so answered phone bridge calls speak instead of joining silently. Refs #72478. Thanks @DougButdorf.

View File

@@ -2,7 +2,7 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { repairSessionFileIfNeeded } from "./session-file-repair.js";
import { BLANK_USER_FALLBACK_TEXT, repairSessionFileIfNeeded } from "./session-file-repair.js";
function buildSessionHeaderAndMessage() {
const header = {
@@ -100,7 +100,7 @@ describe("repairSessionFileIfNeeded", () => {
it("rewrites persisted assistant messages with empty content arrays", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const { header, message } = buildSessionHeaderAndMessage();
const poisonedAssistantEntry = {
type: "message",
id: "msg-2",
@@ -117,7 +117,15 @@ describe("repairSessionFileIfNeeded", () => {
errorMessage: "transient stream failure",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(poisonedAssistantEntry)}\n`;
// Follow-up so the session doesn't end on assistant (trailing-trim is tested separately).
const followUp = {
type: "message",
id: "msg-3",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "retry" },
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(poisonedAssistantEntry)}\n${JSON.stringify(followUp)}\n`;
await fs.writeFile(file, original, "utf-8");
const warn = vi.fn();
@@ -127,8 +135,6 @@ describe("repairSessionFileIfNeeded", () => {
expect(result.droppedLines).toBe(0);
expect(result.rewrittenAssistantMessages).toBe(1);
expect(result.backupPath).toBeTruthy();
// Warn message must omit the "dropped 0 malformed line(s)" noise when
// nothing was dropped; only the rewrite count is reported.
expect(warn).toHaveBeenCalledTimes(1);
const warnMessage = warn.mock.calls[0]?.[0] as string;
expect(warnMessage).toContain("rewrote 1 assistant message(s)");
@@ -136,16 +142,16 @@ describe("repairSessionFileIfNeeded", () => {
const repaired = await fs.readFile(file, "utf-8");
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(2);
expect(repairedLines).toHaveLength(4);
const repairedEntry: { message: { content: { type: string; text: string }[] } } = JSON.parse(
repairedLines[1],
repairedLines[2],
);
expect(repairedEntry.message.content).toEqual([
{ type: "text", text: "[assistant turn failed before producing content]" },
]);
});
it("drops persisted blank user text messages", async () => {
it("rewrites blank-only user text messages to synthetic placeholder instead of dropping", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const blankUserEntry = {
@@ -165,13 +171,46 @@ describe("repairSessionFileIfNeeded", () => {
const result = await repairSessionFileIfNeeded({ sessionFile: file, warn });
expect(result.repaired).toBe(true);
expect(result.droppedBlankUserMessages).toBe(1);
expect(warn.mock.calls[0]?.[0]).toContain("dropped 1 blank user message(s)");
expect(result.rewrittenUserMessages).toBe(1);
expect(result.droppedBlankUserMessages).toBe(0);
expect(warn.mock.calls[0]?.[0]).toContain("rewrote 1 user message(s)");
const repaired = await fs.readFile(file, "utf-8");
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(2);
expect(JSON.parse(repairedLines[1])?.id).toBe("msg-1");
expect(repairedLines).toHaveLength(3);
const rewrittenEntry = JSON.parse(repairedLines[1]);
expect(rewrittenEntry.id).toBe("msg-blank");
expect(rewrittenEntry.message.content).toEqual([
{ type: "text", text: BLANK_USER_FALLBACK_TEXT },
]);
});
it("rewrites blank string-content user messages to placeholder", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const blankStringUserEntry = {
type: "message",
id: "msg-blank-str",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "user",
content: " ",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(blankStringUserEntry)}\n${JSON.stringify(message)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.rewrittenUserMessages).toBe(1);
const repaired = await fs.readFile(file, "utf-8");
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(3);
const rewrittenEntry = JSON.parse(repairedLines[1]);
expect(rewrittenEntry.message.content).toBe(BLANK_USER_FALLBACK_TEXT);
});
it("removes blank user text blocks while preserving media blocks", async () => {
@@ -237,12 +276,6 @@ describe("repairSessionFileIfNeeded", () => {
});
it("does not rewrite silent-reply turns (stopReason=stop, content=[]) on disk", async () => {
// Mirror of the in-memory replay-history test: a clean stop with no
// content is a legitimate silent reply (NO_REPLY token path). Repair
// must NOT permanently mutate it into a synthetic "[assistant turn
// failed before producing content]" entry — that would corrupt the
// historical transcript and replay fabricated failure text on every
// future provider request.
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const silentReplyEntry = {
@@ -260,7 +293,15 @@ describe("repairSessionFileIfNeeded", () => {
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n`;
// Follow-up so the session doesn't end on assistant (trailing-trim is tested separately).
const followUp = {
type: "message",
id: "msg-3",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "follow up" },
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n${JSON.stringify(followUp)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
@@ -271,6 +312,198 @@ describe("repairSessionFileIfNeeded", () => {
expect(after).toBe(original);
});
it("trims trailing assistant messages from the session file", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const assistantEntry = {
type: "message",
id: "msg-asst",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "stale answer" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n`;
await fs.writeFile(file, original, "utf-8");
const warn = vi.fn();
const result = await repairSessionFileIfNeeded({ sessionFile: file, warn });
expect(result.repaired).toBe(true);
expect(result.trimmedTrailingAssistantMessages).toBe(1);
expect(warn.mock.calls[0]?.[0]).toContain("trimmed 1 trailing assistant message(s)");
const repaired = await fs.readFile(file, "utf-8");
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(2);
});
it("trims multiple consecutive trailing assistant messages", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const assistantEntry1 = {
type: "message",
id: "msg-asst-1",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "first" }],
stopReason: "stop",
},
};
const assistantEntry2 = {
type: "message",
id: "msg-asst-2",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "second" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry1)}\n${JSON.stringify(assistantEntry2)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.trimmedTrailingAssistantMessages).toBe(2);
const repaired = await fs.readFile(file, "utf-8");
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(2);
});
it("does not trim non-trailing assistant messages", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const assistantEntry = {
type: "message",
id: "msg-asst",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "answer" }],
stopReason: "stop",
},
};
const userFollowUp = {
type: "message",
id: "msg-user-2",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "follow up" },
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n${JSON.stringify(userFollowUp)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.trimmedTrailingAssistantMessages ?? 0).toBe(0);
});
it("preserves trailing assistant messages that contain tool calls", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const toolCallAssistant = {
type: "message",
id: "msg-asst-tc",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [
{ type: "text", text: "Let me check that." },
{ type: "toolCall", id: "call_1", name: "read", input: { path: "/tmp/test" } },
],
stopReason: "toolUse",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.trimmedTrailingAssistantMessages ?? 0).toBe(0);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("trims non-tool-call assistant but stops at tool-call assistant", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const toolCallAssistant = {
type: "message",
id: "msg-asst-tc",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "toolUse", id: "call_1", name: "read" }],
stopReason: "toolUse",
},
};
const plainAssistant = {
type: "message",
id: "msg-asst-plain",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "stale" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n${JSON.stringify(plainAssistant)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.trimmedTrailingAssistantMessages).toBe(1);
const repaired = await fs.readFile(file, "utf-8");
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(3);
expect(JSON.parse(repairedLines[2]).id).toBe("msg-asst-tc");
});
it("never trims below the session header", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const assistantEntry = {
type: "message",
id: "msg-asst",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "orphan" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(assistantEntry)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.trimmedTrailingAssistantMessages).toBe(1);
const repaired = await fs.readFile(file, "utf-8");
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(1);
expect(JSON.parse(repairedLines[0]).type).toBe("session");
});
it("is a no-op on a session that was already repaired", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
@@ -289,7 +522,15 @@ describe("repairSessionFileIfNeeded", () => {
stopReason: "error",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n`;
// Follow-up so the session doesn't end on assistant (trailing-trim is tested separately).
const followUp = {
type: "message",
id: "msg-3",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "follow up" },
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n${JSON.stringify(followUp)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });

View File

@@ -2,24 +2,24 @@ import fs from "node:fs/promises";
import path from "node:path";
import { STREAM_ERROR_FALLBACK_TEXT } from "./stream-message-shared.js";
/** Placeholder for blank user messages — preserves the user turn so strict
* providers that require at least one user message don't reject the transcript. */
export const BLANK_USER_FALLBACK_TEXT = "(continue)";
type RepairReport = {
repaired: boolean;
droppedLines: number;
rewrittenAssistantMessages?: number;
droppedBlankUserMessages?: number;
rewrittenUserMessages?: number;
trimmedTrailingAssistantMessages?: number;
backupPath?: string;
reason?: string;
};
// Persisted assistant entries with `content: []` (written by older builds when
// a stream/provider error fired before any block was produced) are valid JSON
// but not valid for AWS Bedrock Converse replay; rewriting them on disk lets a
// poisoned session recover across gateway restarts instead of needing a fresh
// session. The sentinel text is shared with stream-message-shared.ts and
// replay-history.ts so a session repaired offline reads byte-identically to a
// live stream-error turn — that byte-identity is what makes the repair pass
// idempotent (a healed entry is then indistinguishable from a fresh one).
// The sentinel text is shared with stream-message-shared.ts and
// replay-history.ts so a repaired entry is byte-identical to a live
// stream-error turn, keeping the repair pass idempotent.
type SessionMessageEntry = {
type: "message";
@@ -53,11 +53,8 @@ function isAssistantEntryWithEmptyContent(entry: unknown): entry is SessionMessa
if (!Array.isArray(message.content) || message.content.length !== 0) {
return false;
}
// Only error turns are eligible for on-disk rewrite. A clean stop with
// empty content (silent-reply / NO_REPLY path documented in
// run.empty-error-retry.test.ts) is a valid historical assistant turn —
// mutating it into a synthetic failure message would permanently corrupt
// the transcript and replay fabricated failure text on future requests.
// Only error stops — clean stops with empty content (NO_REPLY path) are
// valid silent replies that must not be overwritten with synthetic text.
return message.stopReason === "error";
}
@@ -79,7 +76,19 @@ type UserEntryRepair =
function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEntryRepair {
const content = entry.message.content;
if (typeof content === "string") {
return content.trim() ? { kind: "keep" } : { kind: "drop" };
if (content.trim()) {
return { kind: "keep" };
}
return {
kind: "rewrite",
entry: {
...entry,
message: {
...entry.message,
content: BLANK_USER_FALLBACK_TEXT,
},
},
};
}
if (!Array.isArray(content)) {
return { kind: "keep" };
@@ -101,7 +110,16 @@ function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEn
return false;
});
if (nextContent.length === 0) {
return { kind: "drop" };
return {
kind: "rewrite",
entry: {
...entry,
message: {
...entry.message,
content: [{ type: "text", text: BLANK_USER_FALLBACK_TEXT }],
},
},
};
}
if (!touched) {
return { kind: "keep" };
@@ -118,11 +136,42 @@ function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEn
};
}
function isToolCallBlock(block: unknown): boolean {
if (!block || typeof block !== "object") {
return false;
}
const type = (block as { type?: unknown }).type;
return type === "toolCall" || type === "toolUse" || type === "functionCall";
}
/** Trailing assistant without tool calls — safe to trim from disk.
* Assistant turns with tool calls are kept so transcript repair can
* synthesize missing tool results (mirrors the outbound guard). */
function isTrimmableTrailingAssistantEntry(entry: unknown): boolean {
if (!entry || typeof entry !== "object") {
return false;
}
const record = entry as { type?: unknown; message?: unknown };
if (record.type !== "message" || !record.message || typeof record.message !== "object") {
return false;
}
const message = record.message as { role?: unknown; content?: unknown };
if (message.role !== "assistant") {
return false;
}
const content = message.content;
if (Array.isArray(content) && content.some(isToolCallBlock)) {
return false;
}
return true;
}
function buildRepairSummaryParts(params: {
droppedLines: number;
rewrittenAssistantMessages: number;
droppedBlankUserMessages: number;
rewrittenUserMessages: number;
trimmedTrailingAssistantMessages: number;
}): string {
const parts: string[] = [];
if (params.droppedLines > 0) {
@@ -137,8 +186,9 @@ function buildRepairSummaryParts(params: {
if (params.rewrittenUserMessages > 0) {
parts.push(`rewrote ${params.rewrittenUserMessages} user message(s)`);
}
// Caller only invokes this once at least one counter is non-zero, so the
// empty-array branch is unreachable in production. Kept for defensive output.
if (params.trimmedTrailingAssistantMessages > 0) {
parts.push(`trimmed ${params.trimmedTrailingAssistantMessages} trailing assistant message(s)`);
}
return parts.length > 0 ? parts.join(", ") : "no changes";
}
@@ -217,11 +267,21 @@ export async function repairSessionFileIfNeeded(params: {
return { repaired: false, droppedLines, reason: "invalid session header" };
}
// Sessions ending on role=assistant cause Anthropic prefill 400s when
// thinking is enabled. The outbound path strips per-request, but leaving
// the file corrupted causes repeated reject cycles across restarts.
let trimmedTrailingAssistantMessages = 0;
while (entries.length > 1 && isTrimmableTrailingAssistantEntry(entries[entries.length - 1])) {
entries.pop();
trimmedTrailingAssistantMessages += 1;
}
if (
droppedLines === 0 &&
rewrittenAssistantMessages === 0 &&
droppedBlankUserMessages === 0 &&
rewrittenUserMessages === 0
rewrittenUserMessages === 0 &&
trimmedTrailingAssistantMessages === 0
) {
return { repaired: false, droppedLines: 0 };
}
@@ -256,6 +316,7 @@ export async function repairSessionFileIfNeeded(params: {
rewrittenAssistantMessages,
droppedBlankUserMessages,
rewrittenUserMessages,
trimmedTrailingAssistantMessages,
reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`,
};
}
@@ -266,6 +327,7 @@ export async function repairSessionFileIfNeeded(params: {
rewrittenAssistantMessages,
droppedBlankUserMessages,
rewrittenUserMessages,
trimmedTrailingAssistantMessages,
})} (${path.basename(sessionFile)})`,
);
return {
@@ -274,6 +336,7 @@ export async function repairSessionFileIfNeeded(params: {
rewrittenAssistantMessages,
droppedBlankUserMessages,
rewrittenUserMessages,
trimmedTrailingAssistantMessages,
backupPath,
};
}