mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:40:44 +00:00
fix: serialize concurrent transcript appends
This commit is contained in:
@@ -13,6 +13,7 @@ const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
|
||||
|
||||
let piCodingAgentModulePromise: Promise<typeof import("@mariozechner/pi-coding-agent")> | null =
|
||||
null;
|
||||
const transcriptAppendQueues = new Map<string, Promise<void>>();
|
||||
|
||||
async function loadCurrentSessionVersion(): Promise<number> {
|
||||
piCodingAgentModulePromise ??= import("@mariozechner/pi-coding-agent");
|
||||
@@ -194,6 +195,40 @@ async function ensureTranscriptHeader(
|
||||
});
|
||||
}
|
||||
|
||||
async function resolveTranscriptAppendQueueKey(transcriptPath: string): Promise<string> {
|
||||
const resolvedTranscriptPath = path.resolve(transcriptPath);
|
||||
const transcriptDir = path.dirname(resolvedTranscriptPath);
|
||||
await fs.mkdir(transcriptDir, { recursive: true });
|
||||
try {
|
||||
return path.join(await fs.realpath(transcriptDir), path.basename(resolvedTranscriptPath));
|
||||
} catch {
|
||||
return resolvedTranscriptPath;
|
||||
}
|
||||
}
|
||||
|
||||
async function withTranscriptAppendQueue<T>(
|
||||
transcriptPath: string,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const queueKey = await resolveTranscriptAppendQueueKey(transcriptPath);
|
||||
const previous = transcriptAppendQueues.get(queueKey) ?? Promise.resolve();
|
||||
let releaseCurrent!: () => void;
|
||||
const current = new Promise<void>((resolve) => {
|
||||
releaseCurrent = resolve;
|
||||
});
|
||||
const tail = previous.catch(() => undefined).then(() => current);
|
||||
transcriptAppendQueues.set(queueKey, tail);
|
||||
await previous.catch(() => undefined);
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
releaseCurrent();
|
||||
if (transcriptAppendQueues.get(queueKey) === tail) {
|
||||
transcriptAppendQueues.delete(queueKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function appendSessionTranscriptMessage(params: {
|
||||
transcriptPath: string;
|
||||
message: unknown;
|
||||
@@ -202,6 +237,20 @@ export async function appendSessionTranscriptMessage(params: {
|
||||
cwd?: string;
|
||||
useRawWhenLinear?: boolean;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<{ messageId: string }> {
|
||||
return await withTranscriptAppendQueue(params.transcriptPath, () =>
|
||||
appendSessionTranscriptMessageLocked(params),
|
||||
);
|
||||
}
|
||||
|
||||
async function appendSessionTranscriptMessageLocked(params: {
|
||||
transcriptPath: string;
|
||||
message: unknown;
|
||||
now?: number;
|
||||
sessionId?: string;
|
||||
cwd?: string;
|
||||
useRawWhenLinear?: boolean;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<{ messageId: string }> {
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: params.transcriptPath,
|
||||
|
||||
Reference in New Issue
Block a user