fix(agents): fence embedded session writes

This commit is contained in:
Vincent Koc
2026-05-21 19:55:29 +08:00
parent 95eac52e92
commit 2bb00f6726
9 changed files with 281 additions and 20 deletions

View File

@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
- Agents: cap heartbeat model bleed context hints by the stored session window when runtime model metadata is unavailable, so overflow recovery advice does not suggest a larger window than the active session actually has.
- Control UI/Web Push: use `https://openclaw.ai` as the generated default VAPID subject instead of the old localhost mailbox so iOS PWA push setup uses an Apple-acceptable subject when `OPENCLAW_VAPID_SUBJECT` is unset. Fixes #83134. (#83317) Thanks @IWhatsskill.
- Agents/Pi: keep embedded session transcript writes from tripping false takeover detection after packaged npm onboarding agent turns.
- Memory/search: stop recall tracking from writing dreaming side-effect artifacts when `dreaming.enabled=false`, while preserving normal search results. Fixes #84436. (#84444) Thanks @NianJiuZst.
- Diffs: render viewer toolbar icons from a closed icon-name map instead of HTML strings, removing the toolbar icon XSS sink. (#83955) Thanks @tanshanshan.
- QA: keep `pnpm qa:e2e` self-check runs inside the private QA runtime envelope even when inherited shell env disables bundled plugins.

View File

@@ -183,6 +183,62 @@ describe("embedded attempt session lock lifecycle", () => {
expect(release).toHaveBeenCalledTimes(2);
});
it("refreshes the prompt fence after an owned write throws", async () => {
const sessionFile = await createTempSessionFile();
const release = vi.fn(async () => {});
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
const controller = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: { ...lockOptions, sessionFile },
});
await controller.releaseForPrompt();
await expect(
controller.withSessionWriteLock(async () => {
await fs.appendFile(sessionFile, '{"type":"message","id":"owned-before-error"}\n', "utf8");
throw new Error("downstream event handler failed");
}),
).rejects.toThrow("downstream event handler failed");
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
expect(controller.hasSessionTakeover()).toBe(false);
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3);
expect(release).toHaveBeenCalledTimes(3);
});
it("does not reuse a released lock from inherited async context", async () => {
const sessionFile = await createTempSessionFile();
let resumeDetached!: () => void;
const detachedGate = new Promise<void>((resolve) => {
resumeDetached = resolve;
});
const release = vi.fn(async () => {});
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
const controller = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: { ...lockOptions, sessionFile },
});
await controller.releaseForPrompt();
let detachedWrite!: Promise<void>;
await controller.withSessionWriteLock(async () => {
detachedWrite = (async () => {
await detachedGate;
await controller.withSessionWriteLock(async () => {
await fs.appendFile(sessionFile, '{"type":"message","id":"detached-owned"}\n', "utf8");
});
})();
});
resumeDetached();
await detachedWrite;
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
expect(controller.hasSessionTakeover()).toBe(false);
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(4);
expect(release).toHaveBeenCalledTimes(4);
});
it("refreshes the prompt fence after an owned transcript mirror append", async () => {
const sessionFile = await createTempSessionFile();
const release = vi.fn(async () => {});
@@ -214,6 +270,23 @@ describe("embedded attempt session lock lifecycle", () => {
expect(release).toHaveBeenCalledTimes(3);
});
it("refreshes the prompt fence after an owned session manager append", async () => {
const sessionFile = await createTempSessionFile();
const release = vi.fn(async () => {});
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
const controller = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: { ...lockOptions, sessionFile },
});
await controller.releaseForPrompt();
await fs.appendFile(sessionFile, '{"type":"message","id":"owned-session-manager"}\n', "utf8");
controller.refreshAfterOwnedSessionWrite();
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
expect(controller.hasSessionTakeover()).toBe(false);
});
it("returns a no-op cleanup lock after prompt lock reacquisition times out", async () => {
const releases: string[] = [];
const acquireSessionWriteLock = vi
@@ -379,6 +452,49 @@ describe("embedded attempt session lock lifecycle", () => {
expect(releases).toEqual(["released", "released", "released"]);
});
it("makes the Pi event listener await locked session event processing", async () => {
const events: string[] = [];
const session = {
_agentEventQueue: Promise.resolve(),
_disconnectFromAgent: vi.fn(() => events.push("disconnect")),
_reconnectToAgent: vi.fn(() => events.push("reconnect")),
_processAgentEvent: vi.fn(async (event: { type?: string }) => {
events.push(`process:${event.type}`);
}),
_handleAgentEvent(event: { type?: string }) {
events.push(`handle:${event.type}`);
session["_agentEventQueue"] = session["_agentEventQueue"].then(() =>
session["_processAgentEvent"](event),
);
session["_agentEventQueue"].catch(() => {});
},
};
installSessionEventWriteLock({
session,
withSessionWriteLock: async (run) => {
events.push("lock");
return await run();
},
});
const handleAgentEvent = session["_handleAgentEvent"];
const result = handleAgentEvent({ type: "message_end" }) as unknown as Promise<unknown>;
expect(result).toHaveProperty("then");
expect(events).toEqual(["disconnect", "reconnect", "handle:message_end"]);
await result;
expect(events).toEqual([
"disconnect",
"reconnect",
"handle:message_end",
"lock",
"process:message_end",
]);
});
it("locks Pi extension hooks that can mutate the session outside agent events", async () => {
const locked: string[] = [];
const called: string[] = [];

View File

@@ -1,10 +1,14 @@
import { AsyncLocalStorage } from "node:async_hooks";
import { statSync } from "node:fs";
import fs from "node:fs/promises";
import { isSessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
import type { acquireSessionWriteLock } from "../../session-write-lock.js";
type SessionLock = Awaited<ReturnType<typeof acquireSessionWriteLock>>;
type AcquireSessionWriteLock = typeof acquireSessionWriteLock;
type ActiveWriteLockState = {
active: boolean;
};
type LockOptions = {
sessionFile: string;
@@ -25,6 +29,16 @@ type SessionEventQueueOwner = {
_agentEventQueue?: PromiseLike<unknown>;
};
type SessionEventQueueBridge = SessionEventQueueOwner & {
_handleAgentEvent?: AwaitableSessionEventHandler;
_disconnectFromAgent?: () => void;
_reconnectToAgent?: () => void;
};
type AwaitableSessionEventHandler = ((event: unknown, signal?: unknown) => unknown) & {
__openclawSessionEventQueueAwaitInstalled?: boolean;
};
type SessionWithAgentPrompt = {
agent?: {
streamFn?: PromptReleaseStreamFn;
@@ -147,6 +161,25 @@ async function readSessionFileFingerprint(sessionFile: string): Promise<SessionF
}
}
function readSessionFileFingerprintSync(sessionFile: string): SessionFileFingerprint {
try {
const stat = statSync(sessionFile, { bigint: true });
return {
exists: true,
dev: stat.dev,
ino: stat.ino,
size: stat.size,
mtimeNs: stat.mtimeNs,
ctimeNs: stat.ctimeNs,
};
} catch (err) {
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
return { exists: false };
}
throw err;
}
}
async function waitForSessionEventQueue(session: unknown): Promise<void> {
const owner = session as SessionEventQueueOwner;
for (let attempts = 0; attempts < 5; attempts += 1) {
@@ -165,6 +198,41 @@ async function waitForSessionEventQueue(session: unknown): Promise<void> {
}
}
function installAwaitableSessionEventQueue(session: unknown): void {
const owner = session as SessionEventQueueBridge;
const original = owner["_handleAgentEvent"];
if (
typeof original !== "function" ||
original["__openclawSessionEventQueueAwaitInstalled"] === true
) {
return;
}
const canReconnect =
typeof owner["_disconnectFromAgent"] === "function" &&
typeof owner["_reconnectToAgent"] === "function";
if (canReconnect) {
owner["_disconnectFromAgent"]?.();
}
const wrapped: AwaitableSessionEventHandler = function awaitableSessionEventQueue(
...args: [event: unknown, signal?: unknown]
) {
const result = original(...args);
const queue = owner["_agentEventQueue"];
if (queue && typeof queue.then === "function") {
return Promise.resolve(queue);
}
return result;
};
wrapped["__openclawSessionEventQueueAwaitInstalled"] = true;
owner["_handleAgentEvent"] = wrapped;
if (canReconnect) {
owner["_reconnectToAgent"]?.();
}
}
export class EmbeddedAttemptSessionTakeoverError extends Error {
constructor(sessionFile: string) {
super(`session file changed while embedded prompt lock was released: ${sessionFile}`);
@@ -176,6 +244,7 @@ export function installSessionEventWriteLock(params: {
session: unknown;
withSessionWriteLock: <T>(run: () => Promise<T> | T) => Promise<T>;
}): void {
installAwaitableSessionEventQueue(params.session);
const session = params.session as SessionEventProcessor;
const original = session["_processAgentEvent"];
if (
@@ -243,6 +312,7 @@ export function installSessionExternalHookWriteLock(params: {
export type EmbeddedAttemptSessionLockController = {
releaseForPrompt(): Promise<void>;
refreshAfterOwnedSessionWrite(): void;
waitForSessionEvents(session: unknown): Promise<void>;
withSessionWriteLock<T>(run: () => Promise<T> | T): Promise<T>;
acquireForCleanup(params?: { session?: unknown }): Promise<SessionLock>;
@@ -262,7 +332,7 @@ export async function createEmbeddedAttemptSessionLockController(params: {
});
let heldLock: SessionLock | undefined = await acquireLock();
const activeWriteLock = new AsyncLocalStorage<SessionLock>();
const activeWriteLock = new AsyncLocalStorage<ActiveWriteLockState>();
let fenceFingerprint: SessionFileFingerprint | undefined;
let fenceActive = false;
let takeoverDetected = false;
@@ -311,24 +381,36 @@ export async function createEmbeddedAttemptSessionLockController(params: {
fenceActive = true;
await lock.release();
},
refreshAfterOwnedSessionWrite(): void {
if (fenceActive && !takeoverDetected) {
fenceFingerprint = readSessionFileFingerprintSync(params.lockOptions.sessionFile);
}
},
waitForSessionEvents: waitForSessionEventQueue,
async withSessionWriteLock<T>(run: () => Promise<T> | T): Promise<T> {
if (takeoverDetected) {
throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile);
}
if (activeWriteLock.getStore()) {
if (activeWriteLock.getStore()?.active === true) {
return await run();
}
const { lock, owned } = await acquireWriteLock();
try {
await assertSessionFileFence();
const runWithLock = async () => {
const result = await run();
await refreshSessionFileFence();
return result;
try {
return await run();
} finally {
await refreshSessionFileFence();
}
};
if (owned) {
return await activeWriteLock.run(lock, runWithLock);
const activeLockState: ActiveWriteLockState = { active: true };
try {
return await activeWriteLock.run(activeLockState, runWithLock);
} finally {
activeLockState.active = false;
}
}
return await runWithLock();
} finally {

View File

@@ -15,7 +15,10 @@ import {
runQuotaSuspensionMaintenance,
updateSessionStoreEntry,
} from "../../../config/sessions/store.js";
import { withOwnedSessionTranscriptWrites } from "../../../config/sessions/transcript-write-context.js";
import {
bindOwnedSessionTranscriptWrites,
withOwnedSessionTranscriptWrites,
} from "../../../config/sessions/transcript-write-context.js";
import { resolveContextEngineOwnerPluginId } from "../../../context-engine/registry.js";
import type { AssembleResult } from "../../../context-engine/types.js";
import { emitTrustedDiagnosticEvent } from "../../../infra/diagnostic-events.js";
@@ -2116,6 +2119,9 @@ export async function runEmbeddedAttempt(
suppressTranscriptOnlyAssistantPersistence:
params.suppressTranscriptOnlyAssistantPersistence,
suppressAssistantErrorPersistence: params.suppressAssistantErrorPersistence,
onMessagePersisted: () => {
sessionLockController.refreshAfterOwnedSessionWrite();
},
onUserMessagePersisted: (message) => {
params.onUserMessagePersisted?.(message);
},
@@ -3173,19 +3179,26 @@ export async function runEmbeddedAttempt(
};
const abortable = <T>(promise: Promise<T>): Promise<T> =>
abortableWithSignal(runAbortController.signal, promise);
const ownedTranscriptWriteContext = {
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
withSessionWriteLock: <T>(operation: () => Promise<T> | T) =>
sessionLockController.withSessionWriteLock(operation),
};
const promptActiveSession = (
prompt: string,
options?: Parameters<typeof activeSession.prompt>[1],
): Promise<void> =>
withOwnedSessionTranscriptWrites(
{
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
withSessionWriteLock: (operation) =>
sessionLockController.withSessionWriteLock(operation),
},
ownedTranscriptWriteContext,
async () => abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options))),
);
const onBlockReply = params.onBlockReply
? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReply)
: undefined;
const onBlockReplyFlush = params.onBlockReplyFlush
? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReplyFlush)
: undefined;
const subscription = subscribeEmbeddedPiSession(
buildEmbeddedSubscriptionParams({
@@ -3203,8 +3216,8 @@ export async function runEmbeddedAttempt(
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onReasoningEnd: params.onReasoningEnd,
onBlockReply: params.onBlockReply,
onBlockReplyFlush: params.onBlockReplyFlush,
onBlockReply,
onBlockReplyFlush,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
@@ -4204,8 +4217,8 @@ export async function runEmbeddedAttempt(
// user receives the assistant response immediately. Without this,
// coalesced/buffered blocks stay in the pipeline until compaction
// finishes — which can take minutes on large contexts (#35074).
if (params.onBlockReplyFlush) {
await params.onBlockReplyFlush();
if (onBlockReplyFlush) {
await onBlockReplyFlush();
}
// Skip compaction wait when yield aborted the run — the signal is

View File

@@ -38,6 +38,7 @@ export function guardSessionManager(
onUserMessagePersisted?: (
message: Extract<AgentMessage, { role: "user" }>,
) => void | Promise<void>;
onMessagePersisted?: (message: AgentMessage) => void | Promise<void>;
onAssistantErrorMessagePersisted?: (
message: Extract<AgentMessage, { role: "assistant" }>,
) => void | Promise<void>;
@@ -118,6 +119,7 @@ export function guardSessionManager(
suppressNextUserMessagePersistence: opts?.suppressNextUserMessagePersistence,
suppressTranscriptOnlyAssistantPersistence: opts?.suppressTranscriptOnlyAssistantPersistence,
suppressAssistantErrorPersistence: opts?.suppressAssistantErrorPersistence,
onMessagePersisted: opts?.onMessagePersisted,
onUserMessagePersisted: opts?.onUserMessagePersisted,
onAssistantErrorMessagePersisted: opts?.onAssistantErrorMessagePersisted,
});

View File

@@ -557,6 +557,7 @@ export function installSessionToolResultGuard(
onUserMessagePersisted?: (
message: Extract<AgentMessage, { role: "user" }>,
) => void | Promise<void>;
onMessagePersisted?: (message: AgentMessage) => void | Promise<void>;
onAssistantErrorMessagePersisted?: (
message: Extract<AgentMessage, { role: "assistant" }>,
) => void | Promise<void>;
@@ -598,6 +599,7 @@ export function installSessionToolResultGuard(
): { entryId: string; messageSeq?: number; sessionFile?: string | null } => {
const parentEntryId = sessionManager.getLeafId();
const entryId = originalAppend(message as never);
void opts?.onMessagePersisted?.(message);
const sessionFile = getSessionFile();
if (!sessionFile) {
return { entryId, sessionFile };

View File

@@ -10,6 +10,7 @@ import {
import { redactTranscriptMessage } from "../../agents/transcript-redact.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { redactSecrets } from "../../logging/redact.js";
import { runWithOwnedSessionTranscriptWriteLock } from "./transcript-write-context.js";
const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024;
const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
@@ -254,8 +255,12 @@ function isTranscriptAgentMessage(value: unknown): value is AgentMessage {
export async function appendSessionTranscriptMessage<TMessage>(
params: AppendSessionTranscriptMessageParams<TMessage>,
): Promise<{ messageId: string; message: TMessage }> {
return await withTranscriptAppendQueue(params.transcriptPath, () =>
appendSessionTranscriptMessageLocked(params),
return await runWithOwnedSessionTranscriptWriteLock(
{ sessionFile: params.transcriptPath },
async () =>
await withTranscriptAppendQueue(params.transcriptPath, () =>
appendSessionTranscriptMessageLocked(params),
),
);
}

View File

@@ -37,6 +37,13 @@ export async function withOwnedSessionTranscriptWrites<T>(
return await ownedTranscriptWriteContext.run(context, run);
}
export function bindOwnedSessionTranscriptWrites<TArgs extends unknown[], TResult>(
context: OwnedSessionTranscriptWriteContext,
run: (...args: TArgs) => TResult,
): (...args: TArgs) => TResult {
return (...args) => ownedTranscriptWriteContext.run(context, () => run(...args));
}
export async function runWithOwnedSessionTranscriptWriteLock<T>(
params: {
sessionFile?: string;

View File

@@ -6,7 +6,10 @@ import type { SessionTranscriptUpdate } from "../../sessions/transcript-events.j
import { resolveSessionTranscriptPathInDir } from "./paths.js";
import { useTempSessionsFixture } from "./test-helpers.js";
import { appendSessionTranscriptMessage } from "./transcript-append.js";
import { withOwnedSessionTranscriptWrites } from "./transcript-write-context.js";
import {
bindOwnedSessionTranscriptWrites,
withOwnedSessionTranscriptWrites,
} from "./transcript-write-context.js";
import {
appendAssistantMessageToSessionTranscript,
appendExactAssistantMessageToSessionTranscript,
@@ -134,6 +137,36 @@ describe("appendAssistantMessageToSessionTranscript", () => {
);
expect(result.ok).toBe(true);
expect(events).toEqual(["lock", "lock"]);
});
it("keeps matching owned transcript appends locked from bound callbacks", async () => {
const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir());
const events: string[] = [];
const callback = bindOwnedSessionTranscriptWrites(
{
sessionFile,
sessionKey,
withSessionWriteLock: async (run) => {
events.push("lock");
return await run();
},
},
async () =>
await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
message: {
role: "assistant",
content: "Hello from bound delivery",
timestamp: Date.now(),
stopReason: "stop",
},
}),
);
const result = await callback();
expect(result.messageId).toBeTruthy();
expect(events).toEqual(["lock"]);
});