fix(agents): close session lock cleanup trust gaps

This commit is contained in:
tianxiaochannel-oss88
2026-05-20 20:07:29 +08:00
committed by Josh Lehman
parent 02cad4a33a
commit f59991c572
5 changed files with 164 additions and 39 deletions

View File

@@ -4,6 +4,7 @@ import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
runWithOwnedSessionTranscriptWriteLock,
runWithOwnedSessionTranscriptWritePublication,
withOwnedSessionTranscriptWrites,
} from "../../../config/sessions/transcript-write-context.js";
import { SessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
@@ -287,7 +288,95 @@ describe("embedded attempt session lock lifecycle", () => {
expect(controller.hasSessionTakeover()).toBe(false);
});
it("allows post-prompt writes after another controller publishes an owned transcript write", async () => {
it("allows post-prompt writes after the prompt context publishes an owned transcript write", async () => {
const sessionFile = await createTempSessionFile();
const releases: string[] = [];
const acquireSessionWriteLock = vi.fn(async () => ({
release: vi.fn(async () => {
releases.push("release");
}),
}));
const firstController = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: { ...lockOptions, sessionFile },
});
await firstController.releaseForPrompt();
const secondController = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: { ...lockOptions, sessionFile },
});
const promptActiveSession = async (run: () => Promise<void>): Promise<void> =>
await withOwnedSessionTranscriptWrites(
{
sessionFile,
sessionKey: "agent:main:slack:channel:456",
withSessionWriteLock: (operation, options) =>
secondController.withSessionWriteLock(operation, options),
},
run,
);
await promptActiveSession(
async () =>
await runWithOwnedSessionTranscriptWritePublication(
{ sessionFile, sessionKey: "agent:main:slack:channel:456" },
async () => {
await fs.appendFile(sessionFile, '{"type":"message","id":"same-process"}\n', "utf8");
},
),
);
await secondController.releaseForPrompt();
await expect(
firstController.withSessionWriteLock(async () => {
await fs.appendFile(sessionFile, '{"type":"message","id":"post-prompt"}\n', "utf8");
return "post-write";
}),
).resolves.toBe("post-write");
expect(firstController.hasSessionTakeover()).toBe(false);
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3);
expect(releases).toEqual(["release", "release", "release"]);
});
it("rejects external edits interleaved while another controller holds cleanup lock", async () => {
const sessionFile = await createTempSessionFile();
const releases: string[] = [];
const acquireSessionWriteLock = vi.fn(async () => ({
release: vi.fn(async () => {
releases.push("release");
}),
}));
const firstController = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: { ...lockOptions, sessionFile },
});
await firstController.releaseForPrompt();
const secondController = await createEmbeddedAttemptSessionLockController({
acquireSessionWriteLock,
lockOptions: { ...lockOptions, sessionFile },
});
await secondController.releaseForPrompt();
const cleanupLock = await secondController.acquireForCleanup();
await fs.appendFile(sessionFile, '{"type":"message","id":"external-cleanup"}\n', "utf8");
await cleanupLock.release();
await expect(
firstController.withSessionWriteLock(async () => {
await fs.appendFile(sessionFile, '{"type":"message","id":"late"}\n', "utf8");
}),
).rejects.toBeInstanceOf(EmbeddedAttemptSessionTakeoverError);
expect(firstController.hasSessionTakeover()).toBe(true);
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(4);
expect(releases).toEqual(["release", "release", "release", "release"]);
});
it("rejects external edits interleaved inside a broad owned transcript lock", async () => {
const sessionFile = await createTempSessionFile();
const releases: string[] = [];
const acquireSessionWriteLock = vi.fn(async () => ({
@@ -309,15 +398,29 @@ describe("embedded attempt session lock lifecycle", () => {
await withOwnedSessionTranscriptWrites(
{
sessionFile,
sessionKey: "agent:main:slack:channel:456",
sessionKey: "agent:main:slack:channel:789",
withSessionWriteLock: (operation, options) =>
secondController.withSessionWriteLock(operation, options),
},
async () =>
await runWithOwnedSessionTranscriptWriteLock(
{ sessionFile, sessionKey: "agent:main:slack:channel:456" },
{ sessionFile, sessionKey: "agent:main:slack:channel:789" },
async () => {
await fs.appendFile(sessionFile, '{"type":"message","id":"same-process"}\n', "utf8");
await fs.appendFile(
sessionFile,
'{"type":"message","id":"external-owned-scope"}\n',
"utf8",
);
await runWithOwnedSessionTranscriptWritePublication(
{ sessionFile, sessionKey: "agent:main:slack:channel:789" },
async () => {
await fs.appendFile(
sessionFile,
'{"type":"message","id":"same-process"}\n',
"utf8",
);
},
);
},
),
);
@@ -325,12 +428,11 @@ describe("embedded attempt session lock lifecycle", () => {
await expect(
firstController.withSessionWriteLock(async () => {
await fs.appendFile(sessionFile, '{"type":"message","id":"post-prompt"}\n', "utf8");
return "post-write";
await fs.appendFile(sessionFile, '{"type":"message","id":"late"}\n', "utf8");
}),
).resolves.toBe("post-write");
).rejects.toBeInstanceOf(EmbeddedAttemptSessionTakeoverError);
expect(firstController.hasSessionTakeover()).toBe(false);
expect(firstController.hasSessionTakeover()).toBe(true);
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3);
expect(releases).toEqual(["release", "release", "release"]);
});

View File

@@ -488,23 +488,6 @@ export async function createEmbeddedAttemptSessionLockController(params: {
const noopLock: SessionLock = { release: async () => {} };
function wrapOwnedCleanupLock(
lock: SessionLock,
beforeCleanup: SessionFileFingerprint,
): SessionLock {
return {
release: async () => {
try {
if (!takeoverDetected) {
await publishOwnedSessionFileWriteIfChanged(beforeCleanup).catch(() => {});
}
} finally {
await lock.release();
}
},
};
}
return {
async releaseForPrompt(): Promise<void> {
if (!heldLock) {
@@ -537,7 +520,15 @@ export async function createEmbeddedAttemptSessionLockController(params: {
throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile);
}
if (activeWriteLock.getStore()?.active === true) {
return await run();
if (options?.publishOwnedWrite !== true) {
return await run();
}
const beforeWrite = await readSessionFileFingerprint(params.lockOptions.sessionFile);
try {
return await run();
} finally {
await publishOwnedSessionFileFence(beforeWrite);
}
}
const { lock, owned } = await acquireWriteLock();
try {
@@ -596,8 +587,7 @@ export async function createEmbeddedAttemptSessionLockController(params: {
}
throw err;
}
const beforeCleanup = await readSessionFileFingerprint(params.lockOptions.sessionFile);
return wrapOwnedCleanupLock(cleanupLock, beforeCleanup);
return cleanupLock;
},
hasSessionTakeover(): boolean {
return takeoverDetected;

View File

@@ -3182,8 +3182,10 @@ export async function runEmbeddedAttempt(
const ownedTranscriptWriteContext = {
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
withSessionWriteLock: <T>(operation: () => Promise<T> | T) =>
sessionLockController.withSessionWriteLock(operation),
withSessionWriteLock: <T>(
operation: () => Promise<T> | T,
options?: { publishOwnedWrite?: boolean },
) => sessionLockController.withSessionWriteLock(operation, options),
};
const promptActiveSession = (
prompt: string,

View File

@@ -53,10 +53,33 @@ export async function runWithOwnedSessionTranscriptWriteLock<T>(
sessionKey?: string;
},
run: () => Promise<T> | T,
): Promise<T> {
return await runWithOwnedSessionTranscriptWriteContext(params, run);
}
export async function runWithOwnedSessionTranscriptWritePublication<T>(
params: {
sessionFile?: string;
sessionKey?: string;
},
run: () => Promise<T> | T,
): Promise<T> {
return await runWithOwnedSessionTranscriptWriteContext(params, run, {
publishOwnedWrite: true,
});
}
async function runWithOwnedSessionTranscriptWriteContext<T>(
params: {
sessionFile?: string;
sessionKey?: string;
},
run: () => Promise<T> | T,
options?: { publishOwnedWrite?: boolean },
): Promise<T> {
const context = ownedTranscriptWriteContext.getStore();
if (!context || !contextMatches({ context, ...params })) {
return await run();
}
return await context.withSessionWriteLock(run, { publishOwnedWrite: true });
return await context.withSessionWriteLock(run, options);
}

View File

@@ -22,7 +22,10 @@ import {
streamSessionTranscriptLines,
streamSessionTranscriptLinesReverse,
} from "./transcript-stream.js";
import { runWithOwnedSessionTranscriptWriteLock } from "./transcript-write-context.js";
import {
runWithOwnedSessionTranscriptWriteLock,
runWithOwnedSessionTranscriptWritePublication,
} from "./transcript-write-context.js";
import type { SessionEntry } from "./types.js";
let piCodingAgentModulePromise: Promise<typeof import("@earendil-works/pi-coding-agent")> | null =
@@ -317,8 +320,6 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
return await runWithOwnedSessionTranscriptWriteLock(
{ sessionFile, sessionKey: resolved.normalizedKey },
async () => {
await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId });
const explicitIdempotencyKey =
params.idempotencyKey ??
((params.message as { idempotencyKey?: unknown }).idempotencyKey as string | undefined);
@@ -344,11 +345,18 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
...params.message,
...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}),
} as Parameters<SessionManager["appendMessage"]>[0];
const { messageId, message: appendedMessage } = await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
message,
config: params.config,
});
const { messageId, message: appendedMessage } =
await runWithOwnedSessionTranscriptWritePublication(
{ sessionFile, sessionKey: resolved.normalizedKey },
async () => {
await ensureSessionHeader({ sessionFile, sessionId: entry.sessionId });
return await appendSessionTranscriptMessage({
transcriptPath: sessionFile,
message,
config: params.config,
});
},
);
switch (params.updateMode ?? "inline") {
case "inline":