diff --git a/src/auto-reply/get-reply-options.types.ts b/src/auto-reply/get-reply-options.types.ts index a89b30a7079..ea74f01614f 100644 --- a/src/auto-reply/get-reply-options.types.ts +++ b/src/auto-reply/get-reply-options.types.ts @@ -1,3 +1,4 @@ +import type { AgentMessage } from "@earendil-works/pi-agent-core"; import type { ImageContent } from "@earendil-works/pi-ai"; import type { PromptImageOrderEntry } from "../media/prompt-image-order.js"; import type { ReplyPayload } from "./reply-payload.js"; @@ -56,6 +57,10 @@ export type GetReplyOptions = { imageOrder?: PromptImageOrderEntry[]; /** Notifies when an agent run actually starts (useful for webchat command handling). */ onAgentRunStart?: (runId: string) => void; + /** Notifies when the runtime actually persists the current user turn. */ + onUserMessagePersisted?: ( + message: Extract, + ) => Promise | void; onReplyStart?: () => Promise | void; /** Called when the typing controller cleans up (e.g., run ended with NO_REPLY). */ onTypingCleanup?: () => void; diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 7521374ce74..92a31c02828 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -693,6 +693,12 @@ export function createFollowupRunner(params: { }) ?? provider); let attemptCompactionCount = 0; + const notifyUserMessagePersisted = async ( + message: Parameters>[0], + ) => { + queuedUserMessagePersistedAcrossFallback = true; + await opts?.onUserMessagePersisted?.(message); + }; try { if (isCliProvider(cliExecutionProvider, runtimeConfig)) { const isRoomEventCliRun = queued.currentInboundEventKind === "room_event"; @@ -727,9 +733,7 @@ export function createFollowupRunner(params: { ? { message: effectiveQueued.userMessageForPersistence } : { text: effectiveQueued.transcriptPrompt ?? effectiveQueued.prompt }, suppressNextUserMessagePersistence: suppressQueuedUserPersistenceForCandidate, - onUserMessagePersisted: () => { - queuedUserMessagePersistedAcrossFallback = true; - }, + onUserMessagePersisted: notifyUserMessagePersisted, currentInboundEventKind: queued.currentInboundEventKind, currentInboundContext: queued.currentInboundContext, inputProvenance: run.inputProvenance, @@ -828,9 +832,7 @@ export function createFollowupRunner(params: { sourceReplyDeliveryMode: run.sourceReplyDeliveryMode, forceMessageTool: run.sourceReplyDeliveryMode === "message_tool_only", suppressNextUserMessagePersistence: suppressQueuedUserPersistenceForCandidate, - onUserMessagePersisted: () => { - queuedUserMessagePersistedAcrossFallback = true; - }, + onUserMessagePersisted: notifyUserMessagePersisted, suppressTranscriptOnlyAssistantPersistence: run.suppressTranscriptOnlyAssistantPersistence, suppressAssistantErrorPersistence: suppressAssistantErrorPersistenceForCandidate, diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index ffebaa259f4..bc5e184e0e4 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -56,6 +56,7 @@ const mockState = vi.hoisted(() => ({ dispatchError: null as Error | null, dispatchErrorAfterAgentRunStart: null as Error | null, triggerAgentRunStart: false, + triggerUserMessagePersisted: false, onAfterAgentRunStart: null as (() => void) | null, agentRunId: "run-agent-1", sessionEntry: {} as Record, @@ -192,6 +193,7 @@ vi.mock("../../auto-reply/dispatch.js", () => ({ }; replyOptions?: { onAgentRunStart?: (runId: string) => void; + onUserMessagePersisted?: (message: { role: "user"; content: string }) => void; images?: Array<{ mimeType: string; data: string }>; imageOrder?: string[]; }; @@ -208,6 +210,12 @@ vi.mock("../../auto-reply/dispatch.js", () => ({ params.replyOptions?.onAgentRunStart?.(mockState.agentRunId); mockState.onAfterAgentRunStart?.(); } + if (mockState.triggerUserMessagePersisted) { + params.replyOptions?.onUserMessagePersisted?.({ + role: "user", + content: "persisted by runtime", + }); + } if (mockState.dispatchErrorAfterAgentRunStart) { throw mockState.dispatchErrorAfterAgentRunStart; } @@ -700,6 +708,7 @@ describe("chat directive tag stripping for non-streaming final payloads", () => mockState.dispatchErrorAfterAgentRunStart = null; mockState.mainSessionKey = "main"; mockState.triggerAgentRunStart = false; + mockState.triggerUserMessagePersisted = false; mockState.onAfterAgentRunStart = null; mockState.agentRunId = "run-agent-1"; mockState.sessionEntry = {}; @@ -4376,6 +4385,43 @@ describe("chat directive tag stripping for non-streaming final payloads", () => expect(persistedUser?.content).toBe("hello from failed dispatch"); }); }); + + it("emits a user transcript update when chat.send fails after agent start but before runtime persistence", async () => { + createTranscriptFixture("openclaw-chat-send-user-transcript-error-before-runtime-persist-"); + mockState.triggerAgentRunStart = true; + mockState.dispatchErrorAfterAgentRunStart = new Error("cli backend unavailable"); + const respond = vi.fn(); + const context = createChatContext(); + + await runNonStreamingChatSend({ + context, + respond, + idempotencyKey: "idem-user-transcript-error-before-runtime-persist", + message: "hello before cli startup failure", + expectBroadcast: false, + }); + + await waitForAssertion(() => { + expect(context.dedupe.get("chat:idem-user-transcript-error-before-runtime-persist")?.ok).toBe( + false, + ); + const userUpdate = findUserUpdate(); + const message = userUpdateMessage(userUpdate); + expect(userUpdate?.sessionFile.endsWith("sess.jsonl")).toBe(true); + expect(userUpdate?.sessionKey).toBe("main"); + expect(message?.role).toBe("user"); + expect(message?.content).toBe("hello before cli startup failure"); + const persistedUser = readTranscriptJsonLines(mockState.transcriptPath) + .map((entry) => entry.message) + .find( + (candidate): candidate is Record => + typeof candidate === "object" && + candidate !== null && + (candidate as { role?: unknown }).role === "user", + ); + expect(persistedUser?.content).toBe("hello before cli startup failure"); + }); + }); }); describe("chat.send operator UI client sender context", () => { diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 8a13b5965b0..9d1566feff4 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -48,6 +48,7 @@ import { import { createChannelMessageReplyPipeline } from "../../plugin-sdk/channel-message.js"; import type { ChannelRouteRef } from "../../plugin-sdk/channel-route.js"; import { isPluginOwnedSessionBindingRecord } from "../../plugins/conversation-binding.js"; +import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; @@ -2676,6 +2677,9 @@ export const chatHandlers: GatewayRequestHandlers = { let appendedWebchatAgentMedia = false; let userTranscriptUpdatePromise: Promise | null = null; let agentRunStarted = false; + let agentUserMessagePersisted = false; + const beforeAgentRunHooksRegistered = + getGlobalHookRunner()?.hasHooks("before_agent_run") === true; const persistGatewayUserTurnTranscript = async () => { if (userTranscriptUpdatePromise) { await userTranscriptUpdatePromise; @@ -2870,6 +2874,9 @@ export const chatHandlers: GatewayRequestHandlers = { } } }, + onUserMessagePersisted: () => { + agentUserMessagePersisted = true; + }, onModelSelected: (modelSelection) => { updateChatRunProvider(context.chatAbortControllers, { runId: clientRunId, @@ -3446,9 +3453,10 @@ export const chatHandlers: GatewayRequestHandlers = { ); }) .catch(async (err) => { - const emitAfterError = agentRunStarted - ? Promise.resolve() - : persistGatewayUserTurnTranscript(); + const emitAfterError = + agentUserMessagePersisted || beforeAgentRunHooksRegistered + ? Promise.resolve() + : persistGatewayUserTurnTranscript(); await emitAfterError.catch((transcriptErr) => { context.logGateway.warn( `webchat user transcript update failed after error: ${formatForLog(transcriptErr)}`,