From d35c79edd63baf73b35c466716b7c971c17d116e Mon Sep 17 00:00:00 2001 From: Dash <125997726+dashhuang@users.noreply.github.com> Date: Mon, 4 May 2026 01:55:45 +1200 Subject: [PATCH] fix(agents): suppress duplicate user persistence on fallback retries (#63696) * fix(agents): suppress duplicate user persistence on fallback retries * refactor(agents): align persisted-user callback types * docs: note fallback transcript dedupe * refactor(agents): remove fallback persistence casts --------- Co-authored-by: Altay --- CHANGELOG.md | 1 + .../agent-command.live-model-switch.test.ts | 30 +++++++++++++++++++ src/agents/agent-command.ts | 6 ++++ src/agents/command/attempt-execution.ts | 5 ++++ src/agents/pi-embedded-runner/run.ts | 2 ++ src/agents/pi-embedded-runner/run/attempt.ts | 4 +++ src/agents/pi-embedded-runner/run/params.ts | 3 ++ .../session-tool-result-guard-wrapper.ts | 6 ++++ src/agents/session-tool-result-guard.test.ts | 26 ++++++++++++++++ src/agents/session-tool-result-guard.ts | 18 +++++++++++ 10 files changed, 101 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea9ca1de86a..cd47e8c2401 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -167,6 +167,7 @@ Docs: https://docs.openclaw.ai - Plugins/providers: preserve scoped cold-load fallback for enabled external manifest-contract capability providers missing from the startup registry, so providers such as Fish Audio can resolve on request without requiring `activation.onStartup` for correctness. (#76536) Thanks @Conan-Scott. - Gateway/update: carry `continuationMessage` from `update.run` into successful restart sentinels so session-scoped self-updates can resume one follow-up turn after the Gateway restarts. Refs #71178. (#74362) Thanks @100menotu001, @HeilbronAILabs, and @artnking. +- Agents/fallback: suppress duplicate current-turn user-message transcript writes after embedded fallback retries while still sending the retry prompt to the model. (#63696) Thanks @dashhuang. ## 2026.5.2 diff --git a/src/agents/agent-command.live-model-switch.test.ts b/src/agents/agent-command.live-model-switch.test.ts index 0fe3b9df71b..ac579a7ca9d 100644 --- a/src/agents/agent-command.live-model-switch.test.ts +++ b/src/agents/agent-command.live-model-switch.test.ts @@ -765,6 +765,36 @@ describe("agentCommand – LiveSessionModelSwitchError retry", () => { expect(state.trajectoryFlushMock).toHaveBeenCalled(); }); + it("suppresses duplicate user persistence only after the current turn has flushed", async () => { + type AttemptCall = { + onUserMessagePersisted?: () => void; + suppressPromptPersistenceOnRetry?: boolean; + }; + const attemptCalls: AttemptCall[] = []; + state.runWithModelFallbackMock.mockImplementation(async (params: FallbackRunnerParams) => { + const first = await params.run(params.provider, params.model); + const result = await params.run(params.provider, params.model); + return { + result, + provider: params.provider, + model: params.model, + attempts: [first], + }; + }); + state.runAgentAttemptMock.mockImplementation(async (attemptParams: AttemptCall) => { + attemptCalls.push(attemptParams); + attemptParams.onUserMessagePersisted?.(); + return makeSuccessResult("openai", "gpt-5.4"); + }); + + await runBasicAgentCommand(); + + expect(attemptCalls).toHaveLength(2); + expect(attemptCalls[0]?.suppressPromptPersistenceOnRetry).not.toBe(true); + expect(typeof attemptCalls[0]?.onUserMessagePersisted).toBe("function"); + expect(attemptCalls[1]?.suppressPromptPersistenceOnRetry).toBe(true); + }); + it("propagates non-switch errors without retrying and emits lifecycle error", async () => { state.runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider down")); diff --git a/src/agents/agent-command.ts b/src/agents/agent-command.ts index 24edb0b9dd9..cc2767984ff 100644 --- a/src/agents/agent-command.ts +++ b/src/agents/agent-command.ts @@ -966,6 +966,7 @@ async function agentCommandInternal( }); let fallbackAttemptIndex = 0; + let currentTurnUserMessagePersisted = false; const fallbackResult = await runWithModelFallback({ cfg, provider, @@ -1022,6 +1023,11 @@ async function agentCommandInternal( allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, sessionHasHistory: !isNewSession || (await attemptExecutionRuntime.sessionFileHasContent(sessionFile)), + suppressPromptPersistenceOnRetry: + isFallbackRetry && currentTurnUserMessagePersisted, + onUserMessagePersisted: () => { + currentTurnUserMessagePersisted = true; + }, onAgentEvent: (evt) => { if (evt.stream.startsWith("codex_app_server.")) { emitAgentEvent({ diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index c9cb72c1872..ebd84a4e0a9 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -1,3 +1,4 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js"; import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js"; import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js"; @@ -357,6 +358,8 @@ export function runAgentAttempt(params: { allowTransientCooldownProbe?: boolean; modelFallbacksOverride?: string[]; sessionHasHistory?: boolean; + suppressPromptPersistenceOnRetry?: boolean; + onUserMessagePersisted?: (message: Extract) => void; }) { const isRawModelRun = params.opts.modelRun === true || params.opts.promptMode === "none"; const claudeCliFallbackPrelude = @@ -611,6 +614,8 @@ export function runAgentAttempt(params: { promptMode: params.opts.promptMode, disableTools: params.opts.modelRun === true, onAgentEvent: params.onAgentEvent, + suppressNextUserMessagePersistence: params.suppressPromptPersistenceOnRetry === true, + onUserMessagePersisted: params.onUserMessagePersisted, bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignature, }); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 1174f540bda..01c8b97bfb2 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1166,6 +1166,8 @@ export async function runEmbeddedPiAgent( bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignature: bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1], + suppressNextUserMessagePersistence: params.suppressNextUserMessagePersistence, + onUserMessagePersisted: params.onUserMessagePersisted, }); const attempt = normalizeEmbeddedRunAttemptResult(rawAttempt); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index f7a22ab71ed..5d97f16b74b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1432,6 +1432,10 @@ export async function runEmbeddedAttempt( ? "aborted" : undefined, allowedToolNames, + suppressNextUserMessagePersistence: params.suppressNextUserMessagePersistence, + onUserMessagePersisted: (message) => { + params.onUserMessagePersisted?.(message); + }, }); trackSessionManagerAccess(params.sessionFile); diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index ccde04bfd8a..9c8ab3409ca 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -1,3 +1,4 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { ImageContent } from "@mariozechner/pi-ai"; import type { SourceReplyDeliveryMode } from "../../../auto-reply/get-reply-options.types.js"; import type { ReplyPayload } from "../../../auto-reply/reply-payload.js"; @@ -178,6 +179,8 @@ export type RunEmbeddedPiAgentParams = { * where transient service pressure is often model-scoped. */ allowTransientCooldownProbe?: boolean; + suppressNextUserMessagePersistence?: boolean; + onUserMessagePersisted?: (message: Extract) => void; /** * Dispose bundled MCP runtimes when the overall run ends instead of preserving * the session-scoped cache. Intended for one-shot local CLI runs that must diff --git a/src/agents/session-tool-result-guard-wrapper.ts b/src/agents/session-tool-result-guard-wrapper.ts index 54c8cde0d85..318f7e7266c 100644 --- a/src/agents/session-tool-result-guard-wrapper.ts +++ b/src/agents/session-tool-result-guard-wrapper.ts @@ -97,6 +97,10 @@ export function guardSessionManager( allowSyntheticToolResults?: boolean; missingToolResultText?: string; allowedToolNames?: Iterable; + suppressNextUserMessagePersistence?: boolean; + onUserMessagePersisted?: ( + message: Extract, + ) => void | Promise; }, ): GuardedSessionManager { if (typeof (sessionManager as GuardedSessionManager).flushPendingToolResults === "function") { @@ -170,6 +174,8 @@ export function guardSessionManager( agentId: opts.agentId, }) : undefined, + suppressNextUserMessagePersistence: opts?.suppressNextUserMessagePersistence, + onUserMessagePersisted: opts?.onUserMessagePersisted, }); (sessionManager as GuardedSessionManager).flushPendingToolResults = guard.flushPendingToolResults; (sessionManager as GuardedSessionManager).clearPendingToolResults = guard.clearPendingToolResults; diff --git a/src/agents/session-tool-result-guard.test.ts b/src/agents/session-tool-result-guard.test.ts index 82c1fc5311a..15666534c6f 100644 --- a/src/agents/session-tool-result-guard.test.ts +++ b/src/agents/session-tool-result-guard.test.ts @@ -498,6 +498,32 @@ describe("installSessionToolResultGuard", () => { }); }); + it("suppresses only the next persisted user message when requested", () => { + const sm = SessionManager.inMemory(); + installSessionToolResultGuard(sm, { + suppressNextUserMessagePersistence: true, + }); + + sm.appendMessage( + asAppendMessage({ + role: "user", + content: "first", + timestamp: Date.now(), + }), + ); + sm.appendMessage( + asAppendMessage({ + role: "user", + content: "second", + timestamp: Date.now() + 1, + }), + ); + + const persisted = getPersistedMessages(sm); + expect(persisted.map((message) => message.role)).toEqual(["user"]); + expect(persisted[0]).toMatchObject({ content: "second" }); + }); + // When an assistant message with toolCalls is aborted, no synthetic toolResult // should be created. Creating synthetic results for aborted/incomplete tool calls // causes API 400 errors: "unexpected tool_use_id found in tool_result blocks". diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index 1853a4d4584..2fdb52ab8d8 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -44,6 +44,12 @@ function resolveMaxToolResultChars(opts?: { maxToolResultChars?: number }): numb return Math.max(1, opts?.maxToolResultChars ?? DEFAULT_MAX_LIVE_TOOL_RESULT_CHARS); } +type UserAgentMessage = Extract; + +function isUserAgentMessage(message: AgentMessage): message is UserAgentMessage { + return message.role === "user"; +} + // `details` is runtime/UI metadata, not model-visible tool output. Keep the // session JSONL useful for debugging without letting metadata blobs dominate // disk, replay repair, transcript broadcasts, or future tooling that reads raw @@ -302,6 +308,10 @@ export function installSessionToolResultGuard( event: PluginHookBeforeMessageWriteEvent, ) => PluginHookBeforeMessageWriteResult | undefined; maxToolResultChars?: number; + suppressNextUserMessagePersistence?: boolean; + onUserMessagePersisted?: ( + message: Extract, + ) => void | Promise; }, ): { flushPendingToolResults: () => void; @@ -328,6 +338,7 @@ export function installSessionToolResultGuard( const missingToolResultText = opts?.missingToolResultText; const beforeWrite = opts?.beforeMessageWriteHook; const maxToolResultChars = resolveMaxToolResultChars(opts); + let suppressNextUserMessagePersistence = opts?.suppressNextUserMessagePersistence === true; /** * Run the before_message_write hook. Returns the (possibly modified) message, @@ -450,6 +461,10 @@ export function installSessionToolResultGuard( if (!finalMessage) { return undefined; } + if (isUserAgentMessage(finalMessage) && suppressNextUserMessagePersistence) { + suppressNextUserMessagePersistence = false; + return undefined; + } const result = originalAppend(finalMessage as never); const sessionFile = ( @@ -467,6 +482,9 @@ export function installSessionToolResultGuard( if (toolCalls.length > 0) { pendingState.trackToolCalls(toolCalls); } + if (isUserAgentMessage(finalMessage)) { + void opts?.onUserMessagePersisted?.(finalMessage); + } return result; };