From 3f6b481464bfb6d14592f91d33d76d6d481daed6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 6 May 2026 04:06:23 +0100 Subject: [PATCH] fix: serialize concurrent transcript appends --- src/config/sessions/transcript-append.ts | 49 ++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/config/sessions/transcript-append.ts b/src/config/sessions/transcript-append.ts index 7b4959d3f35..9f88b786427 100644 --- a/src/config/sessions/transcript-append.ts +++ b/src/config/sessions/transcript-append.ts @@ -13,6 +13,7 @@ const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; let piCodingAgentModulePromise: Promise | null = null; +const transcriptAppendQueues = new Map>(); async function loadCurrentSessionVersion(): Promise { piCodingAgentModulePromise ??= import("@mariozechner/pi-coding-agent"); @@ -194,6 +195,40 @@ async function ensureTranscriptHeader( }); } +async function resolveTranscriptAppendQueueKey(transcriptPath: string): Promise { + const resolvedTranscriptPath = path.resolve(transcriptPath); + const transcriptDir = path.dirname(resolvedTranscriptPath); + await fs.mkdir(transcriptDir, { recursive: true }); + try { + return path.join(await fs.realpath(transcriptDir), path.basename(resolvedTranscriptPath)); + } catch { + return resolvedTranscriptPath; + } +} + +async function withTranscriptAppendQueue( + transcriptPath: string, + fn: () => Promise, +): Promise { + const queueKey = await resolveTranscriptAppendQueueKey(transcriptPath); + const previous = transcriptAppendQueues.get(queueKey) ?? Promise.resolve(); + let releaseCurrent!: () => void; + const current = new Promise((resolve) => { + releaseCurrent = resolve; + }); + const tail = previous.catch(() => undefined).then(() => current); + transcriptAppendQueues.set(queueKey, tail); + await previous.catch(() => undefined); + try { + return await fn(); + } finally { + releaseCurrent(); + if (transcriptAppendQueues.get(queueKey) === tail) { + transcriptAppendQueues.delete(queueKey); + } + } +} + export async function appendSessionTranscriptMessage(params: { transcriptPath: string; message: unknown; @@ -202,6 +237,20 @@ export async function appendSessionTranscriptMessage(params: { cwd?: string; useRawWhenLinear?: boolean; config?: SessionWriteLockAcquireTimeoutConfig; +}): Promise<{ messageId: string }> { + return await withTranscriptAppendQueue(params.transcriptPath, () => + appendSessionTranscriptMessageLocked(params), + ); +} + +async function appendSessionTranscriptMessageLocked(params: { + transcriptPath: string; + message: unknown; + now?: number; + sessionId?: string; + cwd?: string; + useRawWhenLinear?: boolean; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise<{ messageId: string }> { const lock = await acquireSessionWriteLock({ sessionFile: params.transcriptPath,