diff --git a/CHANGELOG.md b/CHANGELOG.md index d16e2d53512..645a700eaf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - MCP/bundle-mcp: normalize CLI-native `type: "http"` MCP server entries to OpenClaw `transport: "streamable-http"` on save, repair existing configs with doctor, and keep embedded Pi from falling back to legacy SSE GET-first startup for those servers. Fixes #72757. Thanks @Studioscale. - Media-understanding/audio: migrate deprecated `{input}` placeholders in legacy `audio.transcription.command` configs to `{{MediaPath}}`, so custom audio transcribers no longer receive the literal placeholder after doctor repair. Fixes #72760. Thanks @krisfanue3-hash. - Ollama/onboarding: de-dupe suggested bare local models against installed `:latest` tags and skip redundant pulls, so setup shows the installed model once and no longer says it is downloading an already available model. Fixes #68952. Thanks @tleyden. +- Compaction: skip oversized pre-compaction checkpoint snapshots and prune duplicate long user turns from compaction input and rotated successor transcripts, preventing retry storms from being preserved across checkpoint cycles. Fixes #72780. Thanks @SweetSophia. - Control UI/Gateway: preserve WebChat client version labels across localhost, 127.0.0.1, and IPv6 loopback aliases on the same port, avoiding misleading `vcontrol-ui` connection logs while investigating duplicate-message reports. Refs #72753 and #72742. Thanks @LumenFromTheFuture and @allesgutefy. - Agents/reasoning: treat orphan closing reasoning tags with following answer text as a privacy boundary across delivery, history, streaming, and Control UI sanitizers so malformed local-model output cannot leak chain-of-thought text. Fixes #67092. Thanks @AnildoSilva. - Memory-core: run one-shot memory CLI commands through transient builtin and QMD managers so `memory index`, `memory status --index`, and `memory search` no longer start long-lived file watchers that can hit macOS `EMFILE` limits. Fixes #59101; carries forward #49851. Thanks @mbear469210-coder and @maoyuanxue. diff --git a/docs/concepts/compaction.md b/docs/concepts/compaction.md index f70af2e82c3..4b78806afa5 100644 --- a/docs/concepts/compaction.md +++ b/docs/concepts/compaction.md @@ -106,6 +106,13 @@ The byte guard requires `truncateAfterCompaction: true`. Without transcript rota ### Successor transcripts When `agents.defaults.compaction.truncateAfterCompaction` is enabled, OpenClaw does not rewrite the existing transcript in place. It creates a new active successor transcript from the compaction summary, preserved state, and unsummarized tail, then keeps the previous JSONL as the archived checkpoint source. +Successor transcripts also drop exact duplicate long user turns that arrive +inside a short retry window, so channel retry storms are not carried into the +next active transcript after compaction. + +Pre-compaction checkpoints are retained only while they stay below OpenClaw's +checkpoint size cap; oversized active transcripts still compact, but OpenClaw +skips the large debug snapshot instead of doubling disk usage. ### Compaction notices diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index aa83a33a418..1c9c5f7b6dc 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -50,6 +50,9 @@ OpenClaw persists sessions in two layers: - Append-only transcript with tree structure (entries have `id` + `parentId`) - Stores the actual conversation + tool calls + compaction summaries - Used to rebuild the model context for future turns + - Large pre-compaction debug checkpoints are skipped once the active + transcript exceeds the checkpoint size cap, avoiding a second giant + `.checkpoint.*.jsonl` copy. --- diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 8306f37b3c9..5d666f68d02 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -99,6 +99,7 @@ import { import { resolveSystemPromptOverride } from "../system-prompt-override.js"; import { classifyCompactionReason, resolveCompactionFailureReason } from "./compact-reasons.js"; import type { CompactEmbeddedPiSessionParams, CompactionMessageMetrics } from "./compact.types.js"; +import { dedupeDuplicateUserMessagesForCompaction } from "./compaction-duplicate-user-messages.js"; import { asCompactionHookRunner, buildBeforeCompactionHookMetrics, @@ -972,9 +973,10 @@ export async function compactEmbeddedPiSessionDirect( sessionId: params.sessionId, policy: transcriptPolicy, }); + const dedupedValidated = dedupeDuplicateUserMessagesForCompaction(validated); // Apply validated transcript to the live session even when no history limit is configured, // so compaction and hook metrics are based on the same message set. - session.agent.state.messages = validated; + session.agent.state.messages = dedupedValidated; // "Original" compaction metrics should describe the validated transcript that enters // limiting/compaction, not the raw on-disk session snapshot. const originalMessages = session.messages.slice(); diff --git a/src/agents/pi-embedded-runner/compaction-duplicate-user-messages.test.ts b/src/agents/pi-embedded-runner/compaction-duplicate-user-messages.test.ts new file mode 100644 index 00000000000..5d3257436b8 --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-duplicate-user-messages.test.ts @@ -0,0 +1,76 @@ +import { describe, expect, it } from "vitest"; +import { + collectDuplicateUserMessageEntryIdsForCompaction, + dedupeDuplicateUserMessagesForCompaction, +} from "./compaction-duplicate-user-messages.js"; + +describe("compaction duplicate user message pruning", () => { + it("drops identical long user messages inside the duplicate window", () => { + const first = { + role: "user", + content: "please run the deployment status check for production", + timestamp: 1_000, + } as const; + const second = { + role: "user", + content: " please run the deployment status check for production ", + timestamp: 2_000, + } as const; + const third = { + role: "assistant", + content: [{ type: "text", text: "checking" }], + timestamp: 3_000, + } as const; + + expect(dedupeDuplicateUserMessagesForCompaction([first, second, third])).toEqual([ + first, + third, + ]); + }); + + it("keeps short repeated acknowledgements and distant repeats", () => { + const short = { role: "user", content: "next", timestamp: 1_000 } as const; + const shortAgain = { role: "user", content: "next", timestamp: 2_000 } as const; + const long = { + role: "user", + content: "please run the deployment status check for production", + timestamp: 1_000, + } as const; + const longLater = { + role: "user", + content: "please run the deployment status check for production", + timestamp: 70_000, + } as const; + + expect(dedupeDuplicateUserMessagesForCompaction([short, shortAgain])).toEqual([ + short, + shortAgain, + ]); + expect(dedupeDuplicateUserMessagesForCompaction([long, longLater])).toEqual([long, longLater]); + }); + + it("collects duplicate transcript entry ids from active branch entries", () => { + const duplicateIds = collectDuplicateUserMessageEntryIdsForCompaction([ + { + id: "entry-1", + type: "message", + message: { + role: "user", + content: "please run the deployment status check for production", + timestamp: 1_000, + }, + }, + { + id: "entry-2", + type: "message", + message: { + role: "user", + content: "please run the deployment status check for production", + timestamp: 2_000, + }, + }, + ]); + + expect(duplicateIds).toEqual(new Set(["entry-2"])); + }); +}); diff --git a/src/agents/pi-embedded-runner/compaction-duplicate-user-messages.ts b/src/agents/pi-embedded-runner/compaction-duplicate-user-messages.ts new file mode 100644 index 00000000000..9b978859a5a --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-duplicate-user-messages.ts @@ -0,0 +1,109 @@ +const DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS = 60_000; +const MIN_DUPLICATE_USER_MESSAGE_CHARS = 24; + +type MessageLike = { + role?: unknown; + content?: unknown; + timestamp?: unknown; +}; + +type EntryLike = { + id?: unknown; + type?: unknown; + message?: unknown; +}; + +type DuplicateUserMessageOptions = { + windowMs?: number; +}; + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +function normalizeUserMessageContent(content: unknown): string | undefined { + if (typeof content === "string") { + return content.replace(/\s+/g, " ").trim(); + } + if (!Array.isArray(content)) { + return undefined; + } + const textParts: string[] = []; + for (const block of content) { + if (!isRecord(block)) { + return undefined; + } + if (block.type === "image") { + return undefined; + } + if (block.type === "text" && typeof block.text === "string") { + textParts.push(block.text); + } + } + return textParts.join("\n").replace(/\s+/g, " ").trim(); +} + +function duplicateSignature(message: unknown): { key: string; timestamp: number } | undefined { + if (!isRecord(message) || message.role !== "user" || typeof message.timestamp !== "number") { + return undefined; + } + const text = normalizeUserMessageContent(message.content); + if (!text || text.length < MIN_DUPLICATE_USER_MESSAGE_CHARS) { + return undefined; + } + return { + key: text.normalize("NFC").toLowerCase(), + timestamp: message.timestamp, + }; +} + +export function dedupeDuplicateUserMessagesForCompaction( + messages: readonly T[], + options: DuplicateUserMessageOptions = {}, +): T[] { + const windowMs = options.windowMs ?? DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS; + const lastSeenAtByKey = new Map(); + let removed = 0; + const result: T[] = []; + for (const message of messages) { + const signature = duplicateSignature(message); + if (!signature) { + result.push(message); + continue; + } + const lastSeenAt = lastSeenAtByKey.get(signature.key); + lastSeenAtByKey.set(signature.key, signature.timestamp); + if (typeof lastSeenAt === "number" && signature.timestamp - lastSeenAt <= windowMs) { + removed += 1; + continue; + } + result.push(message); + } + return removed > 0 ? result : [...messages]; +} + +export function collectDuplicateUserMessageEntryIdsForCompaction( + entries: readonly EntryLike[], + options: DuplicateUserMessageOptions = {}, +): Set { + const windowMs = options.windowMs ?? DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS; + const lastSeenAtByKey = new Map(); + const duplicateIds = new Set(); + for (const entry of entries) { + if (entry.type !== "message" || typeof entry.id !== "string") { + continue; + } + const signature = duplicateSignature( + isRecord(entry.message) ? (entry.message as MessageLike) : undefined, + ); + if (!signature) { + continue; + } + const lastSeenAt = lastSeenAtByKey.get(signature.key); + lastSeenAtByKey.set(signature.key, signature.timestamp); + if (typeof lastSeenAt === "number" && signature.timestamp - lastSeenAt <= windowMs) { + duplicateIds.add(entry.id); + } + } + return duplicateIds; +} diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts index 3329e3f10c8..94524628e80 100644 --- a/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts @@ -153,6 +153,39 @@ describe("rotateTranscriptAfterCompaction", () => { expect(successor.getSessionName()).toBe("current title"); }); + it("drops duplicate user messages from the rotated active branch tail", async () => { + const dir = await createTmpDir(); + const manager = SessionManager.create(dir, dir); + manager.appendMessage({ role: "user", content: "old user", timestamp: 1 }); + const firstKeptId = manager.appendMessage(makeAssistant("old assistant", 2)); + manager.appendCompaction("Summary of old work.", firstKeptId, 5000); + const firstDuplicateId = manager.appendMessage({ + role: "user", + content: "please run the deployment status check for production", + timestamp: 3_000, + }); + const secondDuplicateId = manager.appendMessage({ + role: "user", + content: " please run the deployment status check for production ", + timestamp: 4_000, + }); + manager.appendMessage(makeAssistant("status checked", 5_000)); + + const result = await rotateTranscriptAfterCompaction({ + sessionManager: manager, + sessionFile: manager.getSessionFile()!, + now: () => new Date("2026-04-27T12:10:00.000Z"), + }); + + expect(result.rotated).toBe(true); + const successor = SessionManager.open(result.sessionFile!); + const entries = successor.getEntries(); + expect(entries.find((entry) => entry.id === firstDuplicateId)).toBeDefined(); + expect(entries.find((entry) => entry.id === secondDuplicateId)).toBeUndefined(); + const contextText = JSON.stringify(successor.buildSessionContext().messages); + expect(contextText.match(/deployment status check/g)).toHaveLength(1); + }); + it("skips sessions with no compaction entry", async () => { const dir = await createTmpDir(); const manager = SessionManager.create(dir, dir); diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts index 1e6dd00347a..878e4567599 100644 --- a/src/agents/pi-embedded-runner/compaction-successor-transcript.ts +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts @@ -9,6 +9,7 @@ import { type SessionHeader, } from "@mariozechner/pi-coding-agent"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import { collectDuplicateUserMessageEntryIdsForCompaction } from "./compaction-duplicate-user-messages.js"; type ReadonlySessionManagerForRotation = Pick< SessionManager, @@ -126,10 +127,12 @@ function buildSuccessorEntries(params: { } const removedIds = new Set(); + const duplicateUserMessageIds = collectDuplicateUserMessageEntryIdsForCompaction(branch); for (const entry of allEntries) { if ( (summarizedBranchIds.has(entry.id) && entry.type === "message") || - staleStateEntryIds.has(entry.id) + staleStateEntryIds.has(entry.id) || + duplicateUserMessageIds.has(entry.id) ) { removedIds.add(entry.id); } diff --git a/src/gateway/session-compaction-checkpoints.test.ts b/src/gateway/session-compaction-checkpoints.test.ts index 28b5de4409b..8cc32f415f7 100644 --- a/src/gateway/session-compaction-checkpoints.test.ts +++ b/src/gateway/session-compaction-checkpoints.test.ts @@ -9,6 +9,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { captureCompactionCheckpointSnapshot, cleanupCompactionCheckpointSnapshot, + MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES, persistSessionCompactionCheckpoint, } from "./session-compaction-checkpoints.js"; @@ -84,6 +85,31 @@ describe("session-compaction-checkpoints", () => { expect(fsSync.existsSync(sessionFile!)).toBe(true); }); + test("capture skips oversized pre-compaction transcripts", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-oversized-")); + tempDirs.push(dir); + + const session = SessionManager.create(dir, dir); + session.appendMessage({ + role: "user", + content: "before compaction", + timestamp: Date.now(), + }); + const sessionFile = session.getSessionFile(); + expect(sessionFile).toBeTruthy(); + await fs.appendFile(sessionFile!, "x".repeat(128), "utf-8"); + + const snapshot = captureCompactionCheckpointSnapshot({ + sessionManager: session, + sessionFile: sessionFile!, + maxBytes: 64, + }); + + expect(snapshot).toBeNull(); + expect(MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES).toBeGreaterThan(64); + expect(fsSync.readdirSync(dir).filter((file) => file.includes(".checkpoint."))).toEqual([]); + }); + test("persist trims old checkpoint metadata and removes trimmed snapshot files", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-trim-")); tempDirs.push(dir); diff --git a/src/gateway/session-compaction-checkpoints.ts b/src/gateway/session-compaction-checkpoints.ts index 6c9a3adaeae..50680dd6a0c 100644 --- a/src/gateway/session-compaction-checkpoints.ts +++ b/src/gateway/session-compaction-checkpoints.ts @@ -16,6 +16,7 @@ import { resolveGatewaySessionStoreTarget } from "./session-utils.js"; const log = createSubsystemLogger("gateway/session-compaction-checkpoints"); const MAX_COMPACTION_CHECKPOINTS_PER_SESSION = 25; +export const MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES = 64 * 1024 * 1024; export type CapturedCompactionCheckpointSnapshot = { sessionId: string; @@ -62,6 +63,7 @@ export function resolveSessionCompactionCheckpointReason(params: { export function captureCompactionCheckpointSnapshot(params: { sessionManager: Pick; sessionFile: string; + maxBytes?: number; }): CapturedCompactionCheckpointSnapshot | null { const getLeafId = params.sessionManager && typeof params.sessionManager.getLeafId === "function" @@ -71,6 +73,15 @@ export function captureCompactionCheckpointSnapshot(params: { if (!getLeafId || !sessionFile) { return null; } + const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES; + try { + const stat = fsSync.statSync(sessionFile); + if (!stat.isFile() || stat.size > maxBytes) { + return null; + } + } catch { + return null; + } const leafId = getLeafId(); if (!leafId) { return null;