fix(auto-reply): preserve DM continuity across silent session rotations (#70898)

Merged via squash.

Prepared head SHA: 13bd2cef86
Co-authored-by: neeravmakwana <261249544+neeravmakwana@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
Neerav Makwana
2026-04-28 13:01:15 -04:00
committed by GitHub
parent 7120f5b254
commit 3de5476f51
4 changed files with 220 additions and 0 deletions

View File

@@ -32,6 +32,10 @@ Docs: https://docs.openclaw.ai
- Channels/Telegram: persist native command metadata on target sessions so topic, helper, and ACP-bound slash commands keep their session metadata attached to the routed conversation. (#57548) Thanks @GaosCode.
- Channels/native commands: keep validated native slash command replies visible in group chats while preserving explicit owner allowlists for command authorization. (#73672) Thanks @obviyus.
### Fixes
- Auto-reply/session: carry the tail of user/assistant turns into the freshly-rotated transcript on silent in-reply session resets (compaction failure, role-ordering conflict) so direct-chat continuity survives the rebind. Fixes #70853. (#70898) Thanks @neeravmakwana.
## 2026.4.27
### Changes

View File

@@ -10,6 +10,7 @@ import {
import { generateSecureUuid } from "../../infra/secure-random.js";
import { defaultRuntime } from "../../runtime.js";
import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
import { replayRecentUserAssistantMessages } from "./session-transcript-replay.js";
type ResetSessionOptions = {
failureLabel: string;
@@ -96,6 +97,13 @@ export async function resetReplyRunSession(params: {
`Failed to persist session reset after ${params.options.failureLabel} (${params.sessionKey}): ${String(err)}`,
);
}
// Silent rotations (compaction/role-ordering) fire without user intent, so
// preserve recent user/assistant turns for direct-chat continuity.
await replayRecentUserAssistantMessages({
sourceTranscript: prevEntry.sessionFile,
targetTranscript: nextSessionFile,
newSessionId: nextSessionId,
});
params.followupRun.run.sessionId = nextSessionId;
params.followupRun.run.sessionFile = nextSessionFile;
deps.refreshQueuedFollowupSession({

View File

@@ -0,0 +1,115 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
DEFAULT_REPLAY_MAX_MESSAGES,
replayRecentUserAssistantMessages,
} from "./session-transcript-replay.js";
const j = (obj: unknown): string => `${JSON.stringify(obj)}\n`;
describe("replayRecentUserAssistantMessages", () => {
let root = "";
beforeEach(async () => {
root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-replay-"));
});
afterEach(async () => {
await fs.rm(root, { recursive: true, force: true });
});
const call = (source: string, target: string): Promise<number> =>
replayRecentUserAssistantMessages({
sourceTranscript: source,
targetTranscript: target,
newSessionId: "new-session",
});
it("replays only the user/assistant tail and skips tool/system/malformed records", async () => {
const source = path.join(root, "prev.jsonl");
const target = path.join(root, "next.jsonl");
const lines: string[] = [j({ type: "session", id: "old" })];
for (let i = 0; i < DEFAULT_REPLAY_MAX_MESSAGES + 4; i += 1) {
lines.push(j({ message: { role: i % 2 === 0 ? "user" : "assistant", content: `m${i}` } }));
}
lines.push(j({ message: { role: "tool" } }));
lines.push(j({ type: "compaction", timestamp: new Date().toISOString() }));
lines.push("not-json-line\n");
await fs.writeFile(source, lines.join(""), "utf8");
expect(await call(source, target)).toBe(DEFAULT_REPLAY_MAX_MESSAGES);
const records = (await fs.readFile(target, "utf8"))
.split(/\r?\n/)
.filter((line) => line.trim().length > 0)
.map((line) => JSON.parse(line));
expect(records[0]).toMatchObject({ type: "session", id: "new-session" });
expect(records).toHaveLength(1 + DEFAULT_REPLAY_MAX_MESSAGES);
for (const r of records.slice(1)) {
expect(["user", "assistant"]).toContain(r.message.role);
}
expect(await call(path.join(root, "missing.jsonl"), path.join(root, "out.jsonl"))).toBe(0);
const assistantSource = path.join(root, "all-assistant.jsonl");
const assistantTarget = path.join(root, "all-assistant-out.jsonl");
const onlyAssistants = Array.from({ length: 3 }, () =>
j({ message: { role: "assistant", content: "x" } }),
).join("");
await fs.writeFile(assistantSource, onlyAssistants, "utf8");
expect(await call(assistantSource, assistantTarget)).toBe(0);
await expect(fs.stat(assistantTarget)).rejects.toThrow();
});
it("skips header for pre-existing targets and aligns the tail to a user turn", async () => {
const source = path.join(root, "prev.jsonl");
const target = path.join(root, "next.jsonl");
await fs.writeFile(target, j({ type: "session", id: "existing" }), "utf8");
const lines: string[] = [];
for (let i = 0; i < DEFAULT_REPLAY_MAX_MESSAGES + 1; i += 1) {
lines.push(j({ message: { role: i % 2 === 0 ? "user" : "assistant", content: `m${i}` } }));
}
await fs.writeFile(source, lines.join(""), "utf8");
expect(await call(source, target)).toBe(DEFAULT_REPLAY_MAX_MESSAGES - 1);
const records = (await fs.readFile(target, "utf8"))
.split(/\r?\n/)
.filter((line) => line.trim().length > 0)
.map((line) => JSON.parse(line));
expect(records.filter((r) => r.type === "session")).toHaveLength(1);
expect(records[0]).toMatchObject({ id: "existing" });
expect(records[1].message.role).toBe("user");
});
it("coalesces same-role runs so replayed records strictly alternate", async () => {
const source = path.join(root, "prev.jsonl");
const target = path.join(root, "next.jsonl");
await fs.writeFile(
source,
[
j({ message: { role: "user", content: "older user" } }),
j({ message: { role: "user", content: "latest user" } }),
j({ message: { role: "assistant", content: "older assistant" } }),
j({ message: { role: "assistant", content: "latest assistant" } }),
j({ message: { role: "user", content: "follow-up" } }),
j({ message: { role: "assistant", content: "answer" } }),
].join(""),
"utf8",
);
expect(await call(source, target)).toBe(4);
const records = (await fs.readFile(target, "utf8"))
.split(/\r?\n/)
.filter((line) => line.trim().length > 0)
.map((line) => JSON.parse(line));
expect(records.slice(1).map((r) => r.message.role)).toEqual([
"user",
"assistant",
"user",
"assistant",
]);
expect(records.slice(1).map((r) => r.message.content)).toEqual([
"latest user",
"latest assistant",
"follow-up",
"answer",
]);
});
});

View File

@@ -0,0 +1,93 @@
import fs from "node:fs";
import fsp from "node:fs/promises";
import path from "node:path";
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
/** Tail kept so DM continuity survives silent session rotations. */
export const DEFAULT_REPLAY_MAX_MESSAGES = 6;
type SessionRecord = { message?: { role?: unknown } };
type KeptRecord = { role: "user" | "assistant"; line: string };
/**
* Copy the tail of user/assistant JSONL records from a prior transcript into a
* freshly-rotated one. Tool, system, and compaction records are skipped so
* replay cannot reshape tool/role ordering, and the tail is aligned and
* coalesced into alternating user/assistant turns so role-ordering resets
* cannot immediately recur. Uses async I/O so long transcripts do not block
* the event loop. Returns 0 on any error.
*/
export async function replayRecentUserAssistantMessages(params: {
sourceTranscript?: string;
targetTranscript: string;
newSessionId: string;
maxMessages?: number;
}): Promise<number> {
const max = Math.max(0, params.maxMessages ?? DEFAULT_REPLAY_MAX_MESSAGES);
const src = params.sourceTranscript;
if (max === 0 || !src || !fs.existsSync(src)) {
return 0;
}
try {
const kept: KeptRecord[] = [];
for (const line of (await fsp.readFile(src, "utf-8")).split(/\r?\n/)) {
if (!line.trim()) {
continue;
}
try {
const role = (JSON.parse(line) as SessionRecord | null)?.message?.role;
if (role === "user" || role === "assistant") {
kept.push({ role, line });
}
} catch {
// Skip malformed lines.
}
}
if (kept.length === 0) {
return 0;
}
let startIdx = Math.max(0, kept.length - max);
while (startIdx < kept.length && kept[startIdx].role === "assistant") {
startIdx += 1;
}
if (startIdx === kept.length) {
// Retained window is assistant-only; replaying would re-create the same
// role-ordering hazard this reset path is recovering from.
return 0;
}
const tail = coalesceAlternatingReplayTail(kept.slice(startIdx)).map((entry) => entry.line);
if (!fs.existsSync(params.targetTranscript)) {
await fsp.mkdir(path.dirname(params.targetTranscript), { recursive: true });
const header = JSON.stringify({
type: "session",
version: CURRENT_SESSION_VERSION,
id: params.newSessionId,
timestamp: new Date().toISOString(),
cwd: process.cwd(),
});
await fsp.writeFile(params.targetTranscript, `${header}\n`, {
encoding: "utf-8",
mode: 0o600,
});
}
await fsp.appendFile(params.targetTranscript, `${tail.join("\n")}\n`, "utf-8");
return tail.length;
} catch {
return 0;
}
}
// Keep the newest record from each same-role run, preserving original JSONL bytes
// for replay while ensuring strict provider alternation.
function coalesceAlternatingReplayTail(entries: KeptRecord[]): KeptRecord[] {
const tail: KeptRecord[] = [];
for (const entry of entries) {
const lastIdx = tail.length - 1;
if (lastIdx >= 0 && tail[lastIdx]?.role === entry.role) {
tail[lastIdx] = entry;
continue;
}
tail.push(entry);
}
return tail;
}