From f59991c572e65c55a8aaeaee63303bafae757d62 Mon Sep 17 00:00:00 2001 From: tianxiaochannel-oss88 Date: Wed, 20 May 2026 20:07:29 +0800 Subject: [PATCH] fix(agents): close session lock cleanup trust gaps --- .../run/attempt.session-lock.test.ts | 118 ++++++++++++++++-- .../run/attempt.session-lock.ts | 30 ++--- src/agents/pi-embedded-runner/run/attempt.ts | 6 +- .../sessions/transcript-write-context.ts | 25 +++- src/config/sessions/transcript.ts | 24 ++-- 5 files changed, 164 insertions(+), 39 deletions(-) diff --git a/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts b/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts index ee5971a9171..cea2d9dd91b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; import { runWithOwnedSessionTranscriptWriteLock, + runWithOwnedSessionTranscriptWritePublication, withOwnedSessionTranscriptWrites, } from "../../../config/sessions/transcript-write-context.js"; import { SessionWriteLockTimeoutError } from "../../session-write-lock-error.js"; @@ -287,7 +288,95 @@ describe("embedded attempt session lock lifecycle", () => { expect(controller.hasSessionTakeover()).toBe(false); }); - it("allows post-prompt writes after another controller publishes an owned transcript write", async () => { + it("allows post-prompt writes after the prompt context publishes an owned transcript write", async () => { + const sessionFile = await createTempSessionFile(); + const releases: string[] = []; + const acquireSessionWriteLock = vi.fn(async () => ({ + release: vi.fn(async () => { + releases.push("release"); + }), + })); + const firstController = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + + await firstController.releaseForPrompt(); + + const secondController = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + const promptActiveSession = async (run: () => Promise): Promise => + await withOwnedSessionTranscriptWrites( + { + sessionFile, + sessionKey: "agent:main:slack:channel:456", + withSessionWriteLock: (operation, options) => + secondController.withSessionWriteLock(operation, options), + }, + run, + ); + await promptActiveSession( + async () => + await runWithOwnedSessionTranscriptWritePublication( + { sessionFile, sessionKey: "agent:main:slack:channel:456" }, + async () => { + await fs.appendFile(sessionFile, '{"type":"message","id":"same-process"}\n', "utf8"); + }, + ), + ); + await secondController.releaseForPrompt(); + + await expect( + firstController.withSessionWriteLock(async () => { + await fs.appendFile(sessionFile, '{"type":"message","id":"post-prompt"}\n', "utf8"); + return "post-write"; + }), + ).resolves.toBe("post-write"); + + expect(firstController.hasSessionTakeover()).toBe(false); + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3); + expect(releases).toEqual(["release", "release", "release"]); + }); + + it("rejects external edits interleaved while another controller holds cleanup lock", async () => { + const sessionFile = await createTempSessionFile(); + const releases: string[] = []; + const acquireSessionWriteLock = vi.fn(async () => ({ + release: vi.fn(async () => { + releases.push("release"); + }), + })); + const firstController = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + + await firstController.releaseForPrompt(); + + const secondController = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + await secondController.releaseForPrompt(); + const cleanupLock = await secondController.acquireForCleanup(); + + await fs.appendFile(sessionFile, '{"type":"message","id":"external-cleanup"}\n', "utf8"); + await cleanupLock.release(); + + await expect( + firstController.withSessionWriteLock(async () => { + await fs.appendFile(sessionFile, '{"type":"message","id":"late"}\n', "utf8"); + }), + ).rejects.toBeInstanceOf(EmbeddedAttemptSessionTakeoverError); + + expect(firstController.hasSessionTakeover()).toBe(true); + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(4); + expect(releases).toEqual(["release", "release", "release", "release"]); + }); + + it("rejects external edits interleaved inside a broad owned transcript lock", async () => { const sessionFile = await createTempSessionFile(); const releases: string[] = []; const acquireSessionWriteLock = vi.fn(async () => ({ @@ -309,15 +398,29 @@ describe("embedded attempt session lock lifecycle", () => { await withOwnedSessionTranscriptWrites( { sessionFile, - sessionKey: "agent:main:slack:channel:456", + sessionKey: "agent:main:slack:channel:789", withSessionWriteLock: (operation, options) => secondController.withSessionWriteLock(operation, options), }, async () => await runWithOwnedSessionTranscriptWriteLock( - { sessionFile, sessionKey: "agent:main:slack:channel:456" }, + { sessionFile, sessionKey: "agent:main:slack:channel:789" }, async () => { - await fs.appendFile(sessionFile, '{"type":"message","id":"same-process"}\n', "utf8"); + await fs.appendFile( + sessionFile, + '{"type":"message","id":"external-owned-scope"}\n', + "utf8", + ); + await runWithOwnedSessionTranscriptWritePublication( + { sessionFile, sessionKey: "agent:main:slack:channel:789" }, + async () => { + await fs.appendFile( + sessionFile, + '{"type":"message","id":"same-process"}\n', + "utf8", + ); + }, + ); }, ), ); @@ -325,12 +428,11 @@ describe("embedded attempt session lock lifecycle", () => { await expect( firstController.withSessionWriteLock(async () => { - await fs.appendFile(sessionFile, '{"type":"message","id":"post-prompt"}\n', "utf8"); - return "post-write"; + await fs.appendFile(sessionFile, '{"type":"message","id":"late"}\n', "utf8"); }), - ).resolves.toBe("post-write"); + ).rejects.toBeInstanceOf(EmbeddedAttemptSessionTakeoverError); - expect(firstController.hasSessionTakeover()).toBe(false); + expect(firstController.hasSessionTakeover()).toBe(true); expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3); expect(releases).toEqual(["release", "release", "release"]); }); diff --git a/src/agents/pi-embedded-runner/run/attempt.session-lock.ts b/src/agents/pi-embedded-runner/run/attempt.session-lock.ts index 75f565c4352..688bd696bae 100644 --- a/src/agents/pi-embedded-runner/run/attempt.session-lock.ts +++ b/src/agents/pi-embedded-runner/run/attempt.session-lock.ts @@ -488,23 +488,6 @@ export async function createEmbeddedAttemptSessionLockController(params: { const noopLock: SessionLock = { release: async () => {} }; - function wrapOwnedCleanupLock( - lock: SessionLock, - beforeCleanup: SessionFileFingerprint, - ): SessionLock { - return { - release: async () => { - try { - if (!takeoverDetected) { - await publishOwnedSessionFileWriteIfChanged(beforeCleanup).catch(() => {}); - } - } finally { - await lock.release(); - } - }, - }; - } - return { async releaseForPrompt(): Promise { if (!heldLock) { @@ -537,7 +520,15 @@ export async function createEmbeddedAttemptSessionLockController(params: { throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile); } if (activeWriteLock.getStore()?.active === true) { - return await run(); + if (options?.publishOwnedWrite !== true) { + return await run(); + } + const beforeWrite = await readSessionFileFingerprint(params.lockOptions.sessionFile); + try { + return await run(); + } finally { + await publishOwnedSessionFileFence(beforeWrite); + } } const { lock, owned } = await acquireWriteLock(); try { @@ -596,8 +587,7 @@ export async function createEmbeddedAttemptSessionLockController(params: { } throw err; } - const beforeCleanup = await readSessionFileFingerprint(params.lockOptions.sessionFile); - return wrapOwnedCleanupLock(cleanupLock, beforeCleanup); + return cleanupLock; }, hasSessionTakeover(): boolean { return takeoverDetected; diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 2352b20ea38..ec221d4654d 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -3182,8 +3182,10 @@ export async function runEmbeddedAttempt( const ownedTranscriptWriteContext = { sessionFile: params.sessionFile, sessionKey: params.sessionKey, - withSessionWriteLock: (operation: () => Promise | T) => - sessionLockController.withSessionWriteLock(operation), + withSessionWriteLock: ( + operation: () => Promise | T, + options?: { publishOwnedWrite?: boolean }, + ) => sessionLockController.withSessionWriteLock(operation, options), }; const promptActiveSession = ( prompt: string, diff --git a/src/config/sessions/transcript-write-context.ts b/src/config/sessions/transcript-write-context.ts index 02dc68f33a4..0f65d50ecd7 100644 --- a/src/config/sessions/transcript-write-context.ts +++ b/src/config/sessions/transcript-write-context.ts @@ -53,10 +53,33 @@ export async function runWithOwnedSessionTranscriptWriteLock( sessionKey?: string; }, run: () => Promise | T, +): Promise { + return await runWithOwnedSessionTranscriptWriteContext(params, run); +} + +export async function runWithOwnedSessionTranscriptWritePublication( + params: { + sessionFile?: string; + sessionKey?: string; + }, + run: () => Promise | T, +): Promise { + return await runWithOwnedSessionTranscriptWriteContext(params, run, { + publishOwnedWrite: true, + }); +} + +async function runWithOwnedSessionTranscriptWriteContext( + params: { + sessionFile?: string; + sessionKey?: string; + }, + run: () => Promise | T, + options?: { publishOwnedWrite?: boolean }, ): Promise { const context = ownedTranscriptWriteContext.getStore(); if (!context || !contextMatches({ context, ...params })) { return await run(); } - return await context.withSessionWriteLock(run, { publishOwnedWrite: true }); + return await context.withSessionWriteLock(run, options); } diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index a729402a8bf..8aece357ee5 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -22,7 +22,10 @@ import { streamSessionTranscriptLines, streamSessionTranscriptLinesReverse, } from "./transcript-stream.js"; -import { runWithOwnedSessionTranscriptWriteLock } from "./transcript-write-context.js"; +import { + runWithOwnedSessionTranscriptWriteLock, + runWithOwnedSessionTranscriptWritePublication, +} from "./transcript-write-context.js"; import type { SessionEntry } from "./types.js"; let piCodingAgentModulePromise: Promise | null = @@ -317,8 +320,6 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { return await runWithOwnedSessionTranscriptWriteLock( { sessionFile, sessionKey: resolved.normalizedKey }, async () => { - await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId }); - const explicitIdempotencyKey = params.idempotencyKey ?? ((params.message as { idempotencyKey?: unknown }).idempotencyKey as string | undefined); @@ -344,11 +345,18 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { ...params.message, ...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}), } as Parameters[0]; - const { messageId, message: appendedMessage } = await appendSessionTranscriptMessage({ - transcriptPath: sessionFile, - message, - config: params.config, - }); + const { messageId, message: appendedMessage } = + await runWithOwnedSessionTranscriptWritePublication( + { sessionFile, sessionKey: resolved.normalizedKey }, + async () => { + await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId }); + return await appendSessionTranscriptMessage({ + transcriptPath: sessionFile, + message, + config: params.config, + }); + }, + ); switch (params.updateMode ?? "inline") { case "inline":