From 3de5476f5145046c0e597b3c76691b0beb8ffaab Mon Sep 17 00:00:00 2001 From: Neerav Makwana Date: Tue, 28 Apr 2026 13:01:15 -0400 Subject: [PATCH] fix(auto-reply): preserve DM continuity across silent session rotations (#70898) Merged via squash. Prepared head SHA: 13bd2cef860d37b05e606194216583da234ff624 Co-authored-by: neeravmakwana <261249544+neeravmakwana@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 4 + .../reply/agent-runner-session-reset.ts | 8 ++ .../reply/session-transcript-replay.test.ts | 115 ++++++++++++++++++ .../reply/session-transcript-replay.ts | 93 ++++++++++++++ 4 files changed, 220 insertions(+) create mode 100644 src/auto-reply/reply/session-transcript-replay.test.ts create mode 100644 src/auto-reply/reply/session-transcript-replay.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a380dbc08a..0eec478d0a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,10 @@ Docs: https://docs.openclaw.ai - Channels/Telegram: persist native command metadata on target sessions so topic, helper, and ACP-bound slash commands keep their session metadata attached to the routed conversation. (#57548) Thanks @GaosCode. - Channels/native commands: keep validated native slash command replies visible in group chats while preserving explicit owner allowlists for command authorization. (#73672) Thanks @obviyus. +### Fixes + +- Auto-reply/session: carry the tail of user/assistant turns into the freshly-rotated transcript on silent in-reply session resets (compaction failure, role-ordering conflict) so direct-chat continuity survives the rebind. Fixes #70853. (#70898) Thanks @neeravmakwana. + ## 2026.4.27 ### Changes diff --git a/src/auto-reply/reply/agent-runner-session-reset.ts b/src/auto-reply/reply/agent-runner-session-reset.ts index 3a7eaed9601..55df3cbc5a1 100644 --- a/src/auto-reply/reply/agent-runner-session-reset.ts +++ b/src/auto-reply/reply/agent-runner-session-reset.ts @@ -10,6 +10,7 @@ import { import { generateSecureUuid } from "../../infra/secure-random.js"; import { defaultRuntime } from "../../runtime.js"; import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js"; +import { replayRecentUserAssistantMessages } from "./session-transcript-replay.js"; type ResetSessionOptions = { failureLabel: string; @@ -96,6 +97,13 @@ export async function resetReplyRunSession(params: { `Failed to persist session reset after ${params.options.failureLabel} (${params.sessionKey}): ${String(err)}`, ); } + // Silent rotations (compaction/role-ordering) fire without user intent, so + // preserve recent user/assistant turns for direct-chat continuity. + await replayRecentUserAssistantMessages({ + sourceTranscript: prevEntry.sessionFile, + targetTranscript: nextSessionFile, + newSessionId: nextSessionId, + }); params.followupRun.run.sessionId = nextSessionId; params.followupRun.run.sessionFile = nextSessionFile; deps.refreshQueuedFollowupSession({ diff --git a/src/auto-reply/reply/session-transcript-replay.test.ts b/src/auto-reply/reply/session-transcript-replay.test.ts new file mode 100644 index 00000000000..76d83a62a89 --- /dev/null +++ b/src/auto-reply/reply/session-transcript-replay.test.ts @@ -0,0 +1,115 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + DEFAULT_REPLAY_MAX_MESSAGES, + replayRecentUserAssistantMessages, +} from "./session-transcript-replay.js"; + +const j = (obj: unknown): string => `${JSON.stringify(obj)}\n`; + +describe("replayRecentUserAssistantMessages", () => { + let root = ""; + beforeEach(async () => { + root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-replay-")); + }); + afterEach(async () => { + await fs.rm(root, { recursive: true, force: true }); + }); + const call = (source: string, target: string): Promise => + replayRecentUserAssistantMessages({ + sourceTranscript: source, + targetTranscript: target, + newSessionId: "new-session", + }); + + it("replays only the user/assistant tail and skips tool/system/malformed records", async () => { + const source = path.join(root, "prev.jsonl"); + const target = path.join(root, "next.jsonl"); + const lines: string[] = [j({ type: "session", id: "old" })]; + for (let i = 0; i < DEFAULT_REPLAY_MAX_MESSAGES + 4; i += 1) { + lines.push(j({ message: { role: i % 2 === 0 ? "user" : "assistant", content: `m${i}` } })); + } + lines.push(j({ message: { role: "tool" } })); + lines.push(j({ type: "compaction", timestamp: new Date().toISOString() })); + lines.push("not-json-line\n"); + await fs.writeFile(source, lines.join(""), "utf8"); + + expect(await call(source, target)).toBe(DEFAULT_REPLAY_MAX_MESSAGES); + const records = (await fs.readFile(target, "utf8")) + .split(/\r?\n/) + .filter((line) => line.trim().length > 0) + .map((line) => JSON.parse(line)); + expect(records[0]).toMatchObject({ type: "session", id: "new-session" }); + expect(records).toHaveLength(1 + DEFAULT_REPLAY_MAX_MESSAGES); + for (const r of records.slice(1)) { + expect(["user", "assistant"]).toContain(r.message.role); + } + expect(await call(path.join(root, "missing.jsonl"), path.join(root, "out.jsonl"))).toBe(0); + + const assistantSource = path.join(root, "all-assistant.jsonl"); + const assistantTarget = path.join(root, "all-assistant-out.jsonl"); + const onlyAssistants = Array.from({ length: 3 }, () => + j({ message: { role: "assistant", content: "x" } }), + ).join(""); + await fs.writeFile(assistantSource, onlyAssistants, "utf8"); + expect(await call(assistantSource, assistantTarget)).toBe(0); + await expect(fs.stat(assistantTarget)).rejects.toThrow(); + }); + + it("skips header for pre-existing targets and aligns the tail to a user turn", async () => { + const source = path.join(root, "prev.jsonl"); + const target = path.join(root, "next.jsonl"); + await fs.writeFile(target, j({ type: "session", id: "existing" }), "utf8"); + const lines: string[] = []; + for (let i = 0; i < DEFAULT_REPLAY_MAX_MESSAGES + 1; i += 1) { + lines.push(j({ message: { role: i % 2 === 0 ? "user" : "assistant", content: `m${i}` } })); + } + await fs.writeFile(source, lines.join(""), "utf8"); + + expect(await call(source, target)).toBe(DEFAULT_REPLAY_MAX_MESSAGES - 1); + const records = (await fs.readFile(target, "utf8")) + .split(/\r?\n/) + .filter((line) => line.trim().length > 0) + .map((line) => JSON.parse(line)); + expect(records.filter((r) => r.type === "session")).toHaveLength(1); + expect(records[0]).toMatchObject({ id: "existing" }); + expect(records[1].message.role).toBe("user"); + }); + + it("coalesces same-role runs so replayed records strictly alternate", async () => { + const source = path.join(root, "prev.jsonl"); + const target = path.join(root, "next.jsonl"); + await fs.writeFile( + source, + [ + j({ message: { role: "user", content: "older user" } }), + j({ message: { role: "user", content: "latest user" } }), + j({ message: { role: "assistant", content: "older assistant" } }), + j({ message: { role: "assistant", content: "latest assistant" } }), + j({ message: { role: "user", content: "follow-up" } }), + j({ message: { role: "assistant", content: "answer" } }), + ].join(""), + "utf8", + ); + + expect(await call(source, target)).toBe(4); + const records = (await fs.readFile(target, "utf8")) + .split(/\r?\n/) + .filter((line) => line.trim().length > 0) + .map((line) => JSON.parse(line)); + expect(records.slice(1).map((r) => r.message.role)).toEqual([ + "user", + "assistant", + "user", + "assistant", + ]); + expect(records.slice(1).map((r) => r.message.content)).toEqual([ + "latest user", + "latest assistant", + "follow-up", + "answer", + ]); + }); +}); diff --git a/src/auto-reply/reply/session-transcript-replay.ts b/src/auto-reply/reply/session-transcript-replay.ts new file mode 100644 index 00000000000..b7afdbc9763 --- /dev/null +++ b/src/auto-reply/reply/session-transcript-replay.ts @@ -0,0 +1,93 @@ +import fs from "node:fs"; +import fsp from "node:fs/promises"; +import path from "node:path"; +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; + +/** Tail kept so DM continuity survives silent session rotations. */ +export const DEFAULT_REPLAY_MAX_MESSAGES = 6; + +type SessionRecord = { message?: { role?: unknown } }; +type KeptRecord = { role: "user" | "assistant"; line: string }; + +/** + * Copy the tail of user/assistant JSONL records from a prior transcript into a + * freshly-rotated one. Tool, system, and compaction records are skipped so + * replay cannot reshape tool/role ordering, and the tail is aligned and + * coalesced into alternating user/assistant turns so role-ordering resets + * cannot immediately recur. Uses async I/O so long transcripts do not block + * the event loop. Returns 0 on any error. + */ +export async function replayRecentUserAssistantMessages(params: { + sourceTranscript?: string; + targetTranscript: string; + newSessionId: string; + maxMessages?: number; +}): Promise { + const max = Math.max(0, params.maxMessages ?? DEFAULT_REPLAY_MAX_MESSAGES); + const src = params.sourceTranscript; + if (max === 0 || !src || !fs.existsSync(src)) { + return 0; + } + try { + const kept: KeptRecord[] = []; + for (const line of (await fsp.readFile(src, "utf-8")).split(/\r?\n/)) { + if (!line.trim()) { + continue; + } + try { + const role = (JSON.parse(line) as SessionRecord | null)?.message?.role; + if (role === "user" || role === "assistant") { + kept.push({ role, line }); + } + } catch { + // Skip malformed lines. + } + } + if (kept.length === 0) { + return 0; + } + let startIdx = Math.max(0, kept.length - max); + while (startIdx < kept.length && kept[startIdx].role === "assistant") { + startIdx += 1; + } + if (startIdx === kept.length) { + // Retained window is assistant-only; replaying would re-create the same + // role-ordering hazard this reset path is recovering from. + return 0; + } + const tail = coalesceAlternatingReplayTail(kept.slice(startIdx)).map((entry) => entry.line); + if (!fs.existsSync(params.targetTranscript)) { + await fsp.mkdir(path.dirname(params.targetTranscript), { recursive: true }); + const header = JSON.stringify({ + type: "session", + version: CURRENT_SESSION_VERSION, + id: params.newSessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }); + await fsp.writeFile(params.targetTranscript, `${header}\n`, { + encoding: "utf-8", + mode: 0o600, + }); + } + await fsp.appendFile(params.targetTranscript, `${tail.join("\n")}\n`, "utf-8"); + return tail.length; + } catch { + return 0; + } +} + +// Keep the newest record from each same-role run, preserving original JSONL bytes +// for replay while ensuring strict provider alternation. +function coalesceAlternatingReplayTail(entries: KeptRecord[]): KeptRecord[] { + const tail: KeptRecord[] = []; + for (const entry of entries) { + const lastIdx = tail.length - 1; + if (lastIdx >= 0 && tail[lastIdx]?.role === entry.role) { + tail[lastIdx] = entry; + continue; + } + tail.push(entry); + } + return tail; +}