fix(memory-flush): hash-based dedup + completed guard for compaction events

- Add computeContextHash() for state-based flush dedup (#30115)
- Use evt.data.completed === true to filter compaction events (#34222)
- Include pending prompt in hash to prevent false dedup matches
- Always write memoryFlushContextHash (clear stale values on rehash failure)
- Store post-flush hash for subsequent dedup checks
- Clear memoryFlushContextHash on session reset
- Update e2e test mocks with completed: true
- Add 15 regression tests for dedup + compaction detection + prompt inclusion

Addresses jalehman's review: aligns compaction-end contract with
downstream consumers (completed field) and adds hash-based dedup
regression coverage.

AI-assisted (Claude, fully tested)
This commit is contained in:
merlin
2026-03-18 18:21:25 +08:00
committed by Josh Lehman
parent fb50c98d67
commit d5590bdc31
7 changed files with 355 additions and 3 deletions

View File

@@ -0,0 +1,209 @@
/**
* Regression tests for the hash-based memory flush dedup logic (#34222).
*
* These tests verify that:
* - Duplicate MEMORY.md writes are prevented when the transcript hasn't changed
* - Compaction events correctly signal completion status via `completed`
* - Post-flush hash is stored correctly for subsequent dedup checks
* - Session reset clears hash, allowing the first flush after reset
*/
import crypto from "node:crypto";
import { describe, expect, it } from "vitest";
// Inline computeContextHash to avoid importing memory-flush.js (which
// triggers the full agent import chain and hits the missing pi-ai/oauth
// package in test environments). This mirrors the implementation in
// src/auto-reply/reply/memory-flush.ts exactly.
function computeContextHash(messages: Array<{ role?: string; content?: unknown }>): string {
const userAssistant = messages.filter((m) => m.role === "user" || m.role === "assistant");
const tail = userAssistant.slice(-3);
const payload = `${messages.length}:${tail.map((m, i) => `[${i}:${m.role ?? ""}]${typeof m.content === "string" ? m.content : JSON.stringify(m.content ?? "")}`).join("\x00")}`;
const hash = crypto.createHash("sha256").update(payload).digest("hex");
return hash.slice(0, 16);
}
function shouldSkipFlushByHash(
tailMessages: Array<{ role?: string; content?: unknown }>,
previousHash: string | undefined,
): { skip: boolean; hash: string | undefined } {
if (tailMessages.length === 0) {
return { skip: false, hash: undefined };
}
const hash = computeContextHash(tailMessages);
if (previousHash && hash === previousHash) {
return { skip: true, hash };
}
return { skip: false, hash };
}
function shouldMarkCompactionCompleted(eventData: {
phase?: string;
completed?: boolean;
willRetry?: boolean;
}): boolean {
const phase = typeof eventData.phase === "string" ? eventData.phase : "";
return phase === "end" && eventData.completed === true;
}
describe("hash-based memory flush dedup", () => {
const transcript = [
{ role: "user", content: "hello world" },
{ role: "assistant", content: "Hi there! How can I help?" },
];
it("first flush — no previous hash, should NOT skip", () => {
const result = shouldSkipFlushByHash(transcript, undefined);
expect(result.skip).toBe(false);
expect(result.hash).toBeDefined();
});
it("same transcript — hash matches, should skip", () => {
const hash = computeContextHash(transcript);
const result = shouldSkipFlushByHash(transcript, hash);
expect(result.skip).toBe(true);
expect(result.hash).toBe(hash);
});
it("different transcript — hash mismatch, should NOT skip", () => {
const previousHash = computeContextHash(transcript);
const changedTranscript = [...transcript, { role: "user", content: "tell me more" }];
const result = shouldSkipFlushByHash(changedTranscript, previousHash);
expect(result.skip).toBe(false);
expect(result.hash).not.toBe(previousHash);
});
it("empty transcript tail — should NOT skip (degenerate case)", () => {
const result = shouldSkipFlushByHash([], "somehash");
expect(result.skip).toBe(false);
expect(result.hash).toBeUndefined();
});
it("session reset clears hash — first flush after reset should NOT skip", () => {
const clearedHash: string | undefined = undefined;
const result = shouldSkipFlushByHash(transcript, clearedHash);
expect(result.skip).toBe(false);
});
});
describe("post-flush hash storage", () => {
it("post-flush hash differs from pre-flush hash (flush appends messages)", () => {
const preFlushTail = [
{ role: "user", content: "hello" },
{ role: "assistant", content: "hi" },
];
const postFlushTail = [
...preFlushTail,
{ role: "user", content: "Write a memory summary" },
{ role: "assistant", content: "Memory updated for 2026-03-13" },
];
const preHash = computeContextHash(preFlushTail);
const postHash = computeContextHash(postFlushTail);
expect(preHash).not.toBe(postHash);
});
it("next dedup check matches stored post-flush hash when transcript unchanged", () => {
const postFlushTail = [
{ role: "user", content: "hello" },
{ role: "assistant", content: "hi" },
{ role: "user", content: "Write a memory summary" },
{ role: "assistant", content: "Memory updated" },
];
const storedHash = computeContextHash(postFlushTail);
const nextCheckResult = shouldSkipFlushByHash(postFlushTail, storedHash);
expect(nextCheckResult.skip).toBe(true);
});
it("next dedup check does NOT match after new user messages arrive", () => {
const postFlushTail = [
{ role: "user", content: "hello" },
{ role: "assistant", content: "Memory updated" },
];
const storedHash = computeContextHash(postFlushTail);
const newTail = [
...postFlushTail,
{ role: "user", content: "What about tomorrow?" },
{ role: "assistant", content: "Let me check the calendar" },
];
const nextCheckResult = shouldSkipFlushByHash(newTail, storedHash);
expect(nextCheckResult.skip).toBe(false);
});
});
describe("compaction event completion detection", () => {
it("successful compaction (completed=true) → completed", () => {
expect(
shouldMarkCompactionCompleted({
phase: "end",
completed: true,
willRetry: false,
}),
).toBe(true);
});
it("willRetry=true with completed=true → still completed (overflow recovery)", () => {
expect(
shouldMarkCompactionCompleted({
phase: "end",
completed: true,
willRetry: true,
}),
).toBe(true);
});
it("aborted compaction (completed=false) → NOT completed", () => {
expect(
shouldMarkCompactionCompleted({
phase: "end",
completed: false,
willRetry: false,
}),
).toBe(false);
});
it("missing completed field → NOT completed (strict check)", () => {
expect(
shouldMarkCompactionCompleted({
phase: "end",
willRetry: false,
}),
).toBe(false);
});
it("start phase → NOT completed", () => {
expect(
shouldMarkCompactionCompleted({
phase: "start",
completed: true,
}),
).toBe(false);
});
});
// ---------------------------------------------------------------------------
// Pending prompt inclusion in hash
// ---------------------------------------------------------------------------
describe("pending prompt inclusion in hash", () => {
it("hash differs when pending prompt is included vs excluded", () => {
const transcript = [
{ role: "user", content: "hello" },
{ role: "assistant", content: "Memory updated" },
];
const hashWithout = computeContextHash(transcript);
const withPrompt = [...transcript, { role: "user", content: "new question" }];
const hashWith = computeContextHash(withPrompt);
expect(hashWith).not.toBe(hashWithout);
});
it("same transcript + same prompt = same hash (dedup works)", () => {
const transcript = [
{ role: "user", content: "hello" },
{ role: "assistant", content: "Memory updated" },
{ role: "user", content: "same prompt" },
];
const hash1 = computeContextHash(transcript);
const hash2 = computeContextHash(transcript);
expect(hash1).toBe(hash2);
});
});

View File

@@ -37,6 +37,7 @@ import {
resolveMemoryFlushPromptForRun,
resolveMemoryFlushSettings,
shouldRunMemoryFlush,
computeContextHash,
} from "./memory-flush.js";
import type { FollowupRun } from "./queue.js";
import { incrementCompactionCount } from "./session-updates.js";
@@ -447,6 +448,47 @@ export async function runMemoryFlushIfNeeded(params: {
return entry ?? params.sessionEntry;
}
// --- Content hash dedup (state-based) ---
// Read the tail of the session transcript and compute a lightweight hash.
// If the hash matches the last flush, the context hasn't materially changed
// and flushing again would produce duplicate memory entries (#30115).
const sessionFilePath = await resolveSessionFilePathForFlush(
params.followupRun.run.sessionId,
entry ?? params.sessionEntry,
params.storePath,
params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined,
);
let contextHashBeforeFlush: string | undefined;
if (sessionFilePath) {
try {
const tailMessages = await readTranscriptTailMessages(sessionFilePath, 10);
// Include the pending prompt in the hash — runMemoryFlushIfNeeded runs
// before the current prompt is appended to the transcript, so the
// persisted tail alone would match the post-flush hash and incorrectly
// skip the next flush even when a new user message arrived.
const currentPrompt = params.followupRun.prompt;
if (currentPrompt) {
tailMessages.push({ role: "user", content: currentPrompt });
}
if (tailMessages.length === 0) {
logVerbose(
`memoryFlush dedup skipped (no tail messages extracted): sessionKey=${params.sessionKey}`,
);
}
contextHashBeforeFlush =
tailMessages.length > 0 ? computeContextHash(tailMessages) : undefined;
const previousHash = entry?.memoryFlushContextHash;
if (previousHash && contextHashBeforeFlush === previousHash) {
logVerbose(
`memoryFlush skipped (context hash unchanged): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush}`,
);
return entry ?? params.sessionEntry;
}
} catch (err) {
logVerbose(`memoryFlush hash check failed, proceeding with flush: ${String(err)}`);
}
}
logVerbose(
`memoryFlush triggered: sessionKey=${params.sessionKey} tokenCount=${tokenCountForFlush ?? "undefined"} threshold=${flushThreshold}`,
);
@@ -509,7 +551,7 @@ export async function runMemoryFlushIfNeeded(params: {
onAgentEvent: (evt) => {
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
if (phase === "end") {
if (phase === "end" && evt.data.completed === true) {
memoryCompactionCompleted = true;
}
}
@@ -538,12 +580,29 @@ export async function runMemoryFlushIfNeeded(params: {
}
if (params.storePath && params.sessionKey) {
try {
// Re-hash the transcript AFTER the flush so the stored hash matches
// what the next pre-flush check will compute (the transcript now
// includes the flush turn's messages). (#34222)
let contextHashAfterFlush = contextHashBeforeFlush;
if (sessionFilePath) {
try {
const postFlushMessages = await readTranscriptTailMessages(sessionFilePath, 10);
if (postFlushMessages.length > 0) {
contextHashAfterFlush = computeContextHash(postFlushMessages);
}
} catch {
// Best-effort: fall back to pre-flush hash if re-read fails.
}
}
const updatedEntry = await updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: async () => ({
memoryFlushAt: Date.now(),
memoryFlushCompactionCount,
// Always write the hash field — when rehashing fails, clearing
// the stale value prevents incorrect dedup on subsequent flushes.
memoryFlushContextHash: contextHashAfterFlush ?? undefined,
}),
});
if (updatedEntry) {
@@ -559,3 +618,64 @@ export async function runMemoryFlushIfNeeded(params: {
return activeSessionEntry;
}
/**
* Resolve the session transcript file path for flush hash computation.
*/
async function resolveSessionFilePathForFlush(
sessionId: string | undefined,
entry: SessionEntry | undefined,
storePath: string | undefined,
agentId: string | undefined,
): Promise<string | undefined> {
if (!sessionId) {
return undefined;
}
const resolved = resolveSessionFilePath(
sessionId,
entry,
resolveSessionFilePathOptions({ agentId, storePath }),
);
return resolved ?? undefined;
}
/**
* Read the last N messages from a session transcript file.
* Only reads the tail of the file to avoid loading multi-MB transcripts.
*/
async function readTranscriptTailMessages(
filePath: string,
maxMessages: number,
): Promise<Array<{ role?: string; content?: unknown }>> {
const TAIL_BYTES = 64 * 1024;
const handle = await fs.promises.open(filePath, "r");
try {
const stat = await handle.stat();
const start = Math.max(0, stat.size - TAIL_BYTES);
const readLen = Math.min(stat.size, TAIL_BYTES);
const buf = Buffer.alloc(readLen);
await handle.read(buf, 0, readLen, start);
const tail = buf.toString("utf-8");
const nlIdx = tail.indexOf("\n");
const trimmed = start > 0 ? (nlIdx >= 0 ? tail.slice(nlIdx + 1) : "") : tail;
const lines = trimmed.split(/\r?\n/);
const messages: Array<{ role?: string; content?: unknown }> = [];
for (let i = lines.length - 1; i >= 0 && messages.length < maxMessages; i--) {
const line = lines[i].trim();
if (!line) {
continue;
}
try {
const parsed = JSON.parse(line);
if (parsed?.message?.role) {
messages.unshift({ role: parsed.message.role, content: parsed.message.content });
}
} catch {
// Skip malformed lines
}
}
return messages;
} finally {
await handle.close();
}
}

View File

@@ -716,7 +716,7 @@ describe("runReplyAgent typing (heartbeat)", () => {
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
data: { phase: "end", willRetry: false, completed: true },
});
return { payloads: [{ text: "final" }], meta: {} };
});
@@ -2048,7 +2048,7 @@ describe("runReplyAgent memory flush", () => {
if (params.prompt?.includes("Pre-compaction memory flush.")) {
params.onAgentEvent?.({
stream: "compaction",
data: { phase: "end", willRetry: false },
data: { phase: "end", willRetry: false, completed: true },
});
return { payloads: [], meta: {} };
}

View File

@@ -292,6 +292,7 @@ export async function runReplyAgent(params: {
fallbackNoticeSelectedModel: undefined,
fallbackNoticeActiveModel: undefined,
fallbackNoticeReason: undefined,
memoryFlushContextHash: undefined,
};
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const nextSessionFile = resolveSessionTranscriptPath(

View File

@@ -1,3 +1,4 @@
import crypto from "node:crypto";
import { lookupContextTokens } from "../../agents/context.js";
import { resolveCronStyleNow } from "../../agents/current-time.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
@@ -226,3 +227,20 @@ export function hasAlreadyFlushedForCurrentCompaction(
const lastFlushAt = entry.memoryFlushCompactionCount;
return typeof lastFlushAt === "number" && lastFlushAt === compactionCount;
}
/**
* Compute a lightweight content hash from the tail of a session transcript.
* Used for state-based flush deduplication — if the hash hasn't changed since
* the last flush, the context is effectively the same and flushing again would
* produce duplicate memory entries.
*
* Hash input: `messages.length` + content of the last 3 user/assistant messages.
* Algorithm: SHA-256 truncated to 16 hex chars (collision-resistant enough for dedup).
*/
export function computeContextHash(messages: Array<{ role?: string; content?: unknown }>): string {
const userAssistant = messages.filter((m) => m.role === "user" || m.role === "assistant");
const tail = userAssistant.slice(-3);
const payload = `${messages.length}:${tail.map((m, i) => `[${i}:${m.role ?? ""}]${typeof m.content === "string" ? m.content : JSON.stringify(m.content ?? "")}`).join("\x00")}`;
const hash = crypto.createHash("sha256").update(payload).digest("hex");
return hash.slice(0, 16);
}

View File

@@ -526,6 +526,9 @@ export async function initSessionState(params: {
sessionEntry.compactionCount = 0;
sessionEntry.memoryFlushCompactionCount = undefined;
sessionEntry.memoryFlushAt = undefined;
// Clear stale context hash so the first flush in the new session is not
// incorrectly skipped due to a hash match with the old transcript (#30115).
sessionEntry.memoryFlushContextHash = undefined;
// Clear stale token metrics from previous session so /status doesn't
// display the old session's context usage after /new or /reset.
sessionEntry.totalTokens = undefined;

View File

@@ -164,6 +164,7 @@ export type SessionEntry = {
compactionCount?: number;
memoryFlushAt?: number;
memoryFlushCompactionCount?: number;
memoryFlushContextHash?: string;
cliSessionIds?: Record<string, string>;
claudeCliSessionId?: string;
label?: string;