fix(agents): suppress duplicate user persistence on fallback retries (#63696)

* fix(agents): suppress duplicate user persistence on fallback retries

* refactor(agents): align persisted-user callback types

* docs: note fallback transcript dedupe

* refactor(agents): remove fallback persistence casts

---------

Co-authored-by: Altay <altay@uinaf.dev>
This commit is contained in:
Dash
2026-05-04 01:55:45 +12:00
committed by GitHub
parent 4488382c1c
commit d35c79edd6
10 changed files with 101 additions and 0 deletions

View File

@@ -167,6 +167,7 @@ Docs: https://docs.openclaw.ai
- Plugins/providers: preserve scoped cold-load fallback for enabled external manifest-contract capability providers missing from the startup registry, so providers such as Fish Audio can resolve on request without requiring `activation.onStartup` for correctness. (#76536) Thanks @Conan-Scott.
- Gateway/update: carry `continuationMessage` from `update.run` into successful restart sentinels so session-scoped self-updates can resume one follow-up turn after the Gateway restarts. Refs #71178. (#74362) Thanks @100menotu001, @HeilbronAILabs, and @artnking.
- Agents/fallback: suppress duplicate current-turn user-message transcript writes after embedded fallback retries while still sending the retry prompt to the model. (#63696) Thanks @dashhuang.
## 2026.5.2

View File

@@ -765,6 +765,36 @@ describe("agentCommand LiveSessionModelSwitchError retry", () => {
expect(state.trajectoryFlushMock).toHaveBeenCalled();
});
it("suppresses duplicate user persistence only after the current turn has flushed", async () => {
  // Shape of the params we intercept from each runAgentAttempt invocation.
  type ObservedAttemptParams = {
    onUserMessagePersisted?: () => void;
    suppressPromptPersistenceOnRetry?: boolean;
  };
  const observedAttempts: ObservedAttemptParams[] = [];
  // Drive two attempts through the fallback runner to simulate a retry of the
  // same provider/model pair; only the first attempt is reported in `attempts`.
  state.runWithModelFallbackMock.mockImplementation(async (params: FallbackRunnerParams) => {
    const initialAttempt = await params.run(params.provider, params.model);
    const retryOutcome = await params.run(params.provider, params.model);
    return {
      result: retryOutcome,
      provider: params.provider,
      model: params.model,
      attempts: [initialAttempt],
    };
  });
  // Each attempt records its params and immediately signals that the user
  // message was persisted, so the retry should see suppression enabled.
  state.runAgentAttemptMock.mockImplementation(async (attemptParams: ObservedAttemptParams) => {
    observedAttempts.push(attemptParams);
    attemptParams.onUserMessagePersisted?.();
    return makeSuccessResult("openai", "gpt-5.4");
  });
  await runBasicAgentCommand();
  expect(observedAttempts).toHaveLength(2);
  const [initialParams, retryParams] = observedAttempts;
  // First attempt: persistence allowed, callback wired in.
  expect(initialParams?.suppressPromptPersistenceOnRetry).not.toBe(true);
  expect(typeof initialParams?.onUserMessagePersisted).toBe("function");
  // Retry: duplicate transcript write must be suppressed.
  expect(retryParams?.suppressPromptPersistenceOnRetry).toBe(true);
});
it("propagates non-switch errors without retrying and emits lifecycle error", async () => {
state.runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider down"));

View File

@@ -966,6 +966,7 @@ async function agentCommandInternal(
});
let fallbackAttemptIndex = 0;
let currentTurnUserMessagePersisted = false;
const fallbackResult = await runWithModelFallback<AgentAttemptResult>({
cfg,
provider,
@@ -1022,6 +1023,11 @@ async function agentCommandInternal(
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
sessionHasHistory:
!isNewSession || (await attemptExecutionRuntime.sessionFileHasContent(sessionFile)),
suppressPromptPersistenceOnRetry:
isFallbackRetry && currentTurnUserMessagePersisted,
onUserMessagePersisted: () => {
currentTurnUserMessagePersisted = true;
},
onAgentEvent: (evt) => {
if (evt.stream.startsWith("codex_app_server.")) {
emitAgentEvent({

View File

@@ -1,3 +1,4 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js";
import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js";
import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js";
@@ -357,6 +358,8 @@ export function runAgentAttempt(params: {
allowTransientCooldownProbe?: boolean;
modelFallbacksOverride?: string[];
sessionHasHistory?: boolean;
suppressPromptPersistenceOnRetry?: boolean;
onUserMessagePersisted?: (message: Extract<AgentMessage, { role: "user" }>) => void;
}) {
const isRawModelRun = params.opts.modelRun === true || params.opts.promptMode === "none";
const claudeCliFallbackPrelude =
@@ -611,6 +614,8 @@ export function runAgentAttempt(params: {
promptMode: params.opts.promptMode,
disableTools: params.opts.modelRun === true,
onAgentEvent: params.onAgentEvent,
suppressNextUserMessagePersistence: params.suppressPromptPersistenceOnRetry === true,
onUserMessagePersisted: params.onUserMessagePersisted,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
});

View File

@@ -1166,6 +1166,8 @@ export async function runEmbeddedPiAgent(
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature:
bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1],
suppressNextUserMessagePersistence: params.suppressNextUserMessagePersistence,
onUserMessagePersisted: params.onUserMessagePersisted,
});
const attempt = normalizeEmbeddedRunAttemptResult(rawAttempt);

View File

@@ -1432,6 +1432,10 @@ export async function runEmbeddedAttempt(
? "aborted"
: undefined,
allowedToolNames,
suppressNextUserMessagePersistence: params.suppressNextUserMessagePersistence,
onUserMessagePersisted: (message) => {
params.onUserMessagePersisted?.(message);
},
});
trackSessionManagerAccess(params.sessionFile);

View File

@@ -1,3 +1,4 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { ImageContent } from "@mariozechner/pi-ai";
import type { SourceReplyDeliveryMode } from "../../../auto-reply/get-reply-options.types.js";
import type { ReplyPayload } from "../../../auto-reply/reply-payload.js";
@@ -178,6 +179,8 @@ export type RunEmbeddedPiAgentParams = {
* where transient service pressure is often model-scoped.
*/
allowTransientCooldownProbe?: boolean;
suppressNextUserMessagePersistence?: boolean;
onUserMessagePersisted?: (message: Extract<AgentMessage, { role: "user" }>) => void;
/**
* Dispose bundled MCP runtimes when the overall run ends instead of preserving
* the session-scoped cache. Intended for one-shot local CLI runs that must

View File

@@ -97,6 +97,10 @@ export function guardSessionManager(
allowSyntheticToolResults?: boolean;
missingToolResultText?: string;
allowedToolNames?: Iterable<string>;
suppressNextUserMessagePersistence?: boolean;
onUserMessagePersisted?: (
message: Extract<AgentMessage, { role: "user" }>,
) => void | Promise<void>;
},
): GuardedSessionManager {
if (typeof (sessionManager as GuardedSessionManager).flushPendingToolResults === "function") {
@@ -170,6 +174,8 @@ export function guardSessionManager(
agentId: opts.agentId,
})
: undefined,
suppressNextUserMessagePersistence: opts?.suppressNextUserMessagePersistence,
onUserMessagePersisted: opts?.onUserMessagePersisted,
});
(sessionManager as GuardedSessionManager).flushPendingToolResults = guard.flushPendingToolResults;
(sessionManager as GuardedSessionManager).clearPendingToolResults = guard.clearPendingToolResults;

View File

@@ -498,6 +498,32 @@ describe("installSessionToolResultGuard", () => {
});
});
it("suppresses only the next persisted user message when requested", () => {
  const manager = SessionManager.inMemory();
  installSessionToolResultGuard(manager, {
    suppressNextUserMessagePersistence: true,
  });
  // Append two user messages; the guard should drop exactly the first one.
  const base = Date.now();
  for (const [offset, content] of [
    [0, "first"],
    [1, "second"],
  ] as const) {
    manager.appendMessage(
      asAppendMessage({
        role: "user",
        content,
        timestamp: base + offset,
      }),
    );
  }
  const persisted = getPersistedMessages(manager);
  // Only the second message survives: suppression is one-shot.
  expect(persisted.map((message) => message.role)).toEqual(["user"]);
  expect(persisted[0]).toMatchObject({ content: "second" });
});
// When an assistant message with toolCalls is aborted, no synthetic toolResult
// should be created. Creating synthetic results for aborted/incomplete tool calls
// causes API 400 errors: "unexpected tool_use_id found in tool_result blocks".

View File

@@ -44,6 +44,12 @@ function resolveMaxToolResultChars(opts?: { maxToolResultChars?: number }): numb
return Math.max(1, opts?.maxToolResultChars ?? DEFAULT_MAX_LIVE_TOOL_RESULT_CHARS);
}
// The user-role member of the AgentMessage union.
type UserAgentMessage = Extract<AgentMessage, { role: "user" }>;

/**
 * Type guard: narrows an AgentMessage to its user-role variant.
 * Used by the session guard to decide which appends count as
 * user-prompt persistence.
 */
function isUserAgentMessage(message: AgentMessage): message is UserAgentMessage {
  switch (message.role) {
    case "user":
      return true;
    default:
      return false;
  }
}
// `details` is runtime/UI metadata, not model-visible tool output. Keep the
// session JSONL useful for debugging without letting metadata blobs dominate
// disk, replay repair, transcript broadcasts, or future tooling that reads raw
@@ -302,6 +308,10 @@ export function installSessionToolResultGuard(
event: PluginHookBeforeMessageWriteEvent,
) => PluginHookBeforeMessageWriteResult | undefined;
maxToolResultChars?: number;
suppressNextUserMessagePersistence?: boolean;
onUserMessagePersisted?: (
message: Extract<AgentMessage, { role: "user" }>,
) => void | Promise<void>;
},
): {
flushPendingToolResults: () => void;
@@ -328,6 +338,7 @@ export function installSessionToolResultGuard(
const missingToolResultText = opts?.missingToolResultText;
const beforeWrite = opts?.beforeMessageWriteHook;
const maxToolResultChars = resolveMaxToolResultChars(opts);
let suppressNextUserMessagePersistence = opts?.suppressNextUserMessagePersistence === true;
/**
* Run the before_message_write hook. Returns the (possibly modified) message,
@@ -450,6 +461,10 @@ export function installSessionToolResultGuard(
if (!finalMessage) {
return undefined;
}
if (isUserAgentMessage(finalMessage) && suppressNextUserMessagePersistence) {
suppressNextUserMessagePersistence = false;
return undefined;
}
const result = originalAppend(finalMessage as never);
const sessionFile = (
@@ -467,6 +482,9 @@ export function installSessionToolResultGuard(
if (toolCalls.length > 0) {
pendingState.trackToolCalls(toolCalls);
}
if (isUserAgentMessage(finalMessage)) {
void opts?.onUserMessagePersisted?.(finalMessage);
}
return result;
};