diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index 8742ab4d8e4..4b445cda106 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -1,10 +1,7 @@ -import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; -import path from "node:path"; -import { StringDecoder } from "node:string_decoder"; -import { CURRENT_SESSION_VERSION, type SessionManager } from "@mariozechner/pi-coding-agent"; import { acquireSessionWriteLock, + appendSessionTranscriptMessage, emitSessionTranscriptUpdate, resolveSessionWriteLockAcquireTimeoutMs, runAgentHarnessBeforeMessageWriteHook, @@ -12,15 +9,6 @@ import { type SessionWriteLockAcquireTimeoutConfig, } from "openclaw/plugin-sdk/agent-harness-runtime"; -const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024; -const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; - -type TranscriptLeafInfo = { - leafId?: string; - hasParentLinkedEntries: boolean; - nonSessionEntryCount: number; -}; - export async function mirrorCodexAppServerTranscript(params: { sessionFile: string; sessionKey?: string; @@ -36,7 +24,6 @@ export async function mirrorCodexAppServerTranscript(params: { return; } - await fs.mkdir(path.dirname(params.sessionFile), { recursive: true }); const lock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), @@ -53,7 +40,7 @@ export async function mirrorCodexAppServerTranscript(params: { const transcriptMessage = { ...message, ...(idempotencyKey ? { idempotencyKey } : {}), - } as Parameters[0]; + } as AgentMessage; const nextMessage = runAgentHarnessBeforeMessageWriteHook({ message: transcriptMessage, agentId: params.agentId, @@ -62,15 +49,18 @@ export async function mirrorCodexAppServerTranscript(params: { if (!nextMessage) { continue; } - const messageToAppend = (idempotencyKey - ? { - ...(nextMessage as unknown as Record), - idempotencyKey, - } - : nextMessage) as unknown as Parameters[0]; - await appendCodexAppServerTranscriptMessage({ + const messageToAppend = ( + idempotencyKey + ? { + ...(nextMessage as unknown as Record), + idempotencyKey, + } + : nextMessage + ) as AgentMessage; + await appendSessionTranscriptMessage({ transcriptPath: params.sessionFile, message: messageToAppend, + config: params.config, }); if (idempotencyKey) { existingIdempotencyKeys.add(idempotencyKey); @@ -87,202 +77,6 @@ export async function mirrorCodexAppServerTranscript(params: { } } -async function appendCodexAppServerTranscriptMessage(params: { - transcriptPath: string; - message: unknown; -}): Promise { - await ensureTranscriptHeader(params.transcriptPath); - const stat = await fs.stat(params.transcriptPath).catch(() => null); - let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch( - () => ({ - hasParentLinkedEntries: false, - nonSessionEntryCount: 0, - }), - ); - const hasLinearEntries = !leafInfo.hasParentLinkedEntries && leafInfo.nonSessionEntryCount > 0; - const shouldRawAppend = hasLinearEntries && (stat?.size ?? 0) > SESSION_MANAGER_APPEND_MAX_BYTES; - if (hasLinearEntries && !shouldRawAppend) { - const migrated = await migrateLinearTranscriptToParentLinked(params.transcriptPath); - leafInfo = { - ...(migrated.leafId ? { leafId: migrated.leafId } : {}), - hasParentLinkedEntries: Boolean(migrated.leafId), - nonSessionEntryCount: leafInfo.nonSessionEntryCount, - }; - } - const entry = { - type: "message", - id: randomUUID(), - ...(shouldRawAppend ? {} : { parentId: leafInfo.leafId ?? null }), - timestamp: new Date().toISOString(), - message: params.message, - }; - await fs.appendFile(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8"); -} - -async function ensureTranscriptHeader(transcriptPath: string): Promise { - const stat = await fs.stat(transcriptPath).catch(() => null); - if (stat?.isFile() && stat.size > 0) { - return; - } - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - const header = { - type: "session", - version: CURRENT_SESSION_VERSION, - id: randomUUID(), - timestamp: new Date().toISOString(), - cwd: process.cwd(), - }; - await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, { - encoding: "utf-8", - mode: 0o600, - flag: stat?.isFile() ? "w" : "wx", - }); -} - -async function readTranscriptLeafInfo(transcriptPath: string): Promise { - const handle = await fs.open(transcriptPath, "r"); - try { - const decoder = new StringDecoder("utf8"); - const buffer = Buffer.allocUnsafe(TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES); - let carry = ""; - let leafId: string | undefined; - let hasParentLinkedEntries = false; - let nonSessionEntryCount = 0; - while (true) { - const { bytesRead } = await handle.read(buffer, 0, buffer.length, null); - if (bytesRead <= 0) { - break; - } - const text = carry + decoder.write(buffer.subarray(0, bytesRead)); - const lines = text.split(/\r?\n/); - carry = lines.pop() ?? ""; - for (const line of lines) { - if (lineHasNonSessionEntry(line)) { - nonSessionEntryCount += 1; - } - const id = lineParentLinkedEntryId(line); - if (id) { - leafId = id; - hasParentLinkedEntries = true; - } - } - await yieldTranscriptAppendScan(); - } - const tail = carry + decoder.end(); - if (lineHasNonSessionEntry(tail)) { - nonSessionEntryCount += 1; - } - const id = lineParentLinkedEntryId(tail); - if (id) { - leafId = id; - hasParentLinkedEntries = true; - } - return { - ...(leafId ? { leafId } : {}), - hasParentLinkedEntries, - nonSessionEntryCount, - }; - } finally { - await handle.close(); - } -} - -async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Promise<{ - leafId?: string; -}> { - const raw = await fs.readFile(transcriptPath, "utf-8"); - const existingIds = new Set(); - const output: string[] = []; - let previousId: string | null = null; - let leafId: string | undefined; - for (const line of raw.split(/\r?\n/)) { - if (!line.trim()) { - continue; - } - let parsed: unknown; - try { - parsed = JSON.parse(line); - } catch { - output.push(line); - continue; - } - if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { - output.push(line); - continue; - } - const record = parsed as Record; - if (record.type === "session") { - output.push(JSON.stringify({ ...record, version: CURRENT_SESSION_VERSION })); - continue; - } - const id = normalizeEntryId(record.id) ?? generateEntryId(existingIds); - existingIds.add(id); - record.id = id; - if (!Object.hasOwn(record, "parentId")) { - record.parentId = previousId; - } - previousId = id; - leafId = id; - output.push(JSON.stringify(record)); - } - await fs.writeFile(transcriptPath, `${output.join("\n")}\n`, { - encoding: "utf-8", - mode: 0o600, - }); - const result: { leafId?: string } = {}; - if (leafId) { - result.leafId = leafId; - } - return result; -} - -function normalizeEntryId(value: unknown): string | undefined { - return typeof value === "string" && value.trim().length > 0 ? value : undefined; -} - -function generateEntryId(existingIds: Set): string { - for (let attempt = 0; attempt < 100; attempt += 1) { - const id = randomUUID().slice(0, 8); - if (!existingIds.has(id)) { - existingIds.add(id); - return id; - } - } - const id = randomUUID(); - existingIds.add(id); - return id; -} - -function lineHasNonSessionEntry(line: string): boolean { - if (!line.trim()) { - return false; - } - try { - const parsed = JSON.parse(line) as { type?: unknown }; - return parsed.type !== "session"; - } catch { - return false; - } -} - -function lineParentLinkedEntryId(line: string): string | undefined { - if (!line.trim()) { - return undefined; - } - try { - const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown }; - return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed - ? parsed.id - : undefined; - } catch { - return undefined; - } -} - -async function yieldTranscriptAppendScan(): Promise { - await new Promise((resolve) => setImmediate(resolve)); -} - async function readTranscriptIdempotencyKeys(sessionFile: string): Promise> { const keys = new Set(); let raw: string; diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index e31a7f95cae..e7b3393605a 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ b/src/plugin-sdk/agent-harness-runtime.ts @@ -113,6 +113,7 @@ export { resolveSessionWriteLockAcquireTimeoutMs, type SessionWriteLockAcquireTimeoutConfig, } from "../agents/session-write-lock.js"; +export { appendSessionTranscriptMessage } from "../config/sessions/transcript-append.js"; export { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; export { isToolWrappedWithBeforeToolCallHook,