diff --git a/src/auto-reply/reply/agent-runner-session-reset.ts b/src/auto-reply/reply/agent-runner-session-reset.ts index 46cfc2efbee..55df3cbc5a1 100644 --- a/src/auto-reply/reply/agent-runner-session-reset.ts +++ b/src/auto-reply/reply/agent-runner-session-reset.ts @@ -99,7 +99,7 @@ export async function resetReplyRunSession(params: { } // Silent rotations (compaction/role-ordering) fire without user intent, so // preserve recent user/assistant turns for direct-chat continuity. - replayRecentUserAssistantMessages({ + await replayRecentUserAssistantMessages({ sourceTranscript: prevEntry.sessionFile, targetTranscript: nextSessionFile, newSessionId: nextSessionId, diff --git a/src/auto-reply/reply/session-transcript-replay.test.ts b/src/auto-reply/reply/session-transcript-replay.test.ts index 20c754623dd..e670dec4c66 100644 --- a/src/auto-reply/reply/session-transcript-replay.test.ts +++ b/src/auto-reply/reply/session-transcript-replay.test.ts @@ -17,7 +17,7 @@ describe("replayRecentUserAssistantMessages", () => { afterEach(async () => { await fs.rm(root, { recursive: true, force: true }); }); - const call = (source: string, target: string): number => + const call = (source: string, target: string): Promise => replayRecentUserAssistantMessages({ sourceTranscript: source, targetTranscript: target, @@ -36,7 +36,7 @@ describe("replayRecentUserAssistantMessages", () => { lines.push("not-json-line\n"); await fs.writeFile(source, lines.join(""), "utf8"); - expect(call(source, target)).toBe(DEFAULT_REPLAY_MAX_MESSAGES); + expect(await call(source, target)).toBe(DEFAULT_REPLAY_MAX_MESSAGES); const records = (await fs.readFile(target, "utf8")) .split(/\r?\n/) .filter((line) => line.trim().length > 0) @@ -46,6 +46,26 @@ describe("replayRecentUserAssistantMessages", () => { for (const r of records.slice(1)) { expect(["user", "assistant"]).toContain(r.message.role); } - expect(call(path.join(root, "missing.jsonl"), path.join(root, "out.jsonl"))).toBe(0); + expect(await call(path.join(root, "missing.jsonl"), path.join(root, "out.jsonl"))).toBe(0); + }); + + it("skips header for pre-existing targets and aligns the tail to a user turn", async () => { + const source = path.join(root, "prev.jsonl"); + const target = path.join(root, "next.jsonl"); + await fs.writeFile(target, j({ type: "session", id: "existing" }), "utf8"); + const lines: string[] = []; + for (let i = 0; i < DEFAULT_REPLAY_MAX_MESSAGES + 1; i += 1) { + lines.push(j({ message: { role: i % 2 === 0 ? "user" : "assistant", content: `m${i}` } })); + } + await fs.writeFile(source, lines.join(""), "utf8"); + + expect(await call(source, target)).toBe(DEFAULT_REPLAY_MAX_MESSAGES - 1); + const records = (await fs.readFile(target, "utf8")) + .split(/\r?\n/) + .filter((line) => line.trim().length > 0) + .map((line) => JSON.parse(line)); + expect(records.filter((r) => r.type === "session")).toHaveLength(1); + expect(records[0]).toMatchObject({ id: "existing" }); + expect(records[1].message.role).toBe("user"); }); }); diff --git a/src/auto-reply/reply/session-transcript-replay.ts b/src/auto-reply/reply/session-transcript-replay.ts index e40fdb38b94..5af998e0c8d 100644 --- a/src/auto-reply/reply/session-transcript-replay.ts +++ b/src/auto-reply/reply/session-transcript-replay.ts @@ -1,4 +1,5 @@ import fs from "node:fs"; +import fsp from "node:fs/promises"; import path from "node:path"; import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; @@ -6,33 +7,37 @@ import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; export const DEFAULT_REPLAY_MAX_MESSAGES = 6; type SessionRecord = { message?: { role?: unknown } }; +type KeptRecord = { role: "user" | "assistant"; line: string }; /** * Copy the tail of user/assistant JSONL records from a prior transcript into a * freshly-rotated one. Tool, system, and compaction records are skipped so - * replay cannot reshape tool/role ordering. Returns 0 on any error. + * replay cannot reshape tool/role ordering, and the tail is aligned to start + * with a user turn so role-ordering resets cannot immediately recur. Uses + * async I/O so long transcripts do not block the event loop. Returns 0 on + * any error. */ -export function replayRecentUserAssistantMessages(params: { +export async function replayRecentUserAssistantMessages(params: { sourceTranscript?: string; targetTranscript: string; newSessionId: string; maxMessages?: number; -}): number { +}): Promise { const max = Math.max(0, params.maxMessages ?? DEFAULT_REPLAY_MAX_MESSAGES); const src = params.sourceTranscript; if (max === 0 || !src || !fs.existsSync(src)) { return 0; } try { - const kept: string[] = []; - for (const line of fs.readFileSync(src, "utf-8").split(/\r?\n/)) { + const kept: KeptRecord[] = []; + for (const line of (await fsp.readFile(src, "utf-8")).split(/\r?\n/)) { if (!line.trim()) { continue; } try { const role = (JSON.parse(line) as SessionRecord | null)?.message?.role; if (role === "user" || role === "assistant") { - kept.push(line); + kept.push({ role, line }); } } catch { // Skip malformed lines. @@ -41,8 +46,13 @@ export function replayRecentUserAssistantMessages(params: { if (kept.length === 0) { return 0; } + let startIdx = Math.max(0, kept.length - max); + while (startIdx < kept.length - 1 && kept[startIdx].role === "assistant") { + startIdx += 1; + } + const tail = kept.slice(startIdx).map((entry) => entry.line); if (!fs.existsSync(params.targetTranscript)) { - fs.mkdirSync(path.dirname(params.targetTranscript), { recursive: true }); + await fsp.mkdir(path.dirname(params.targetTranscript), { recursive: true }); const header = JSON.stringify({ type: "session", version: CURRENT_SESSION_VERSION, @@ -50,13 +60,12 @@ export function replayRecentUserAssistantMessages(params: { timestamp: new Date().toISOString(), cwd: process.cwd(), }); - fs.writeFileSync(params.targetTranscript, `${header}\n`, { encoding: "utf-8", mode: 0o600 }); + await fsp.writeFile(params.targetTranscript, `${header}\n`, { + encoding: "utf-8", + mode: 0o600, + }); } - const tail = kept.slice(-max); - fs.appendFileSync(params.targetTranscript, `${tail.join("\n")}\n`, { - encoding: "utf-8", - mode: 0o600, - }); + await fsp.appendFile(params.targetTranscript, `${tail.join("\n")}\n`, "utf-8"); return tail.length; } catch { return 0;