mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:20:42 +00:00
fix(compaction): avoid preserving duplicate user turns
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
- MCP/bundle-mcp: normalize CLI-native `type: "http"` MCP server entries to OpenClaw `transport: "streamable-http"` on save, repair existing configs with doctor, and keep embedded Pi from falling back to legacy SSE GET-first startup for those servers. Fixes #72757. Thanks @Studioscale.
|
||||
- Media-understanding/audio: migrate deprecated `{input}` placeholders in legacy `audio.transcription.command` configs to `{{MediaPath}}`, so custom audio transcribers no longer receive the literal placeholder after doctor repair. Fixes #72760. Thanks @krisfanue3-hash.
|
||||
- Ollama/onboarding: de-dupe suggested bare local models against installed `:latest` tags and skip redundant pulls, so setup shows the installed model once and no longer says it is downloading an already available model. Fixes #68952. Thanks @tleyden.
|
||||
- Compaction: skip oversized pre-compaction checkpoint snapshots and prune duplicate long user turns from compaction input and rotated successor transcripts, preventing retry storms from being preserved across checkpoint cycles. Fixes #72780. Thanks @SweetSophia.
|
||||
- Control UI/Gateway: preserve WebChat client version labels across localhost, 127.0.0.1, and IPv6 loopback aliases on the same port, avoiding misleading `vcontrol-ui` connection logs while investigating duplicate-message reports. Refs #72753 and #72742. Thanks @LumenFromTheFuture and @allesgutefy.
|
||||
- Agents/reasoning: treat orphan closing reasoning tags with following answer text as a privacy boundary across delivery, history, streaming, and Control UI sanitizers so malformed local-model output cannot leak chain-of-thought text. Fixes #67092. Thanks @AnildoSilva.
|
||||
- Memory-core: run one-shot memory CLI commands through transient builtin and QMD managers so `memory index`, `memory status --index`, and `memory search` no longer start long-lived file watchers that can hit macOS `EMFILE` limits. Fixes #59101; carries forward #49851. Thanks @mbear469210-coder and @maoyuanxue.
|
||||
|
||||
@@ -106,6 +106,13 @@ The byte guard requires `truncateAfterCompaction: true`. Without transcript rota
|
||||
### Successor transcripts
|
||||
|
||||
When `agents.defaults.compaction.truncateAfterCompaction` is enabled, OpenClaw does not rewrite the existing transcript in place. It creates a new active successor transcript from the compaction summary, preserved state, and unsummarized tail, then keeps the previous JSONL as the archived checkpoint source.
|
||||
Successor transcripts also drop exact duplicate long user turns that arrive
|
||||
inside a short retry window, so channel retry storms are not carried into the
|
||||
next active transcript after compaction.
|
||||
|
||||
Pre-compaction checkpoints are retained only while they stay below OpenClaw's
|
||||
checkpoint size cap; oversized active transcripts still compact, but OpenClaw
|
||||
skips the large debug snapshot instead of doubling disk usage.
|
||||
|
||||
### Compaction notices
|
||||
|
||||
|
||||
@@ -50,6 +50,9 @@ OpenClaw persists sessions in two layers:
|
||||
- Append-only transcript with tree structure (entries have `id` + `parentId`)
|
||||
- Stores the actual conversation + tool calls + compaction summaries
|
||||
- Used to rebuild the model context for future turns
|
||||
- Large pre-compaction debug checkpoints are skipped once the active
|
||||
transcript exceeds the checkpoint size cap, avoiding a second giant
|
||||
`.checkpoint.*.jsonl` copy.
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -99,6 +99,7 @@ import {
|
||||
import { resolveSystemPromptOverride } from "../system-prompt-override.js";
|
||||
import { classifyCompactionReason, resolveCompactionFailureReason } from "./compact-reasons.js";
|
||||
import type { CompactEmbeddedPiSessionParams, CompactionMessageMetrics } from "./compact.types.js";
|
||||
import { dedupeDuplicateUserMessagesForCompaction } from "./compaction-duplicate-user-messages.js";
|
||||
import {
|
||||
asCompactionHookRunner,
|
||||
buildBeforeCompactionHookMetrics,
|
||||
@@ -972,9 +973,10 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
sessionId: params.sessionId,
|
||||
policy: transcriptPolicy,
|
||||
});
|
||||
const dedupedValidated = dedupeDuplicateUserMessagesForCompaction(validated);
|
||||
// Apply validated transcript to the live session even when no history limit is configured,
|
||||
// so compaction and hook metrics are based on the same message set.
|
||||
session.agent.state.messages = validated;
|
||||
session.agent.state.messages = dedupedValidated;
|
||||
// "Original" compaction metrics should describe the validated transcript that enters
|
||||
// limiting/compaction, not the raw on-disk session snapshot.
|
||||
const originalMessages = session.messages.slice();
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
collectDuplicateUserMessageEntryIdsForCompaction,
|
||||
dedupeDuplicateUserMessagesForCompaction,
|
||||
} from "./compaction-duplicate-user-messages.js";
|
||||
|
||||
describe("compaction duplicate user message pruning", () => {
|
||||
it("drops identical long user messages inside the duplicate window", () => {
|
||||
const first = {
|
||||
role: "user",
|
||||
content: "please run the deployment status check for production",
|
||||
timestamp: 1_000,
|
||||
} as const;
|
||||
const second = {
|
||||
role: "user",
|
||||
content: " please run the deployment status check for production ",
|
||||
timestamp: 2_000,
|
||||
} as const;
|
||||
const third = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "checking" }],
|
||||
timestamp: 3_000,
|
||||
} as const;
|
||||
|
||||
expect(dedupeDuplicateUserMessagesForCompaction([first, second, third])).toEqual([
|
||||
first,
|
||||
third,
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps short repeated acknowledgements and distant repeats", () => {
|
||||
const short = { role: "user", content: "next", timestamp: 1_000 } as const;
|
||||
const shortAgain = { role: "user", content: "next", timestamp: 2_000 } as const;
|
||||
const long = {
|
||||
role: "user",
|
||||
content: "please run the deployment status check for production",
|
||||
timestamp: 1_000,
|
||||
} as const;
|
||||
const longLater = {
|
||||
role: "user",
|
||||
content: "please run the deployment status check for production",
|
||||
timestamp: 70_000,
|
||||
} as const;
|
||||
|
||||
expect(dedupeDuplicateUserMessagesForCompaction([short, shortAgain])).toEqual([
|
||||
short,
|
||||
shortAgain,
|
||||
]);
|
||||
expect(dedupeDuplicateUserMessagesForCompaction([long, longLater])).toEqual([long, longLater]);
|
||||
});
|
||||
|
||||
it("collects duplicate transcript entry ids from active branch entries", () => {
|
||||
const duplicateIds = collectDuplicateUserMessageEntryIdsForCompaction([
|
||||
{
|
||||
id: "entry-1",
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
content: "please run the deployment status check for production",
|
||||
timestamp: 1_000,
|
||||
},
|
||||
},
|
||||
{
|
||||
id: "entry-2",
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
content: "please run the deployment status check for production",
|
||||
timestamp: 2_000,
|
||||
},
|
||||
},
|
||||
]);
|
||||
|
||||
expect(duplicateIds).toEqual(new Set(["entry-2"]));
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,109 @@
|
||||
// Two user turns with the same normalized text count as retry duplicates only
// when the second arrives within this window of the first (milliseconds).
const DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS = 60_000;
// Short repeated acknowledgements ("ok", "next", ...) are deliberately kept;
// only messages at least this many characters long are pruning candidates.
const MIN_DUPLICATE_USER_MESSAGE_CHARS = 24;

// Loose structural view of a chat message. Fields are `unknown` because
// transcript payloads are not trusted to match any exact schema here.
type MessageLike = {
  role?: unknown;
  content?: unknown;
  timestamp?: unknown;
};

// Loose structural view of a transcript entry: an id, an entry type tag, and
// the wrapped message payload.
type EntryLike = {
  id?: unknown;
  type?: unknown;
  message?: unknown;
};

// Optional override for the duplicate-detection window.
type DuplicateUserMessageOptions = {
  windowMs?: number;
};
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
|
||||
}
|
||||
|
||||
function normalizeUserMessageContent(content: unknown): string | undefined {
|
||||
if (typeof content === "string") {
|
||||
return content.replace(/\s+/g, " ").trim();
|
||||
}
|
||||
if (!Array.isArray(content)) {
|
||||
return undefined;
|
||||
}
|
||||
const textParts: string[] = [];
|
||||
for (const block of content) {
|
||||
if (!isRecord(block)) {
|
||||
return undefined;
|
||||
}
|
||||
if (block.type === "image") {
|
||||
return undefined;
|
||||
}
|
||||
if (block.type === "text" && typeof block.text === "string") {
|
||||
textParts.push(block.text);
|
||||
}
|
||||
}
|
||||
return textParts.join("\n").replace(/\s+/g, " ").trim();
|
||||
}
|
||||
|
||||
function duplicateSignature(message: unknown): { key: string; timestamp: number } | undefined {
|
||||
if (!isRecord(message) || message.role !== "user" || typeof message.timestamp !== "number") {
|
||||
return undefined;
|
||||
}
|
||||
const text = normalizeUserMessageContent(message.content);
|
||||
if (!text || text.length < MIN_DUPLICATE_USER_MESSAGE_CHARS) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
key: text.normalize("NFC").toLowerCase(),
|
||||
timestamp: message.timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
export function dedupeDuplicateUserMessagesForCompaction<T extends MessageLike>(
|
||||
messages: readonly T[],
|
||||
options: DuplicateUserMessageOptions = {},
|
||||
): T[] {
|
||||
const windowMs = options.windowMs ?? DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS;
|
||||
const lastSeenAtByKey = new Map<string, number>();
|
||||
let removed = 0;
|
||||
const result: T[] = [];
|
||||
for (const message of messages) {
|
||||
const signature = duplicateSignature(message);
|
||||
if (!signature) {
|
||||
result.push(message);
|
||||
continue;
|
||||
}
|
||||
const lastSeenAt = lastSeenAtByKey.get(signature.key);
|
||||
lastSeenAtByKey.set(signature.key, signature.timestamp);
|
||||
if (typeof lastSeenAt === "number" && signature.timestamp - lastSeenAt <= windowMs) {
|
||||
removed += 1;
|
||||
continue;
|
||||
}
|
||||
result.push(message);
|
||||
}
|
||||
return removed > 0 ? result : [...messages];
|
||||
}
|
||||
|
||||
export function collectDuplicateUserMessageEntryIdsForCompaction(
|
||||
entries: readonly EntryLike[],
|
||||
options: DuplicateUserMessageOptions = {},
|
||||
): Set<string> {
|
||||
const windowMs = options.windowMs ?? DEFAULT_DUPLICATE_USER_MESSAGE_WINDOW_MS;
|
||||
const lastSeenAtByKey = new Map<string, number>();
|
||||
const duplicateIds = new Set<string>();
|
||||
for (const entry of entries) {
|
||||
if (entry.type !== "message" || typeof entry.id !== "string") {
|
||||
continue;
|
||||
}
|
||||
const signature = duplicateSignature(
|
||||
isRecord(entry.message) ? (entry.message as MessageLike) : undefined,
|
||||
);
|
||||
if (!signature) {
|
||||
continue;
|
||||
}
|
||||
const lastSeenAt = lastSeenAtByKey.get(signature.key);
|
||||
lastSeenAtByKey.set(signature.key, signature.timestamp);
|
||||
if (typeof lastSeenAt === "number" && signature.timestamp - lastSeenAt <= windowMs) {
|
||||
duplicateIds.add(entry.id);
|
||||
}
|
||||
}
|
||||
return duplicateIds;
|
||||
}
|
||||
@@ -153,6 +153,39 @@ describe("rotateTranscriptAfterCompaction", () => {
|
||||
expect(successor.getSessionName()).toBe("current title");
|
||||
});
|
||||
|
||||
it("drops duplicate user messages from the rotated active branch tail", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const manager = SessionManager.create(dir, dir);
|
||||
manager.appendMessage({ role: "user", content: "old user", timestamp: 1 });
|
||||
const firstKeptId = manager.appendMessage(makeAssistant("old assistant", 2));
|
||||
manager.appendCompaction("Summary of old work.", firstKeptId, 5000);
|
||||
const firstDuplicateId = manager.appendMessage({
|
||||
role: "user",
|
||||
content: "please run the deployment status check for production",
|
||||
timestamp: 3_000,
|
||||
});
|
||||
const secondDuplicateId = manager.appendMessage({
|
||||
role: "user",
|
||||
content: " please run the deployment status check for production ",
|
||||
timestamp: 4_000,
|
||||
});
|
||||
manager.appendMessage(makeAssistant("status checked", 5_000));
|
||||
|
||||
const result = await rotateTranscriptAfterCompaction({
|
||||
sessionManager: manager,
|
||||
sessionFile: manager.getSessionFile()!,
|
||||
now: () => new Date("2026-04-27T12:10:00.000Z"),
|
||||
});
|
||||
|
||||
expect(result.rotated).toBe(true);
|
||||
const successor = SessionManager.open(result.sessionFile!);
|
||||
const entries = successor.getEntries();
|
||||
expect(entries.find((entry) => entry.id === firstDuplicateId)).toBeDefined();
|
||||
expect(entries.find((entry) => entry.id === secondDuplicateId)).toBeUndefined();
|
||||
const contextText = JSON.stringify(successor.buildSessionContext().messages);
|
||||
expect(contextText.match(/deployment status check/g)).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("skips sessions with no compaction entry", async () => {
|
||||
const dir = await createTmpDir();
|
||||
const manager = SessionManager.create(dir, dir);
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
type SessionHeader,
|
||||
} from "@mariozechner/pi-coding-agent";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { collectDuplicateUserMessageEntryIdsForCompaction } from "./compaction-duplicate-user-messages.js";
|
||||
|
||||
type ReadonlySessionManagerForRotation = Pick<
|
||||
SessionManager,
|
||||
@@ -126,10 +127,12 @@ function buildSuccessorEntries(params: {
|
||||
}
|
||||
|
||||
const removedIds = new Set<string>();
|
||||
const duplicateUserMessageIds = collectDuplicateUserMessageEntryIdsForCompaction(branch);
|
||||
for (const entry of allEntries) {
|
||||
if (
|
||||
(summarizedBranchIds.has(entry.id) && entry.type === "message") ||
|
||||
staleStateEntryIds.has(entry.id)
|
||||
staleStateEntryIds.has(entry.id) ||
|
||||
duplicateUserMessageIds.has(entry.id)
|
||||
) {
|
||||
removedIds.add(entry.id);
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import {
|
||||
captureCompactionCheckpointSnapshot,
|
||||
cleanupCompactionCheckpointSnapshot,
|
||||
MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
|
||||
persistSessionCompactionCheckpoint,
|
||||
} from "./session-compaction-checkpoints.js";
|
||||
|
||||
@@ -84,6 +85,31 @@ describe("session-compaction-checkpoints", () => {
|
||||
expect(fsSync.existsSync(sessionFile!)).toBe(true);
|
||||
});
|
||||
|
||||
test("capture skips oversized pre-compaction transcripts", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-oversized-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
const session = SessionManager.create(dir, dir);
|
||||
session.appendMessage({
|
||||
role: "user",
|
||||
content: "before compaction",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
const sessionFile = session.getSessionFile();
|
||||
expect(sessionFile).toBeTruthy();
|
||||
await fs.appendFile(sessionFile!, "x".repeat(128), "utf-8");
|
||||
|
||||
const snapshot = captureCompactionCheckpointSnapshot({
|
||||
sessionManager: session,
|
||||
sessionFile: sessionFile!,
|
||||
maxBytes: 64,
|
||||
});
|
||||
|
||||
expect(snapshot).toBeNull();
|
||||
expect(MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES).toBeGreaterThan(64);
|
||||
expect(fsSync.readdirSync(dir).filter((file) => file.includes(".checkpoint."))).toEqual([]);
|
||||
});
|
||||
|
||||
test("persist trims old checkpoint metadata and removes trimmed snapshot files", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-trim-"));
|
||||
tempDirs.push(dir);
|
||||
|
||||
@@ -16,6 +16,7 @@ import { resolveGatewaySessionStoreTarget } from "./session-utils.js";
|
||||
|
||||
const log = createSubsystemLogger("gateway/session-compaction-checkpoints");
|
||||
const MAX_COMPACTION_CHECKPOINTS_PER_SESSION = 25;
|
||||
export const MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES = 64 * 1024 * 1024;
|
||||
|
||||
export type CapturedCompactionCheckpointSnapshot = {
|
||||
sessionId: string;
|
||||
@@ -62,6 +63,7 @@ export function resolveSessionCompactionCheckpointReason(params: {
|
||||
export function captureCompactionCheckpointSnapshot(params: {
|
||||
sessionManager: Pick<SessionManager, "getLeafId">;
|
||||
sessionFile: string;
|
||||
maxBytes?: number;
|
||||
}): CapturedCompactionCheckpointSnapshot | null {
|
||||
const getLeafId =
|
||||
params.sessionManager && typeof params.sessionManager.getLeafId === "function"
|
||||
@@ -71,6 +73,15 @@ export function captureCompactionCheckpointSnapshot(params: {
|
||||
if (!getLeafId || !sessionFile) {
|
||||
return null;
|
||||
}
|
||||
const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES;
|
||||
try {
|
||||
const stat = fsSync.statSync(sessionFile);
|
||||
if (!stat.isFile() || stat.size > maxBytes) {
|
||||
return null;
|
||||
}
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
const leafId = getLeafId();
|
||||
if (!leafId) {
|
||||
return null;
|
||||
|
||||
Reference in New Issue
Block a user