From 2bb00f6726d4245c673c40bcbfe4416879e3512f Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 21 May 2026 19:55:29 +0800 Subject: [PATCH] fix(agents): fence embedded session writes --- CHANGELOG.md | 1 + .../run/attempt.session-lock.test.ts | 116 ++++++++++++++++++ .../run/attempt.session-lock.ts | 94 +++++++++++++- src/agents/pi-embedded-runner/run/attempt.ts | 35 ++++-- .../session-tool-result-guard-wrapper.ts | 2 + src/agents/session-tool-result-guard.ts | 2 + src/config/sessions/transcript-append.ts | 9 +- .../sessions/transcript-write-context.ts | 7 ++ src/config/sessions/transcript.test.ts | 35 +++++- 9 files changed, 281 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28d4e2abf4d..bd12c15c9a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai - Agents: cap heartbeat model bleed context hints by the stored session window when runtime model metadata is unavailable, so overflow recovery advice does not suggest a larger window than the active session actually has. - Control UI/Web Push: use `https://openclaw.ai` as the generated default VAPID subject instead of the old localhost mailbox so iOS PWA push setup uses an Apple-acceptable subject when `OPENCLAW_VAPID_SUBJECT` is unset. Fixes #83134. (#83317) Thanks @IWhatsskill. +- Agents/Pi: keep embedded session transcript writes from tripping false takeover detection after packaged npm onboarding agent turns. - Memory/search: stop recall tracking from writing dreaming side-effect artifacts when `dreaming.enabled=false`, while preserving normal search results. Fixes #84436. (#84444) Thanks @NianJiuZst. - Diffs: render viewer toolbar icons from a closed icon-name map instead of HTML strings, removing the toolbar icon XSS sink. (#83955) Thanks @tanshanshan. - QA: keep `pnpm qa:e2e` self-check runs inside the private QA runtime envelope even when inherited shell env disables bundled plugins. diff --git a/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts b/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts index 5f1b79a97e5..10cd8f37ac2 100644 --- a/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts @@ -183,6 +183,62 @@ describe("embedded attempt session lock lifecycle", () => { expect(release).toHaveBeenCalledTimes(2); }); + it("refreshes the prompt fence after an owned write throws", async () => { + const sessionFile = await createTempSessionFile(); + const release = vi.fn(async () => {}); + const acquireSessionWriteLock = vi.fn(async () => ({ release })); + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + + await controller.releaseForPrompt(); + await expect( + controller.withSessionWriteLock(async () => { + await fs.appendFile(sessionFile, '{"type":"message","id":"owned-before-error"}\n', "utf8"); + throw new Error("downstream event handler failed"); + }), + ).rejects.toThrow("downstream event handler failed"); + await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize"); + + expect(controller.hasSessionTakeover()).toBe(false); + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3); + expect(release).toHaveBeenCalledTimes(3); + }); + + it("does not reuse a released lock from inherited async context", async () => { + const sessionFile = await createTempSessionFile(); + let resumeDetached!: () => void; + const detachedGate = new Promise((resolve) => { + resumeDetached = resolve; + }); + const release = vi.fn(async () => {}); + const acquireSessionWriteLock = vi.fn(async () => ({ release })); + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + + await controller.releaseForPrompt(); + let detachedWrite!: Promise; + await controller.withSessionWriteLock(async () => { + detachedWrite = (async () => { + await detachedGate; + await controller.withSessionWriteLock(async () => { + await fs.appendFile(sessionFile, '{"type":"message","id":"detached-owned"}\n', "utf8"); + }); + })(); + }); + + resumeDetached(); + await detachedWrite; + await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize"); + + expect(controller.hasSessionTakeover()).toBe(false); + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(4); + expect(release).toHaveBeenCalledTimes(4); + }); + it("refreshes the prompt fence after an owned transcript mirror append", async () => { const sessionFile = await createTempSessionFile(); const release = vi.fn(async () => {}); @@ -214,6 +270,23 @@ describe("embedded attempt session lock lifecycle", () => { expect(release).toHaveBeenCalledTimes(3); }); + it("refreshes the prompt fence after an owned session manager append", async () => { + const sessionFile = await createTempSessionFile(); + const release = vi.fn(async () => {}); + const acquireSessionWriteLock = vi.fn(async () => ({ release })); + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + + await controller.releaseForPrompt(); + await fs.appendFile(sessionFile, '{"type":"message","id":"owned-session-manager"}\n', "utf8"); + controller.refreshAfterOwnedSessionWrite(); + + await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize"); + expect(controller.hasSessionTakeover()).toBe(false); + }); + it("returns a no-op cleanup lock after prompt lock reacquisition times out", async () => { const releases: string[] = []; const acquireSessionWriteLock = vi @@ -379,6 +452,49 @@ describe("embedded attempt session lock lifecycle", () => { expect(releases).toEqual(["released", "released", "released"]); }); + it("makes the Pi event listener await locked session event processing", async () => { + const events: string[] = []; + const session = { + _agentEventQueue: Promise.resolve(), + _disconnectFromAgent: vi.fn(() => events.push("disconnect")), + _reconnectToAgent: vi.fn(() => events.push("reconnect")), + _processAgentEvent: vi.fn(async (event: { type?: string }) => { + events.push(`process:${event.type}`); + }), + _handleAgentEvent(event: { type?: string }) { + events.push(`handle:${event.type}`); + session["_agentEventQueue"] = session["_agentEventQueue"].then(() => + session["_processAgentEvent"](event), + ); + session["_agentEventQueue"].catch(() => {}); + }, + }; + + installSessionEventWriteLock({ + session, + withSessionWriteLock: async (run) => { + events.push("lock"); + return await run(); + }, + }); + + const handleAgentEvent = session["_handleAgentEvent"]; + const result = handleAgentEvent({ type: "message_end" }) as unknown as Promise; + + expect(result).toHaveProperty("then"); + expect(events).toEqual(["disconnect", "reconnect", "handle:message_end"]); + + await result; + + expect(events).toEqual([ + "disconnect", + "reconnect", + "handle:message_end", + "lock", + "process:message_end", + ]); + }); + it("locks Pi extension hooks that can mutate the session outside agent events", async () => { const locked: string[] = []; const called: string[] = []; diff --git a/src/agents/pi-embedded-runner/run/attempt.session-lock.ts b/src/agents/pi-embedded-runner/run/attempt.session-lock.ts index 73f7684ebbc..e427bd560e4 100644 --- a/src/agents/pi-embedded-runner/run/attempt.session-lock.ts +++ b/src/agents/pi-embedded-runner/run/attempt.session-lock.ts @@ -1,10 +1,14 @@ import { AsyncLocalStorage } from "node:async_hooks"; +import { statSync } from "node:fs"; import fs from "node:fs/promises"; import { isSessionWriteLockTimeoutError } from "../../session-write-lock-error.js"; import type { acquireSessionWriteLock } from "../../session-write-lock.js"; type SessionLock = Awaited>; type AcquireSessionWriteLock = typeof acquireSessionWriteLock; +type ActiveWriteLockState = { + active: boolean; +}; type LockOptions = { sessionFile: string; @@ -25,6 +29,16 @@ type SessionEventQueueOwner = { _agentEventQueue?: PromiseLike; }; +type SessionEventQueueBridge = SessionEventQueueOwner & { + _handleAgentEvent?: AwaitableSessionEventHandler; + _disconnectFromAgent?: () => void; + _reconnectToAgent?: () => void; +}; + +type AwaitableSessionEventHandler = ((event: unknown, signal?: unknown) => unknown) & { + __openclawSessionEventQueueAwaitInstalled?: boolean; +}; + type SessionWithAgentPrompt = { agent?: { streamFn?: PromptReleaseStreamFn; @@ -147,6 +161,25 @@ async function readSessionFileFingerprint(sessionFile: string): Promise { const owner = session as SessionEventQueueOwner; for (let attempts = 0; attempts < 5; attempts += 1) { @@ -165,6 +198,41 @@ async function waitForSessionEventQueue(session: unknown): Promise { } } +function installAwaitableSessionEventQueue(session: unknown): void { + const owner = session as SessionEventQueueBridge; + const original = owner["_handleAgentEvent"]; + if ( + typeof original !== "function" || + original["__openclawSessionEventQueueAwaitInstalled"] === true + ) { + return; + } + + const canReconnect = + typeof owner["_disconnectFromAgent"] === "function" && + typeof owner["_reconnectToAgent"] === "function"; + if (canReconnect) { + owner["_disconnectFromAgent"]?.(); + } + + const wrapped: AwaitableSessionEventHandler = function awaitableSessionEventQueue( + ...args: [event: unknown, signal?: unknown] + ) { + const result = original(...args); + const queue = owner["_agentEventQueue"]; + if (queue && typeof queue.then === "function") { + return Promise.resolve(queue); + } + return result; + }; + wrapped["__openclawSessionEventQueueAwaitInstalled"] = true; + owner["_handleAgentEvent"] = wrapped; + + if (canReconnect) { + owner["_reconnectToAgent"]?.(); + } +} + export class EmbeddedAttemptSessionTakeoverError extends Error { constructor(sessionFile: string) { super(`session file changed while embedded prompt lock was released: ${sessionFile}`); @@ -176,6 +244,7 @@ export function installSessionEventWriteLock(params: { session: unknown; withSessionWriteLock: (run: () => Promise | T) => Promise; }): void { + installAwaitableSessionEventQueue(params.session); const session = params.session as SessionEventProcessor; const original = session["_processAgentEvent"]; if ( @@ -243,6 +312,7 @@ export function installSessionExternalHookWriteLock(params: { export type EmbeddedAttemptSessionLockController = { releaseForPrompt(): Promise; + refreshAfterOwnedSessionWrite(): void; waitForSessionEvents(session: unknown): Promise; withSessionWriteLock(run: () => Promise | T): Promise; acquireForCleanup(params?: { session?: unknown }): Promise; @@ -262,7 +332,7 @@ export async function createEmbeddedAttemptSessionLockController(params: { }); let heldLock: SessionLock | undefined = await acquireLock(); - const activeWriteLock = new AsyncLocalStorage(); + const activeWriteLock = new AsyncLocalStorage(); let fenceFingerprint: SessionFileFingerprint | undefined; let fenceActive = false; let takeoverDetected = false; @@ -311,24 +381,36 @@ export async function createEmbeddedAttemptSessionLockController(params: { fenceActive = true; await lock.release(); }, + refreshAfterOwnedSessionWrite(): void { + if (fenceActive && !takeoverDetected) { + fenceFingerprint = readSessionFileFingerprintSync(params.lockOptions.sessionFile); + } + }, waitForSessionEvents: waitForSessionEventQueue, async withSessionWriteLock(run: () => Promise | T): Promise { if (takeoverDetected) { throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile); } - if (activeWriteLock.getStore()) { + if (activeWriteLock.getStore()?.active === true) { return await run(); } const { lock, owned } = await acquireWriteLock(); try { await assertSessionFileFence(); const runWithLock = async () => { - const result = await run(); - await refreshSessionFileFence(); - return result; + try { + return await run(); + } finally { + await refreshSessionFileFence(); + } }; if (owned) { - return await activeWriteLock.run(lock, runWithLock); + const activeLockState: ActiveWriteLockState = { active: true }; + try { + return await activeWriteLock.run(activeLockState, runWithLock); + } finally { + activeLockState.active = false; + } } return await runWithLock(); } finally { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index c853dd18ca1..2352b20ea38 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -15,7 +15,10 @@ import { runQuotaSuspensionMaintenance, updateSessionStoreEntry, } from "../../../config/sessions/store.js"; -import { withOwnedSessionTranscriptWrites } from "../../../config/sessions/transcript-write-context.js"; +import { + bindOwnedSessionTranscriptWrites, + withOwnedSessionTranscriptWrites, +} from "../../../config/sessions/transcript-write-context.js"; import { resolveContextEngineOwnerPluginId } from "../../../context-engine/registry.js"; import type { AssembleResult } from "../../../context-engine/types.js"; import { emitTrustedDiagnosticEvent } from "../../../infra/diagnostic-events.js"; @@ -2116,6 +2119,9 @@ export async function runEmbeddedAttempt( suppressTranscriptOnlyAssistantPersistence: params.suppressTranscriptOnlyAssistantPersistence, suppressAssistantErrorPersistence: params.suppressAssistantErrorPersistence, + onMessagePersisted: () => { + sessionLockController.refreshAfterOwnedSessionWrite(); + }, onUserMessagePersisted: (message) => { params.onUserMessagePersisted?.(message); }, @@ -3173,19 +3179,26 @@ export async function runEmbeddedAttempt( }; const abortable = (promise: Promise): Promise => abortableWithSignal(runAbortController.signal, promise); + const ownedTranscriptWriteContext = { + sessionFile: params.sessionFile, + sessionKey: params.sessionKey, + withSessionWriteLock: (operation: () => Promise | T) => + sessionLockController.withSessionWriteLock(operation), + }; const promptActiveSession = ( prompt: string, options?: Parameters[1], ): Promise => withOwnedSessionTranscriptWrites( - { - sessionFile: params.sessionFile, - sessionKey: params.sessionKey, - withSessionWriteLock: (operation) => - sessionLockController.withSessionWriteLock(operation), - }, + ownedTranscriptWriteContext, async () => abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options))), ); + const onBlockReply = params.onBlockReply + ? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReply) + : undefined; + const onBlockReplyFlush = params.onBlockReplyFlush + ? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReplyFlush) + : undefined; const subscription = subscribeEmbeddedPiSession( buildEmbeddedSubscriptionParams({ @@ -3203,8 +3216,8 @@ export async function runEmbeddedAttempt( onToolResult: params.onToolResult, onReasoningStream: params.onReasoningStream, onReasoningEnd: params.onReasoningEnd, - onBlockReply: params.onBlockReply, - onBlockReplyFlush: params.onBlockReplyFlush, + onBlockReply, + onBlockReplyFlush, blockReplyBreak: params.blockReplyBreak, blockReplyChunking: params.blockReplyChunking, onPartialReply: params.onPartialReply, @@ -4204,8 +4217,8 @@ export async function runEmbeddedAttempt( // user receives the assistant response immediately. Without this, // coalesced/buffered blocks stay in the pipeline until compaction // finishes — which can take minutes on large contexts (#35074). - if (params.onBlockReplyFlush) { - await params.onBlockReplyFlush(); + if (onBlockReplyFlush) { + await onBlockReplyFlush(); } // Skip compaction wait when yield aborted the run — the signal is diff --git a/src/agents/session-tool-result-guard-wrapper.ts b/src/agents/session-tool-result-guard-wrapper.ts index 38911bfb16f..1da5107cd5c 100644 --- a/src/agents/session-tool-result-guard-wrapper.ts +++ b/src/agents/session-tool-result-guard-wrapper.ts @@ -38,6 +38,7 @@ export function guardSessionManager( onUserMessagePersisted?: ( message: Extract, ) => void | Promise; + onMessagePersisted?: (message: AgentMessage) => void | Promise; onAssistantErrorMessagePersisted?: ( message: Extract, ) => void | Promise; @@ -118,6 +119,7 @@ export function guardSessionManager( suppressNextUserMessagePersistence: opts?.suppressNextUserMessagePersistence, suppressTranscriptOnlyAssistantPersistence: opts?.suppressTranscriptOnlyAssistantPersistence, suppressAssistantErrorPersistence: opts?.suppressAssistantErrorPersistence, + onMessagePersisted: opts?.onMessagePersisted, onUserMessagePersisted: opts?.onUserMessagePersisted, onAssistantErrorMessagePersisted: opts?.onAssistantErrorMessagePersisted, }); diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index 3b94b4cf497..165b181facf 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -557,6 +557,7 @@ export function installSessionToolResultGuard( onUserMessagePersisted?: ( message: Extract, ) => void | Promise; + onMessagePersisted?: (message: AgentMessage) => void | Promise; onAssistantErrorMessagePersisted?: ( message: Extract, ) => void | Promise; @@ -598,6 +599,7 @@ export function installSessionToolResultGuard( ): { entryId: string; messageSeq?: number; sessionFile?: string | null } => { const parentEntryId = sessionManager.getLeafId(); const entryId = originalAppend(message as never); + void opts?.onMessagePersisted?.(message); const sessionFile = getSessionFile(); if (!sessionFile) { return { entryId, sessionFile }; diff --git a/src/config/sessions/transcript-append.ts b/src/config/sessions/transcript-append.ts index 2ef32514df5..fe38ce71621 100644 --- a/src/config/sessions/transcript-append.ts +++ b/src/config/sessions/transcript-append.ts @@ -10,6 +10,7 @@ import { import { redactTranscriptMessage } from "../../agents/transcript-redact.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { redactSecrets } from "../../logging/redact.js"; +import { runWithOwnedSessionTranscriptWriteLock } from "./transcript-write-context.js"; const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024; const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; @@ -254,8 +255,12 @@ function isTranscriptAgentMessage(value: unknown): value is AgentMessage { export async function appendSessionTranscriptMessage( params: AppendSessionTranscriptMessageParams, ): Promise<{ messageId: string; message: TMessage }> { - return await withTranscriptAppendQueue(params.transcriptPath, () => - appendSessionTranscriptMessageLocked(params), + return await runWithOwnedSessionTranscriptWriteLock( + { sessionFile: params.transcriptPath }, + async () => + await withTranscriptAppendQueue(params.transcriptPath, () => + appendSessionTranscriptMessageLocked(params), + ), ); } diff --git a/src/config/sessions/transcript-write-context.ts b/src/config/sessions/transcript-write-context.ts index ee89b26e15f..f577417afa3 100644 --- a/src/config/sessions/transcript-write-context.ts +++ b/src/config/sessions/transcript-write-context.ts @@ -37,6 +37,13 @@ export async function withOwnedSessionTranscriptWrites( return await ownedTranscriptWriteContext.run(context, run); } +export function bindOwnedSessionTranscriptWrites( + context: OwnedSessionTranscriptWriteContext, + run: (...args: TArgs) => TResult, +): (...args: TArgs) => TResult { + return (...args) => ownedTranscriptWriteContext.run(context, () => run(...args)); +} + export async function runWithOwnedSessionTranscriptWriteLock( params: { sessionFile?: string; diff --git a/src/config/sessions/transcript.test.ts b/src/config/sessions/transcript.test.ts index 4174f6f564c..3bee08415b1 100644 --- a/src/config/sessions/transcript.test.ts +++ b/src/config/sessions/transcript.test.ts @@ -6,7 +6,10 @@ import type { SessionTranscriptUpdate } from "../../sessions/transcript-events.j import { resolveSessionTranscriptPathInDir } from "./paths.js"; import { useTempSessionsFixture } from "./test-helpers.js"; import { appendSessionTranscriptMessage } from "./transcript-append.js"; -import { withOwnedSessionTranscriptWrites } from "./transcript-write-context.js"; +import { + bindOwnedSessionTranscriptWrites, + withOwnedSessionTranscriptWrites, +} from "./transcript-write-context.js"; import { appendAssistantMessageToSessionTranscript, appendExactAssistantMessageToSessionTranscript, @@ -134,6 +137,36 @@ describe("appendAssistantMessageToSessionTranscript", () => { ); expect(result.ok).toBe(true); + expect(events).toEqual(["lock", "lock"]); + }); + + it("keeps matching owned transcript appends locked from bound callbacks", async () => { + const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()); + const events: string[] = []; + const callback = bindOwnedSessionTranscriptWrites( + { + sessionFile, + sessionKey, + withSessionWriteLock: async (run) => { + events.push("lock"); + return await run(); + }, + }, + async () => + await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + message: { + role: "assistant", + content: "Hello from bound delivery", + timestamp: Date.now(), + stopReason: "stop", + }, + }), + ); + + const result = await callback(); + + expect(result.messageId).toBeTruthy(); expect(events).toEqual(["lock"]); });