fix: keep gateway fallback tied to user persistence

This commit is contained in:
Shakker
2026-05-25 18:15:52 +01:00
committed by Shakker
parent e1ff653ade
commit ce5adbd2c2
4 changed files with 70 additions and 9 deletions

View File

@@ -1,3 +1,4 @@
import type { AgentMessage } from "@earendil-works/pi-agent-core";
import type { ImageContent } from "@earendil-works/pi-ai";
import type { PromptImageOrderEntry } from "../media/prompt-image-order.js";
import type { ReplyPayload } from "./reply-payload.js";
@@ -56,6 +57,10 @@ export type GetReplyOptions = {
imageOrder?: PromptImageOrderEntry[];
/** Notifies when an agent run actually starts (useful for webchat command handling). */
onAgentRunStart?: (runId: string) => void;
/** Notifies when the runtime actually persists the current user turn. */
onUserMessagePersisted?: (
message: Extract<AgentMessage, { role: "user" }>,
) => Promise<void> | void;
onReplyStart?: () => Promise<void> | void;
/** Called when the typing controller cleans up (e.g., run ended with NO_REPLY). */
onTypingCleanup?: () => void;

View File

@@ -693,6 +693,12 @@ export function createFollowupRunner(params: {
}) ??
provider);
let attemptCompactionCount = 0;
const notifyUserMessagePersisted = async (
message: Parameters<NonNullable<GetReplyOptions["onUserMessagePersisted"]>>[0],
) => {
queuedUserMessagePersistedAcrossFallback = true;
await opts?.onUserMessagePersisted?.(message);
};
try {
if (isCliProvider(cliExecutionProvider, runtimeConfig)) {
const isRoomEventCliRun = queued.currentInboundEventKind === "room_event";
@@ -727,9 +733,7 @@ export function createFollowupRunner(params: {
? { message: effectiveQueued.userMessageForPersistence }
: { text: effectiveQueued.transcriptPrompt ?? effectiveQueued.prompt },
suppressNextUserMessagePersistence: suppressQueuedUserPersistenceForCandidate,
onUserMessagePersisted: () => {
queuedUserMessagePersistedAcrossFallback = true;
},
onUserMessagePersisted: notifyUserMessagePersisted,
currentInboundEventKind: queued.currentInboundEventKind,
currentInboundContext: queued.currentInboundContext,
inputProvenance: run.inputProvenance,
@@ -828,9 +832,7 @@ export function createFollowupRunner(params: {
sourceReplyDeliveryMode: run.sourceReplyDeliveryMode,
forceMessageTool: run.sourceReplyDeliveryMode === "message_tool_only",
suppressNextUserMessagePersistence: suppressQueuedUserPersistenceForCandidate,
onUserMessagePersisted: () => {
queuedUserMessagePersistedAcrossFallback = true;
},
onUserMessagePersisted: notifyUserMessagePersisted,
suppressTranscriptOnlyAssistantPersistence:
run.suppressTranscriptOnlyAssistantPersistence,
suppressAssistantErrorPersistence: suppressAssistantErrorPersistenceForCandidate,

View File

@@ -56,6 +56,7 @@ const mockState = vi.hoisted(() => ({
dispatchError: null as Error | null,
dispatchErrorAfterAgentRunStart: null as Error | null,
triggerAgentRunStart: false,
triggerUserMessagePersisted: false,
onAfterAgentRunStart: null as (() => void) | null,
agentRunId: "run-agent-1",
sessionEntry: {} as Record<string, unknown>,
@@ -192,6 +193,7 @@ vi.mock("../../auto-reply/dispatch.js", () => ({
};
replyOptions?: {
onAgentRunStart?: (runId: string) => void;
onUserMessagePersisted?: (message: { role: "user"; content: string }) => void;
images?: Array<{ mimeType: string; data: string }>;
imageOrder?: string[];
};
@@ -208,6 +210,12 @@ vi.mock("../../auto-reply/dispatch.js", () => ({
params.replyOptions?.onAgentRunStart?.(mockState.agentRunId);
mockState.onAfterAgentRunStart?.();
}
if (mockState.triggerUserMessagePersisted) {
params.replyOptions?.onUserMessagePersisted?.({
role: "user",
content: "persisted by runtime",
});
}
if (mockState.dispatchErrorAfterAgentRunStart) {
throw mockState.dispatchErrorAfterAgentRunStart;
}
@@ -700,6 +708,7 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
mockState.dispatchErrorAfterAgentRunStart = null;
mockState.mainSessionKey = "main";
mockState.triggerAgentRunStart = false;
mockState.triggerUserMessagePersisted = false;
mockState.onAfterAgentRunStart = null;
mockState.agentRunId = "run-agent-1";
mockState.sessionEntry = {};
@@ -4376,6 +4385,43 @@ describe("chat directive tag stripping for non-streaming final payloads", () =>
expect(persistedUser?.content).toBe("hello from failed dispatch");
});
});
it("emits a user transcript update when chat.send fails after agent start but before runtime persistence", async () => {
createTranscriptFixture("openclaw-chat-send-user-transcript-error-before-runtime-persist-");
mockState.triggerAgentRunStart = true;
mockState.dispatchErrorAfterAgentRunStart = new Error("cli backend unavailable");
const respond = vi.fn();
const context = createChatContext();
await runNonStreamingChatSend({
context,
respond,
idempotencyKey: "idem-user-transcript-error-before-runtime-persist",
message: "hello before cli startup failure",
expectBroadcast: false,
});
await waitForAssertion(() => {
expect(context.dedupe.get("chat:idem-user-transcript-error-before-runtime-persist")?.ok).toBe(
false,
);
const userUpdate = findUserUpdate();
const message = userUpdateMessage(userUpdate);
expect(userUpdate?.sessionFile.endsWith("sess.jsonl")).toBe(true);
expect(userUpdate?.sessionKey).toBe("main");
expect(message?.role).toBe("user");
expect(message?.content).toBe("hello before cli startup failure");
const persistedUser = readTranscriptJsonLines(mockState.transcriptPath)
.map((entry) => entry.message)
.find(
(candidate): candidate is Record<string, unknown> =>
typeof candidate === "object" &&
candidate !== null &&
(candidate as { role?: unknown }).role === "user",
);
expect(persistedUser?.content).toBe("hello before cli startup failure");
});
});
});
describe("chat.send operator UI client sender context", () => {

View File

@@ -48,6 +48,7 @@ import {
import { createChannelMessageReplyPipeline } from "../../plugin-sdk/channel-message.js";
import type { ChannelRouteRef } from "../../plugin-sdk/channel-route.js";
import { isPluginOwnedSessionBindingRecord } from "../../plugins/conversation-binding.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
@@ -2676,6 +2677,9 @@ export const chatHandlers: GatewayRequestHandlers = {
let appendedWebchatAgentMedia = false;
let userTranscriptUpdatePromise: Promise<void> | null = null;
let agentRunStarted = false;
let agentUserMessagePersisted = false;
const beforeAgentRunHooksRegistered =
getGlobalHookRunner()?.hasHooks("before_agent_run") === true;
const persistGatewayUserTurnTranscript = async () => {
if (userTranscriptUpdatePromise) {
await userTranscriptUpdatePromise;
@@ -2870,6 +2874,9 @@ export const chatHandlers: GatewayRequestHandlers = {
}
}
},
onUserMessagePersisted: () => {
agentUserMessagePersisted = true;
},
onModelSelected: (modelSelection) => {
updateChatRunProvider(context.chatAbortControllers, {
runId: clientRunId,
@@ -3446,9 +3453,10 @@ export const chatHandlers: GatewayRequestHandlers = {
);
})
.catch(async (err) => {
const emitAfterError = agentRunStarted
? Promise.resolve()
: persistGatewayUserTurnTranscript();
const emitAfterError =
agentUserMessagePersisted || beforeAgentRunHooksRegistered
? Promise.resolve()
: persistGatewayUserTurnTranscript();
await emitAfterError.catch((transcriptErr) => {
context.logGateway.warn(
`webchat user transcript update failed after error: ${formatForLog(transcriptErr)}`,