diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts index 58e5f825956..4ebbbbc6c54 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts @@ -1,7 +1,6 @@ import fs from "node:fs/promises"; -import { tmpdir } from "node:os"; import path from "node:path"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; import * as sessions from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; @@ -27,18 +26,6 @@ type AgentRunParams = { silentExpected?: boolean; }; -type EmbeddedRunParams = { - prompt?: string; - extraSystemPrompt?: string; - memoryFlushWritePath?: string; - sessionId?: string; - sessionFile?: string; - silentExpected?: boolean; - bootstrapPromptWarningSignaturesSeen?: string[]; - bootstrapPromptWarningSignature?: string; - onAgentEvent?: (evt: { stream?: string; data?: { phase?: string; willRetry?: boolean } }) => void; -}; - const state = vi.hoisted(() => ({ compactEmbeddedPiSessionMock: vi.fn(), runEmbeddedPiAgentMock: vi.fn(), @@ -195,111 +182,6 @@ function createMinimalRun(params?: { }; } -async function seedSessionStore(params: { - storePath: string; - sessionKey: string; - entry: Record; -}) { - await fs.mkdir(path.dirname(params.storePath), { recursive: true }); - await fs.writeFile( - params.storePath, - JSON.stringify({ [params.sessionKey]: params.entry }, null, 2), - "utf-8", - ); -} - -function createBaseRun(params: { - storePath: string; - sessionEntry: Record; - config?: Record; - runOverrides?: Partial; -}) { - const typing = createMockTypingController(); - const sessionCtx = { - Provider: "whatsapp", - OriginatingTo: "+15550001111", - AccountId: "primary", - MessageSid: "msg", - } as unknown as TemplateContext; - const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings; - const followupRun = { - prompt: "hello", - summaryLine: "hello", - enqueuedAt: Date.now(), - run: { - agentId: "main", - agentDir: "/tmp/agent", - sessionId: "session", - sessionKey: "main", - messageProvider: "whatsapp", - sessionFile: "/tmp/session.jsonl", - workspaceDir: "/tmp", - config: params.config ?? {}, - skillsSnapshot: {}, - provider: "anthropic", - model: "claude", - thinkLevel: "low", - verboseLevel: "off", - elevatedLevel: "off", - bashElevated: { - enabled: false, - allowed: false, - defaultLevel: "off", - }, - timeoutMs: 1_000, - blockReplyBreak: "message_end", - }, - } as unknown as FollowupRun; - const run = { - ...followupRun.run, - ...params.runOverrides, - config: params.config ?? followupRun.run.config, - }; - - return { - typing, - sessionCtx, - resolvedQueue, - followupRun: { ...followupRun, run }, - }; -} - -async function runReplyAgentWithBase(params: { - baseRun: ReturnType; - storePath: string; - sessionKey: string; - sessionEntry: SessionEntry; - commandBody: string; - typingMode?: "instant"; -}): Promise { - const runReplyAgent = await getRunReplyAgent(); - const { typing, sessionCtx, resolvedQueue, followupRun } = params.baseRun; - await runReplyAgent({ - commandBody: params.commandBody, - followupRun, - queueKey: params.sessionKey, - resolvedQueue, - shouldSteer: false, - shouldFollowup: false, - isActive: false, - isStreaming: false, - typing, - sessionCtx, - sessionEntry: params.sessionEntry, - sessionStore: { [params.sessionKey]: params.sessionEntry } as Record, - sessionKey: params.sessionKey, - storePath: params.storePath, - defaultModel: "anthropic/claude-opus-4-6", - agentCfgContextTokens: 100_000, - resolvedVerboseLevel: "off", - isNewSession: false, - blockStreamingEnabled: false, - resolvedBlockStreamingBreak: "message_end", - shouldInjectGroupIntro: false, - typingMode: params.typingMode ?? "instant", - }); -} - describe("runReplyAgent heartbeat followup guard", () => { it("drops heartbeat runs when another run is active", async () => { const { run, typing } = createMinimalRun({ @@ -1716,642 +1598,4 @@ describe("runReplyAgent typing (heartbeat)", () => { }); }); -describe("runReplyAgent memory flush", () => { - let fixtureRoot = ""; - let caseId = 0; - - async function withTempStore(fn: (storePath: string) => Promise): Promise { - const dir = path.join(fixtureRoot, `case-${++caseId}`); - await fs.mkdir(dir, { recursive: true }); - return await fn(path.join(dir, "sessions.json")); - } - - async function normalizeComparablePath(filePath: string): Promise { - const parent = await fs.realpath(path.dirname(filePath)).catch(() => path.dirname(filePath)); - return path.join(parent, path.basename(filePath)); - } - - beforeAll(async () => { - fixtureRoot = await fs.mkdtemp(path.join(tmpdir(), "openclaw-memory-flush-")); - }); - - afterAll(async () => { - if (fixtureRoot) { - await fs.rm(fixtureRoot, { recursive: true, force: true }); - } - }); - - it("skips memory flush for CLI providers", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionEntry: SessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - state.runEmbeddedPiAgentMock.mockImplementation(async () => ({ - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - })); - const baseRun = createBaseRun({ - storePath, - sessionEntry, - runOverrides: { provider: "codex-cli" }, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const call = state.runEmbeddedPiAgentMock.mock.calls[0]?.[0] as - | { prompt?: string } - | undefined; - expect(call?.prompt).toBe("hello"); - }); - }); - - it("runs preflight compaction when transcript-estimated tokens cross the threshold", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionFile = "session-relative.jsonl"; - const workspaceDir = path.dirname(storePath); - const transcriptPath = path.join(path.dirname(storePath), sessionFile); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile( - transcriptPath, - `${JSON.stringify({ - message: { - role: "user", - content: "x".repeat(320_000), - timestamp: Date.now(), - }, - })}\n`, - "utf-8", - ); - await fs.writeFile( - path.join(workspaceDir, "AGENTS.md"), - [ - "## Session Startup", - "Read AGENTS.md before replying.", - "", - "## Red Lines", - "Never skip safety checks.", - ].join("\n"), - "utf-8", - ); - - const sessionEntry: SessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - sessionFile, - totalTokens: 10, - totalTokensFresh: false, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - state.compactEmbeddedPiSessionMock.mockResolvedValueOnce({ - ok: true, - compacted: true, - result: { - summary: "compacted", - firstKeptEntryId: "first-kept", - tokensBefore: 90_000, - tokensAfter: 8_000, - }, - }); - const calls: Array<{ prompt?: string; extraSystemPrompt?: string }> = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ - prompt: params.prompt, - extraSystemPrompt: params.extraSystemPrompt, - }); - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - runOverrides: { sessionFile, workspaceDir }, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(state.compactEmbeddedPiSessionMock).toHaveBeenCalledOnce(); - const compactionCall = state.compactEmbeddedPiSessionMock.mock.calls[0]?.[0] as - | { - sessionId?: string; - sessionKey?: string; - trigger?: string; - currentTokenCount?: number; - sessionFile?: string; - } - | undefined; - expect(compactionCall?.sessionId).toBe("session"); - expect(compactionCall?.sessionKey).toBe(sessionKey); - expect(compactionCall?.trigger).toBe("budget"); - expect(compactionCall?.currentTokenCount).toEqual(expect.any(Number)); - expect(await normalizeComparablePath(compactionCall?.sessionFile ?? "")).toBe( - await normalizeComparablePath(transcriptPath), - ); - expect(calls.map((call) => call.prompt)).toEqual(["hello"]); - expect(calls[0]?.extraSystemPrompt).toContain("Post-compaction context refresh"); - expect(calls[0]?.extraSystemPrompt).toContain("Read AGENTS.md before replying."); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].compactionCount).toBe(2); - }); - }); - - it("uses configured prompts for memory flush runs", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push(params); - if (params.prompt?.includes("Write notes.")) { - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - config: { - agents: { - defaults: { - compaction: { - memoryFlush: { - prompt: "Write notes.", - systemPrompt: "Flush memory now.", - }, - }, - }, - }, - }, - runOverrides: { extraSystemPrompt: "extra system" }, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - const flushCall = calls[0]; - expect(flushCall?.prompt).toContain("Write notes."); - expect(flushCall?.prompt).toContain("NO_REPLY"); - expect(flushCall?.prompt).toMatch(/memory\/\d{4}-\d{2}-\d{2}\.md/); - expect(flushCall?.prompt).toContain("MEMORY.md"); - expect(flushCall?.memoryFlushWritePath).toMatch(/^memory\/\d{4}-\d{2}-\d{2}\.md$/); - expect(flushCall?.extraSystemPrompt).toContain("extra system"); - expect(flushCall?.extraSystemPrompt).toContain("Flush memory now."); - expect(flushCall?.extraSystemPrompt).toContain("NO_REPLY"); - expect(flushCall?.extraSystemPrompt).toContain("memory/YYYY-MM-DD.md"); - expect(flushCall?.extraSystemPrompt).toContain("MEMORY.md"); - expect(flushCall?.silentExpected).toBe(true); - expect(calls[1]?.prompt).toBe("hello"); - }); - }); - - it("passes stored bootstrap warning signatures to memory flush runs", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionEntry: SessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - systemPromptReport: { - source: "run", - generatedAt: Date.now(), - systemPrompt: { - chars: 1, - projectContextChars: 0, - nonProjectContextChars: 1, - }, - injectedWorkspaceFiles: [], - skills: { - promptChars: 0, - entries: [], - }, - tools: { - listChars: 0, - schemaChars: 0, - entries: [], - }, - bootstrapTruncation: { - warningMode: "once", - warningShown: true, - promptWarningSignature: "sig-b", - warningSignaturesSeen: ["sig-a", "sig-b"], - truncatedFiles: 1, - nearLimitFiles: 0, - totalNearLimit: false, - }, - }, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push(params); - if (params.prompt?.includes("Pre-compaction memory flush.")) { - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(calls).toHaveLength(2); - expect(calls[0]?.bootstrapPromptWarningSignaturesSeen).toEqual(["sig-a", "sig-b"]); - expect(calls[0]?.bootstrapPromptWarningSignature).toBe("sig-b"); - }); - }); - - it("runs a memory flush turn and updates session metadata", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ - prompt?: string; - extraSystemPrompt?: string; - memoryFlushWritePath?: string; - sessionId?: string; - sessionFile?: string; - }> = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ - prompt: params.prompt, - extraSystemPrompt: params.extraSystemPrompt, - memoryFlushWritePath: params.memoryFlushWritePath, - sessionId: params.sessionId, - sessionFile: params.sessionFile, - }); - if (params.prompt?.includes("Pre-compaction memory flush.")) { - params.onAgentEvent?.({ - stream: "compaction", - data: { phase: "end", willRetry: false }, - }); - return { - payloads: [], - meta: { agentMeta: { sessionId: "session-rotated" } }, - }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(calls).toHaveLength(2); - expect(calls[0]?.prompt).toContain("Pre-compaction memory flush."); - expect(calls[0]?.prompt).toContain("Current time:"); - expect(calls[0]?.prompt).toMatch(/memory\/\d{4}-\d{2}-\d{2}\.md/); - expect(calls[0]?.prompt).toContain("MEMORY.md"); - expect(calls[0]?.memoryFlushWritePath).toMatch(/^memory\/\d{4}-\d{2}-\d{2}\.md$/); - expect(calls[0]?.extraSystemPrompt).toContain("memory/YYYY-MM-DD.md"); - expect(calls[0]?.extraSystemPrompt).toContain("MEMORY.md"); - expect(calls[1]?.prompt).toBe("hello"); - expect(calls[1]?.sessionId).toBe("session-rotated"); - expect(await normalizeComparablePath(calls[1]?.sessionFile ?? "")).toBe( - await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")), - ); - expect(vi.mocked(refreshQueuedFollowupSession)).toHaveBeenCalledWith({ - key: sessionKey, - previousSessionId: "session", - nextSessionId: "session-rotated", - nextSessionFile: expect.stringContaining("session-rotated.jsonl"), - }); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number"); - expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2); - expect(stored[sessionKey].compactionCount).toBe(2); - expect(stored[sessionKey].sessionId).toBe("session-rotated"); - expect(await normalizeComparablePath(stored[sessionKey].sessionFile)).toBe( - await normalizeComparablePath(path.join(path.dirname(storePath), "session-rotated.jsonl")), - ); - }); - }); - - it("runs memory flush when transcript fallback uses a relative sessionFile path", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionFile = "session-relative.jsonl"; - const transcriptPath = path.join(path.dirname(storePath), sessionFile); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile( - transcriptPath, - JSON.stringify({ usage: { input: 90_000, output: 8_000 } }), - "utf-8", - ); - - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - sessionFile, - totalTokens: 10, - totalTokensFresh: false, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - if (params.prompt?.includes("Pre-compaction memory flush.")) { - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - runOverrides: { sessionFile }, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(calls).toHaveLength(2); - expect(calls[0]?.prompt).toContain("Pre-compaction memory flush."); - expect(calls[0]?.prompt).toContain("Current time:"); - expect(calls[0]?.prompt).toMatch(/memory\/\d{4}-\d{2}-\d{2}\.md/); - expect(calls[1]?.prompt).toBe("hello"); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number"); - }); - }); - - it("forces memory flush when transcript file exceeds configured byte threshold", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionFile = "oversized-session.jsonl"; - const transcriptPath = path.join(path.dirname(storePath), sessionFile); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); - await fs.writeFile(transcriptPath, "x".repeat(3_000), "utf-8"); - - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - sessionFile, - totalTokens: 10, - totalTokensFresh: false, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - if (params.prompt?.includes("Pre-compaction memory flush.")) { - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - config: { - agents: { - defaults: { - compaction: { - memoryFlush: { - forceFlushTranscriptBytes: 256, - }, - }, - }, - }, - }, - runOverrides: { sessionFile }, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(calls).toHaveLength(2); - expect(calls[0]?.prompt).toContain("Pre-compaction memory flush."); - expect(calls[1]?.prompt).toBe("hello"); - }); - }); - - it("skips memory flush when disabled in config", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - state.runEmbeddedPiAgentMock.mockImplementation(async () => ({ - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - })); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - config: { agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } } }, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(state.runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const call = state.runEmbeddedPiAgentMock.mock.calls[0]?.[0] as - | { prompt?: string } - | undefined; - expect(call?.prompt).toBe("hello"); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].memoryFlushAt).toBeUndefined(); - }); - }); - - it("skips memory flush after a prior flush in the same compaction cycle", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 2, - memoryFlushCompactionCount: 2, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - const calls: Array<{ prompt?: string }> = []; - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - calls.push({ prompt: params.prompt }); - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - expect(calls.map((call) => call.prompt)).toEqual(["hello"]); - }); - }); - - it("increments compaction count when flush compaction completes", async () => { - await withTempStore(async (storePath) => { - const sessionKey = "main"; - const sessionEntry = { - sessionId: "session", - updatedAt: Date.now(), - totalTokens: 80_000, - compactionCount: 1, - }; - - await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); - - state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { - if (params.prompt?.includes("Pre-compaction memory flush.")) { - params.onAgentEvent?.({ - stream: "compaction", - data: { phase: "end", willRetry: false }, - }); - return { payloads: [], meta: {} }; - } - return { - payloads: [{ text: "ok" }], - meta: { agentMeta: { usage: { input: 1, output: 1 } } }, - }; - }); - - const baseRun = createBaseRun({ - storePath, - sessionEntry, - }); - - await runReplyAgentWithBase({ - baseRun, - storePath, - sessionKey, - sessionEntry, - commandBody: "hello", - }); - - const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); - expect(stored[sessionKey].compactionCount).toBe(2); - expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2); - }); - }); -}); import type { ReplyPayload } from "../types.js";