diff --git a/src/auto-reply/reply/agent-runner-memory.dedup.test.ts b/src/auto-reply/reply/agent-runner-memory.dedup.test.ts new file mode 100644 index 00000000000..d2afa5a3e6f --- /dev/null +++ b/src/auto-reply/reply/agent-runner-memory.dedup.test.ts @@ -0,0 +1,209 @@ +/** + * Regression tests for the hash-based memory flush dedup logic (#34222). + * + * These tests verify that: + * - Duplicate MEMORY.md writes are prevented when the transcript hasn't changed + * - Compaction events correctly signal completion status via `completed` + * - Post-flush hash is stored correctly for subsequent dedup checks + * - Session reset clears hash, allowing the first flush after reset + */ +import crypto from "node:crypto"; +import { describe, expect, it } from "vitest"; + +// Inline computeContextHash to avoid importing memory-flush.js (which +// triggers the full agent import chain and hits the missing pi-ai/oauth +// package in test environments). This mirrors the implementation in +// src/auto-reply/reply/memory-flush.ts exactly. +function computeContextHash(messages: Array<{ role?: string; content?: unknown }>): string { + const userAssistant = messages.filter((m) => m.role === "user" || m.role === "assistant"); + const tail = userAssistant.slice(-3); + const payload = `${messages.length}:${tail.map((m, i) => `[${i}:${m.role ?? ""}]${typeof m.content === "string" ? m.content : JSON.stringify(m.content ?? 
"")}`).join("\x00")}`; + const hash = crypto.createHash("sha256").update(payload).digest("hex"); + return hash.slice(0, 16); +} + +function shouldSkipFlushByHash( + tailMessages: Array<{ role?: string; content?: unknown }>, + previousHash: string | undefined, +): { skip: boolean; hash: string | undefined } { + if (tailMessages.length === 0) { + return { skip: false, hash: undefined }; + } + const hash = computeContextHash(tailMessages); + if (previousHash && hash === previousHash) { + return { skip: true, hash }; + } + return { skip: false, hash }; +} + +function shouldMarkCompactionCompleted(eventData: { + phase?: string; + completed?: boolean; + willRetry?: boolean; +}): boolean { + const phase = typeof eventData.phase === "string" ? eventData.phase : ""; + return phase === "end" && eventData.completed === true; +} + +describe("hash-based memory flush dedup", () => { + const transcript = [ + { role: "user", content: "hello world" }, + { role: "assistant", content: "Hi there! How can I help?" 
}, + ]; + + it("first flush — no previous hash, should NOT skip", () => { + const result = shouldSkipFlushByHash(transcript, undefined); + expect(result.skip).toBe(false); + expect(result.hash).toBeDefined(); + }); + + it("same transcript — hash matches, should skip", () => { + const hash = computeContextHash(transcript); + const result = shouldSkipFlushByHash(transcript, hash); + expect(result.skip).toBe(true); + expect(result.hash).toBe(hash); + }); + + it("different transcript — hash mismatch, should NOT skip", () => { + const previousHash = computeContextHash(transcript); + const changedTranscript = [...transcript, { role: "user", content: "tell me more" }]; + const result = shouldSkipFlushByHash(changedTranscript, previousHash); + expect(result.skip).toBe(false); + expect(result.hash).not.toBe(previousHash); + }); + + it("empty transcript tail — should NOT skip (degenerate case)", () => { + const result = shouldSkipFlushByHash([], "somehash"); + expect(result.skip).toBe(false); + expect(result.hash).toBeUndefined(); + }); + + it("session reset clears hash — first flush after reset should NOT skip", () => { + const clearedHash: string | undefined = undefined; + const result = shouldSkipFlushByHash(transcript, clearedHash); + expect(result.skip).toBe(false); + }); +}); + +describe("post-flush hash storage", () => { + it("post-flush hash differs from pre-flush hash (flush appends messages)", () => { + const preFlushTail = [ + { role: "user", content: "hello" }, + { role: "assistant", content: "hi" }, + ]; + const postFlushTail = [ + ...preFlushTail, + { role: "user", content: "Write a memory summary" }, + { role: "assistant", content: "Memory updated for 2026-03-13" }, + ]; + + const preHash = computeContextHash(preFlushTail); + const postHash = computeContextHash(postFlushTail); + expect(preHash).not.toBe(postHash); + }); + + it("next dedup check matches stored post-flush hash when transcript unchanged", () => { + const postFlushTail = [ + { role: "user", 
content: "hello" }, + { role: "assistant", content: "hi" }, + { role: "user", content: "Write a memory summary" }, + { role: "assistant", content: "Memory updated" }, + ]; + const storedHash = computeContextHash(postFlushTail); + const nextCheckResult = shouldSkipFlushByHash(postFlushTail, storedHash); + expect(nextCheckResult.skip).toBe(true); + }); + + it("next dedup check does NOT match after new user messages arrive", () => { + const postFlushTail = [ + { role: "user", content: "hello" }, + { role: "assistant", content: "Memory updated" }, + ]; + const storedHash = computeContextHash(postFlushTail); + const newTail = [ + ...postFlushTail, + { role: "user", content: "What about tomorrow?" }, + { role: "assistant", content: "Let me check the calendar" }, + ]; + const nextCheckResult = shouldSkipFlushByHash(newTail, storedHash); + expect(nextCheckResult.skip).toBe(false); + }); +}); + +describe("compaction event completion detection", () => { + it("successful compaction (completed=true) → completed", () => { + expect( + shouldMarkCompactionCompleted({ + phase: "end", + completed: true, + willRetry: false, + }), + ).toBe(true); + }); + + it("willRetry=true with completed=true → still completed (overflow recovery)", () => { + expect( + shouldMarkCompactionCompleted({ + phase: "end", + completed: true, + willRetry: true, + }), + ).toBe(true); + }); + + it("aborted compaction (completed=false) → NOT completed", () => { + expect( + shouldMarkCompactionCompleted({ + phase: "end", + completed: false, + willRetry: false, + }), + ).toBe(false); + }); + + it("missing completed field → NOT completed (strict check)", () => { + expect( + shouldMarkCompactionCompleted({ + phase: "end", + willRetry: false, + }), + ).toBe(false); + }); + + it("start phase → NOT completed", () => { + expect( + shouldMarkCompactionCompleted({ + phase: "start", + completed: true, + }), + ).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// 
Pending prompt inclusion in hash +// --------------------------------------------------------------------------- + +describe("pending prompt inclusion in hash", () => { + it("hash differs when pending prompt is included vs excluded", () => { + const transcript = [ + { role: "user", content: "hello" }, + { role: "assistant", content: "Memory updated" }, + ]; + const hashWithout = computeContextHash(transcript); + const withPrompt = [...transcript, { role: "user", content: "new question" }]; + const hashWith = computeContextHash(withPrompt); + expect(hashWith).not.toBe(hashWithout); + }); + + it("same transcript + same prompt = same hash (dedup works)", () => { + const transcript = [ + { role: "user", content: "hello" }, + { role: "assistant", content: "Memory updated" }, + { role: "user", content: "same prompt" }, + ]; + const hash1 = computeContextHash(transcript); + const hash2 = computeContextHash(transcript); + expect(hash1).toBe(hash2); + }); +}); diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 267326a7e20..c6b36eb0029 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -37,6 +37,7 @@ import { resolveMemoryFlushPromptForRun, resolveMemoryFlushSettings, shouldRunMemoryFlush, + computeContextHash, } from "./memory-flush.js"; import type { FollowupRun } from "./queue.js"; import { incrementCompactionCount } from "./session-updates.js"; @@ -447,6 +448,47 @@ export async function runMemoryFlushIfNeeded(params: { return entry ?? params.sessionEntry; } + // --- Content hash dedup (state-based) --- + // Read the tail of the session transcript and compute a lightweight hash. + // If the hash matches the last flush, the context hasn't materially changed + // and flushing again would produce duplicate memory entries (#30115). + const sessionFilePath = await resolveSessionFilePathForFlush( + params.followupRun.run.sessionId, + entry ?? 
params.sessionEntry, + params.storePath, + params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined, + ); + let contextHashBeforeFlush: string | undefined; + if (sessionFilePath) { + try { + const tailMessages = await readTranscriptTailMessages(sessionFilePath, 10); + // Include the pending prompt in the hash — runMemoryFlushIfNeeded runs + // before the current prompt is appended to the transcript, so the + // persisted tail alone would match the post-flush hash and incorrectly + // skip the next flush even when a new user message arrived. + const currentPrompt = params.followupRun.prompt; + if (currentPrompt) { + tailMessages.push({ role: "user", content: currentPrompt }); + } + if (tailMessages.length === 0) { + logVerbose( + `memoryFlush dedup skipped (no tail messages extracted): sessionKey=${params.sessionKey}`, + ); + } + contextHashBeforeFlush = + tailMessages.length > 0 ? computeContextHash(tailMessages) : undefined; + const previousHash = entry?.memoryFlushContextHash; + if (previousHash && contextHashBeforeFlush === previousHash) { + logVerbose( + `memoryFlush skipped (context hash unchanged): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush}`, + ); + return entry ?? params.sessionEntry; + } + } catch (err) { + logVerbose(`memoryFlush hash check failed, proceeding with flush: ${String(err)}`); + } + } + logVerbose( `memoryFlush triggered: sessionKey=${params.sessionKey} tokenCount=${tokenCountForFlush ?? "undefined"} threshold=${flushThreshold}`, ); @@ -509,7 +551,7 @@ export async function runMemoryFlushIfNeeded(params: { onAgentEvent: (evt) => { if (evt.stream === "compaction") { const phase = typeof evt.data.phase === "string" ? 
evt.data.phase : ""; - if (phase === "end") { + if (phase === "end" && evt.data.completed === true) { memoryCompactionCompleted = true; } } @@ -538,12 +580,29 @@ export async function runMemoryFlushIfNeeded(params: { } if (params.storePath && params.sessionKey) { try { + // Re-hash the transcript AFTER the flush so the stored hash matches + // what the next pre-flush check will compute (the transcript now + // includes the flush turn's messages). (#34222) + let contextHashAfterFlush = contextHashBeforeFlush; + if (sessionFilePath) { + try { + const postFlushMessages = await readTranscriptTailMessages(sessionFilePath, 10); + if (postFlushMessages.length > 0) { + contextHashAfterFlush = computeContextHash(postFlushMessages); + } + } catch { + // Best-effort: fall back to pre-flush hash if re-read fails. + } + } const updatedEntry = await updateSessionStoreEntry({ storePath: params.storePath, sessionKey: params.sessionKey, update: async () => ({ memoryFlushAt: Date.now(), memoryFlushCompactionCount, + // Always write the hash field — when rehashing fails, clearing + // the stale value prevents incorrect dedup on subsequent flushes. + memoryFlushContextHash: contextHashAfterFlush ?? undefined, }), }); if (updatedEntry) { @@ -559,3 +618,64 @@ export async function runMemoryFlushIfNeeded(params: { return activeSessionEntry; } + +/** + * Resolve the session transcript file path for flush hash computation. + */ +async function resolveSessionFilePathForFlush( + sessionId: string | undefined, + entry: SessionEntry | undefined, + storePath: string | undefined, + agentId: string | undefined, +): Promise<string | undefined> { + if (!sessionId) { + return undefined; + } + const resolved = resolveSessionFilePath( + sessionId, + entry, + resolveSessionFilePathOptions({ agentId, storePath }), + ); + return resolved ?? undefined; +} + +/** + * Read the last N messages from a session transcript file. + * Only reads the tail of the file to avoid loading multi-MB transcripts. 
+ */ +async function readTranscriptTailMessages( + filePath: string, + maxMessages: number, +): Promise<Array<{ role?: string; content?: unknown }>> { + const TAIL_BYTES = 64 * 1024; + const handle = await fs.promises.open(filePath, "r"); + try { + const stat = await handle.stat(); + const start = Math.max(0, stat.size - TAIL_BYTES); + const readLen = Math.min(stat.size, TAIL_BYTES); + const buf = Buffer.alloc(readLen); + await handle.read(buf, 0, readLen, start); + const tail = buf.toString("utf-8"); + const nlIdx = tail.indexOf("\n"); + const trimmed = start > 0 ? (nlIdx >= 0 ? tail.slice(nlIdx + 1) : "") : tail; + const lines = trimmed.split(/\r?\n/); + const messages: Array<{ role?: string; content?: unknown }> = []; + for (let i = lines.length - 1; i >= 0 && messages.length < maxMessages; i--) { + const line = lines[i].trim(); + if (!line) { + continue; + } + try { + const parsed = JSON.parse(line); + if (parsed?.message?.role) { + messages.unshift({ role: parsed.message.role, content: parsed.message.content }); + } + } catch { + // Skip malformed lines + } + } + return messages; + } finally { + await handle.close(); + } +} diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts index 6bebdc6a390..ce4d5bae9e5 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts @@ -716,7 +716,7 @@ describe("runReplyAgent typing (heartbeat)", () => { state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { params.onAgentEvent?.({ stream: "compaction", - data: { phase: "end", willRetry: false }, + data: { phase: "end", willRetry: false, completed: true }, }); return { payloads: [{ text: "final" }], meta: {} }; }); @@ -2048,7 +2048,7 @@ describe("runReplyAgent memory flush", () => { if (params.prompt?.includes("Pre-compaction memory flush.")) { params.onAgentEvent?.({ stream: "compaction", - data: { phase: "end", willRetry: false 
}, + data: { phase: "end", willRetry: false, completed: true }, }); return { payloads: [], meta: {} }; } diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 7499657d6d4..bc7cbe4cb35 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -292,6 +292,7 @@ export async function runReplyAgent(params: { fallbackNoticeSelectedModel: undefined, fallbackNoticeActiveModel: undefined, fallbackNoticeReason: undefined, + memoryFlushContextHash: undefined, }; const agentId = resolveAgentIdFromSessionKey(sessionKey); const nextSessionFile = resolveSessionTranscriptPath( diff --git a/src/auto-reply/reply/memory-flush.ts b/src/auto-reply/reply/memory-flush.ts index 95f6dbaa053..bad052531ee 100644 --- a/src/auto-reply/reply/memory-flush.ts +++ b/src/auto-reply/reply/memory-flush.ts @@ -1,3 +1,4 @@ +import crypto from "node:crypto"; import { lookupContextTokens } from "../../agents/context.js"; import { resolveCronStyleNow } from "../../agents/current-time.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; @@ -226,3 +227,20 @@ export function hasAlreadyFlushedForCurrentCompaction( const lastFlushAt = entry.memoryFlushCompactionCount; return typeof lastFlushAt === "number" && lastFlushAt === compactionCount; } + +/** + * Compute a lightweight content hash from the tail of a session transcript. + * Used for state-based flush deduplication — if the hash hasn't changed since + * the last flush, the context is effectively the same and flushing again would + * produce duplicate memory entries. + * + * Hash input: `messages.length` + content of the last 3 user/assistant messages. + * Algorithm: SHA-256 truncated to 16 hex chars (collision-resistant enough for dedup). 
+ */ +export function computeContextHash(messages: Array<{ role?: string; content?: unknown }>): string { + const userAssistant = messages.filter((m) => m.role === "user" || m.role === "assistant"); + const tail = userAssistant.slice(-3); + const payload = `${messages.length}:${tail.map((m, i) => `[${i}:${m.role ?? ""}]${typeof m.content === "string" ? m.content : JSON.stringify(m.content ?? "")}`).join("\x00")}`; + const hash = crypto.createHash("sha256").update(payload).digest("hex"); + return hash.slice(0, 16); +} diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 6c1b2233c0f..a129af958b7 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -526,6 +526,9 @@ export async function initSessionState(params: { sessionEntry.compactionCount = 0; sessionEntry.memoryFlushCompactionCount = undefined; sessionEntry.memoryFlushAt = undefined; + // Clear stale context hash so the first flush in the new session is not + // incorrectly skipped due to a hash match with the old transcript (#30115). + sessionEntry.memoryFlushContextHash = undefined; // Clear stale token metrics from previous session so /status doesn't // display the old session's context usage after /new or /reset. sessionEntry.totalTokens = undefined; diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 6513fc81b37..27acdf2aca7 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -164,6 +164,7 @@ export type SessionEntry = { compactionCount?: number; memoryFlushAt?: number; memoryFlushCompactionCount?: number; + memoryFlushContextHash?: string; cliSessionIds?: Record; claudeCliSessionId?: string; label?: string;