diff --git a/extensions/codex/src/app-server/transcript-mirror.test.ts b/extensions/codex/src/app-server/transcript-mirror.test.ts index 502680e475d..b83672ee626 100644 --- a/extensions/codex/src/app-server/transcript-mirror.test.ts +++ b/extensions/codex/src/app-server/transcript-mirror.test.ts @@ -77,7 +77,7 @@ describe("buildCodexUserPromptMessage", () => { MediaTypes: ["image/png"], }, }, - } as Parameters[0]); + } as unknown as Parameters[0]); expect(message).toMatchObject({ role: "user", diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 79e84ede2a0..428b369585d 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -28,6 +28,7 @@ import { resolveBootstrapWarningSignaturesSeen } from "../bootstrap-budget.js"; import { runCliAgent } from "../cli-runner.js"; import { getCliSessionBinding, setCliSessionBinding } from "../cli-session.js"; import { FailoverError } from "../failover-error.js"; +import { runAgentHarnessBeforeMessageWriteHook } from "../harness/hook-helpers.js"; import { resolveAvailableAgentHarnessPolicy } from "../harness/selection.js"; import { resolveCliRuntimeExecutionProvider } from "../model-runtime-aliases.js"; import { isCliProvider } from "../model-selection.js"; @@ -242,6 +243,7 @@ async function persistTextTurnTranscript( sessionKey: params.sessionKey, cwd: params.sessionCwd, config: params.config, + beforeMessageWrite: runAgentHarnessBeforeMessageWriteHook, ...(userMessage ? { message: userMessage } : { diff --git a/src/agents/pi-embedded-runner.guard.test.ts b/src/agents/pi-embedded-runner.guard.test.ts index c3b83b425ca..b3af06e19fc 100644 --- a/src/agents/pi-embedded-runner.guard.test.ts +++ b/src/agents/pi-embedded-runner.guard.test.ts @@ -101,6 +101,7 @@ describe("guardSessionManager integration", () => { preparedUserTurnMessage: { role: "user", content: "What is in this image?", + timestamp: 123, MediaPath: "/tmp/a.png", MediaPaths: ["/tmp/a.png"], MediaType: "image/png", @@ -138,6 +139,7 @@ describe("guardSessionManager integration", () => { preparedUserTurnMessage: { role: "user", content: "visible prompt", + timestamp: 123, MediaPath: "/tmp/a.png", MediaPaths: ["/tmp/a.png"], MediaType: "image/png", @@ -149,6 +151,7 @@ describe("guardSessionManager integration", () => { appendMessage({ role: "user", content: [{ type: "text", text: "blocked" }], + timestamp: 124, __openclaw: { beforeAgentRunBlocked: { blockedBy: "test", blockedAt: 123 } }, } as AgentMessage); appendMessage({ role: "user", content: "runtime prompt" } as AgentMessage); diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index 3ac18dc9356..cd4e7c3b4cb 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -6046,15 +6046,7 @@ describe("runAgentTurnWithFallback", () => { }); const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); - await runAgentTurnWithFallback( - createMinimalRunAgentTurnParams({ - opts: { - onUserMessagePersisted: async () => { - throw new Error("gateway notification failed"); - }, - }, - }), - ); + await runAgentTurnWithFallback(createMinimalRunAgentTurnParams()); expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(3); expectMockCallArgFields(state.runEmbeddedPiAgentMock, 0, "primary candidate", { diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index efb273097a2..afa7074a53b 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -589,7 +589,8 @@ describe("createFollowupRunner reply-lane admission", () => { expect(runEmbeddedPiAgentMock).toHaveBeenCalledOnce(); const call = requireLastMockCallArg(runEmbeddedPiAgentMock, "run embedded pi agent"); - expect(call.userTurnTranscriptRecorder?.message).toBe(preparedUserTurnMessage); + const recorder = requireRecord(call.userTurnTranscriptRecorder, "embedded user turn recorder"); + expect(recorder.message).toBe(preparedUserTurnMessage); }); it("runs queued followups with the session id returned by admission", async () => { @@ -873,7 +874,8 @@ describe("createFollowupRunner runtime config", () => { config: runtimeConfig, suppressNextUserMessagePersistence: false, }); - expect(call.userTurnTranscriptRecorder?.message).toMatchObject({ + const recorder = requireRecord(call.userTurnTranscriptRecorder, "cli user turn recorder"); + expect(recorder.message).toMatchObject({ role: "user", content: "hello", }); @@ -924,7 +926,8 @@ describe("createFollowupRunner runtime config", () => { expect(runCliAgentMock).toHaveBeenCalledOnce(); const mediaCall = requireLastMockCallArg(runCliAgentMock, "run cli agent"); - expect(mediaCall.userTurnTranscriptRecorder?.message).toBe(preparedUserTurnMessage); + const recorder = requireRecord(mediaCall.userTurnTranscriptRecorder, "cli user turn recorder"); + expect(recorder.message).toBe(preparedUserTurnMessage); }); it("defers queued CLI attempt terminal lifecycle events until fallback settles", async () => { @@ -2948,11 +2951,6 @@ describe("createFollowupRunner queued user message idempotency across fallback", }); const runner = createFollowupRunner({ - opts: { - onUserMessagePersisted: async () => { - throw new Error("gateway notification failed"); - }, - }, typing: createMockTypingController(), typingMode: "instant", defaultModel: "anthropic/claude-opus-4-7", diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 5e0ec7fff15..11152196b88 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -7,6 +7,7 @@ import { import { resolveSessionAuthProfileOverride } from "../../agents/auth-profiles/session-override.js"; import type { ExecToolDefaults } from "../../agents/bash-tools.js"; import { resolveFastModeState } from "../../agents/fast-mode.js"; +import { runAgentHarnessBeforeMessageWriteHook } from "../../agents/harness/hook-helpers.js"; import { resolveAgentHarnessPolicy } from "../../agents/harness/selection.js"; import { listOpenAIAuthProfileProvidersForAgentRuntime } from "../../agents/openai-codex-routing.js"; import { resolveEmbeddedFullAccessState } from "../../agents/pi-embedded-runner/sandbox-info.js"; @@ -1175,6 +1176,7 @@ export async function runPreparedReply( config: cfg, }), errorContext: "reply user turn transcript", + beforeMessageWrite: runAgentHarnessBeforeMessageWriteHook, }) : undefined); const followupRun = { diff --git a/src/config/sessions/transcript-append.ts b/src/config/sessions/transcript-append.ts index b256fd4aa41..8ea2d761c12 100644 --- a/src/config/sessions/transcript-append.ts +++ b/src/config/sessions/transcript-append.ts @@ -309,10 +309,10 @@ async function appendSessionTranscriptMessageLocked( const idempotencyKey = readMessageIdempotencyKey(params.message); const existing = idempotencyKey && params.idempotencyLookup === "scan" - ? await findTranscriptMessageByIdempotencyKey(params.transcriptPath, idempotencyKey) + ? await findTranscriptMessageByIdempotencyKey(params.transcriptPath, idempotencyKey) : undefined; if (existing) { - return { ...existing, appended: false }; + return { ...existing, message: existing.message as TMessage, appended: false }; } const messageId = randomUUID(); @@ -359,10 +359,10 @@ function readMessageIdempotencyKey(message: unknown): string | undefined { return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined; } -async function findTranscriptMessageByIdempotencyKey( +async function findTranscriptMessageByIdempotencyKey( transcriptPath: string, idempotencyKey: string, -): Promise<{ messageId: string; message: TMessage } | undefined> { +): Promise<{ messageId: string; message: unknown } | undefined> { for await (const line of streamSessionTranscriptLinesReverse(transcriptPath)) { try { const parsed = JSON.parse(line) as { @@ -376,7 +376,7 @@ async function findTranscriptMessageByIdempotencyKey( return { messageId: typeof parsed.id === "string" && parsed.id.trim().length > 0 ? parsed.id : idempotencyKey, - message: message as TMessage, + message, }; } catch { continue; diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 3f8c406a8f4..c6a0f423d8a 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -10,6 +10,7 @@ import { resolveSendableOutboundReplyParts, } from "openclaw/plugin-sdk/reply-payload"; import { resolveAgentWorkspaceDir, resolveSessionAgentId } from "../../agents/agent-scope.js"; +import { runAgentHarnessBeforeMessageWriteHook } from "../../agents/harness/hook-helpers.js"; import { rewriteTranscriptEntriesInSessionFile } from "../../agents/pi-embedded-runner/transcript-rewrite.js"; import { resolveProviderIdForAuth } from "../../agents/provider-auth-aliases.js"; import { ensureSandboxWorkspaceForSession } from "../../agents/sandbox/context.js"; @@ -2721,6 +2722,7 @@ export const chatHandlers: GatewayRequestHandlers = { }; }, errorContext: "gateway chat user turn transcript", + beforeMessageWrite: runAgentHarnessBeforeMessageWriteHook, onPersistenceError: (error) => { context.logGateway.warn( `gateway user transcript persistence failed: ${formatForLog(error)}`, @@ -2964,7 +2966,6 @@ export const chatHandlers: GatewayRequestHandlers = { // assistant turn, so it appends a gateway-injected assistant entry before // broadcasting the final UI event. if (!agentRunStarted) { - await persistGatewayUserTurnTranscript(); const btwReplies = deliveredReplies .map((entry) => entry.payload) .filter(isBtwReplyPayload); @@ -2992,6 +2993,7 @@ export const chatHandlers: GatewayRequestHandlers = { sessionKey, }); } else { + await persistGatewayUserTurnTranscript(); const rawFinalPayloads = appendedWebchatAgentMedia ? [] : deliveredReplies diff --git a/src/sessions/user-turn-transcript.ts b/src/sessions/user-turn-transcript.ts index 4d6541a5757..9316086e59d 100644 --- a/src/sessions/user-turn-transcript.ts +++ b/src/sessions/user-turn-transcript.ts @@ -1,14 +1,18 @@ import path from "node:path"; import type { AgentMessage } from "@earendil-works/pi-agent-core"; -import { runAgentHarnessBeforeMessageWriteHook } from "../agents/harness/hook-helpers.js"; import { appendSessionTranscriptMessage } from "../config/sessions/transcript-append.js"; -import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js"; -import type { SessionEntry } from "../config/sessions/types.js"; -import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { logVerbose } from "../globals.js"; import { mimeTypeFromFilePath } from "../media/mime.js"; import { emitSessionTranscriptUpdate } from "./transcript-events.js"; +type TranscriptAppendConfig = Parameters[0]["config"]; + +type UserTurnSessionEntry = { + sessionId: string; + updatedAt: number; + sessionFile?: string; + threadId?: string | number; +} & Record; + type PersistedUserTurnMediaInput = { path?: string | null; url?: string | null; @@ -35,6 +39,12 @@ export type UserTurnInput = { type UserTurnTranscriptUpdateMode = "inline" | "none"; +export type UserTurnBeforeMessageWrite = (params: { + message: PersistedUserTurnMessage; + agentId?: string; + sessionKey?: string; +}) => AgentMessage | null; + type AppendUserTurnTranscriptMessageParams = { transcriptPath: string; input?: UserTurnInput; @@ -43,8 +53,9 @@ type AppendUserTurnTranscriptMessageParams = { agentId?: string; sessionKey?: string; cwd?: string; - config?: OpenClawConfig; + config?: TranscriptAppendConfig; updateMode?: UserTurnTranscriptUpdateMode; + beforeMessageWrite?: UserTurnBeforeMessageWrite; }; type PersistUserTurnTranscriptParams = { @@ -52,14 +63,15 @@ type PersistUserTurnTranscriptParams = { message?: PersistedUserTurnMessage; sessionId: string; sessionKey: string; - sessionEntry: SessionEntry | undefined; - sessionStore?: Record; + sessionEntry: UserTurnSessionEntry | undefined; + sessionStore?: Record; storePath?: string; agentId: string; threadId?: string | number; cwd?: string; - config?: OpenClawConfig; + config?: TranscriptAppendConfig; updateMode?: UserTurnTranscriptUpdateMode; + beforeMessageWrite?: UserTurnBeforeMessageWrite; }; type UserTurnTranscriptPersistenceTarget = Omit< @@ -73,14 +85,14 @@ type UserTurnTranscriptFileTarget = { agentId?: string; sessionKey?: string; cwd?: string; - config?: OpenClawConfig; + config?: TranscriptAppendConfig; }; type UserTurnTranscriptTarget = UserTurnTranscriptPersistenceTarget | UserTurnTranscriptFileTarget; type UserTurnTranscriptPersistResult = { sessionFile: string; - sessionEntry: SessionEntry | undefined; + sessionEntry: UserTurnSessionEntry | undefined; messageId: string; message: PersistedUserTurnMessage; }; @@ -113,6 +125,7 @@ type CreateUserTurnTranscriptRecorderParams = { message?: PersistedUserTurnMessage; target: UserTurnTranscriptTargetResolver; updateMode?: UserTurnTranscriptUpdateMode; + beforeMessageWrite?: UserTurnBeforeMessageWrite; errorContext?: string; onPersistenceError?: (error: unknown) => void; }; @@ -351,12 +364,18 @@ export function mergePreparedUserTurnMessageForRuntime(params: { function applyBeforeMessageWriteToUserTurn( message: PersistedUserTurnMessage, - params: Pick, + params: Pick< + AppendUserTurnTranscriptMessageParams, + "agentId" | "sessionKey" | "beforeMessageWrite" + >, ): PersistedUserTurnMessage | undefined { + if (!params.beforeMessageWrite) { + return message; + } const originalMessage = message as unknown as { idempotencyKey?: unknown }; const idempotencyKey = typeof originalMessage.idempotencyKey === "string" ? originalMessage.idempotencyKey : undefined; - const nextMessage = runAgentHarnessBeforeMessageWriteHook({ + const nextMessage = params.beforeMessageWrite({ message, ...(params.agentId ? { agentId: params.agentId } : {}), ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), @@ -429,6 +448,7 @@ export async function persistUserTurnTranscript( return undefined; } + const { resolveSessionTranscriptFile } = await import("../config/sessions/transcript.js"); const { sessionFile, sessionEntry } = await resolveSessionTranscriptFile({ sessionId: params.sessionId, sessionKey: params.sessionKey, @@ -448,6 +468,7 @@ export async function persistUserTurnTranscript( ...(params.cwd ? { cwd: params.cwd } : {}), ...(params.config ? { config: params.config } : {}), ...(params.updateMode ? { updateMode: params.updateMode } : {}), + ...(params.beforeMessageWrite ? { beforeMessageWrite: params.beforeMessageWrite } : {}), }); if (!appended) { return undefined; @@ -486,9 +507,13 @@ export function createUserTurnTranscriptRecorder( params.onPersistenceError(error); return; } - logVerbose( - `failed to persist ${params.errorContext ?? "user turn transcript"}: ${String(error)}`, - ); + void import("../globals.js") + .then(({ logVerbose }) => { + logVerbose( + `failed to persist ${params.errorContext ?? "user turn transcript"}: ${String(error)}`, + ); + }) + .catch(() => undefined); }; const waitForRuntimePersistence = async () => { @@ -537,6 +562,7 @@ export function createUserTurnTranscriptRecorder( ...target, message, updateMode, + ...(params.beforeMessageWrite ? { beforeMessageWrite: params.beforeMessageWrite } : {}), }).then((appended) => appended ? { @@ -549,6 +575,7 @@ export function createUserTurnTranscriptRecorder( ...target, message, updateMode, + ...(params.beforeMessageWrite ? { beforeMessageWrite: params.beforeMessageWrite } : {}), }); if (result) { persisted = true;