test: cover memory flush rotation in helper tests

This commit is contained in:
Peter Steinberger
2026-04-09 05:21:52 +01:00
parent b024fae9e5
commit 37625cff6f
2 changed files with 232 additions and 9 deletions

View File

@@ -0,0 +1,198 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions.js";
import {
clearMemoryPluginState,
registerMemoryFlushPlanResolver,
} from "../../plugins/memory-state.js";
import type { TemplateContext } from "../templating.js";
import { runMemoryFlushIfNeeded, setAgentRunnerMemoryTestDeps } from "./agent-runner-memory.js";
import type { FollowupRun } from "./queue.js";
const runWithModelFallbackMock = vi.fn();
const runEmbeddedPiAgentMock = vi.fn();
const refreshQueuedFollowupSessionMock = vi.fn();
const incrementCompactionCountMock = vi.fn();
function createReplyOperation() {
return {
abortSignal: new AbortController().signal,
setPhase: vi.fn(),
updateSessionId: vi.fn(),
} as never;
}
function createFollowupRun(overrides: Partial<FollowupRun["run"]> = {}): FollowupRun {
return {
prompt: "hello",
summaryLine: "hello",
enqueuedAt: Date.now(),
run: {
agentId: "main",
agentDir: "/tmp/agent",
sessionId: "session",
sessionKey: "main",
messageProvider: "whatsapp",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
config: {},
skillsSnapshot: {},
provider: "anthropic",
model: "claude",
thinkLevel: "low",
verboseLevel: "off",
elevatedLevel: "off",
bashElevated: { enabled: false, allowed: false, defaultLevel: "off" },
timeoutMs: 1_000,
blockReplyBreak: "message_end",
skipProviderRuntimeHints: true,
...overrides,
},
} as unknown as FollowupRun;
}
async function writeSessionStore(
storePath: string,
sessionKey: string,
entry: SessionEntry,
): Promise<void> {
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2), "utf8");
}
describe("runMemoryFlushIfNeeded", () => {
let rootDir = "";
beforeEach(async () => {
rootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-memory-unit-"));
registerMemoryFlushPlanResolver(() => ({
softThresholdTokens: 4_000,
forceFlushTranscriptBytes: 1_000_000_000,
reserveTokensFloor: 20_000,
prompt: "Pre-compaction memory flush.\nNO_REPLY",
systemPrompt: "Write memory to memory/YYYY-MM-DD.md.",
relativePath: "memory/2023-11-14.md",
}));
runWithModelFallbackMock.mockReset().mockImplementation(async ({ provider, model, run }) => ({
result: await run(provider, model),
provider,
model,
attempts: [],
}));
runEmbeddedPiAgentMock.mockReset().mockResolvedValue({ payloads: [], meta: {} });
refreshQueuedFollowupSessionMock.mockReset();
incrementCompactionCountMock.mockReset().mockImplementation(async (params) => {
const sessionKey = String(params.sessionKey ?? "");
if (!sessionKey || !params.sessionStore?.[sessionKey]) {
return undefined;
}
const previous = params.sessionStore[sessionKey] as SessionEntry;
const nextEntry: SessionEntry = {
...previous,
compactionCount: (previous.compactionCount ?? 0) + 1,
};
if (typeof params.newSessionId === "string" && params.newSessionId) {
nextEntry.sessionId = params.newSessionId;
const storePath = typeof params.storePath === "string" ? params.storePath : rootDir;
nextEntry.sessionFile = path.join(path.dirname(storePath), `${params.newSessionId}.jsonl`);
}
params.sessionStore[sessionKey] = nextEntry;
if (typeof params.storePath === "string") {
await writeSessionStore(params.storePath, sessionKey, nextEntry);
}
return nextEntry.compactionCount;
});
setAgentRunnerMemoryTestDeps({
runWithModelFallback: runWithModelFallbackMock as never,
runEmbeddedPiAgent: runEmbeddedPiAgentMock as never,
refreshQueuedFollowupSession: refreshQueuedFollowupSessionMock as never,
incrementCompactionCount: incrementCompactionCountMock as never,
registerAgentRunContext: vi.fn() as never,
randomUUID: () => "00000000-0000-0000-0000-000000000001",
now: () => 1_700_000_000_000,
});
});
afterEach(async () => {
setAgentRunnerMemoryTestDeps();
clearMemoryPluginState();
await fs.rm(rootDir, { recursive: true, force: true });
});
it("runs a memory flush turn, rotates after compaction, and persists metadata", async () => {
const storePath = path.join(rootDir, "sessions.json");
const sessionKey = "main";
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
const sessionStore = { [sessionKey]: sessionEntry };
await writeSessionStore(storePath, sessionKey, sessionEntry);
runEmbeddedPiAgentMock.mockImplementationOnce(
async (params: {
onAgentEvent?: (evt: { stream: string; data: { phase: string } }) => void;
}) => {
params.onAgentEvent?.({ stream: "compaction", data: { phase: "end" } });
return {
payloads: [],
meta: { agentMeta: { sessionId: "session-rotated" } },
};
},
);
const followupRun = createFollowupRun();
const entry = await runMemoryFlushIfNeeded({
cfg: {
agents: {
defaults: {
compaction: {
memoryFlush: {},
},
},
},
},
followupRun,
sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext,
defaultModel: "anthropic/claude-opus-4-6",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
sessionEntry,
sessionStore,
sessionKey,
storePath,
isHeartbeat: false,
replyOperation: createReplyOperation(),
});
expect(entry?.sessionId).toBe("session-rotated");
expect(followupRun.run.sessionId).toBe("session-rotated");
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
const flushCall = runEmbeddedPiAgentMock.mock.calls[0]?.[0] as {
prompt?: string;
memoryFlushWritePath?: string;
silentExpected?: boolean;
};
expect(flushCall.prompt).toContain("Pre-compaction memory flush.");
expect(flushCall.memoryFlushWritePath).toMatch(/^memory\/\d{4}-\d{2}-\d{2}\.md$/);
expect(flushCall.silentExpected).toBe(true);
expect(refreshQueuedFollowupSessionMock).toHaveBeenCalledWith({
key: sessionKey,
previousSessionId: "session",
nextSessionId: "session-rotated",
nextSessionFile: expect.stringContaining("session-rotated.jsonl"),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
main: SessionEntry;
};
expect(persisted.main.sessionId).toBe("session-rotated");
expect(persisted.main.compactionCount).toBe(2);
expect(persisted.main.memoryFlushCompactionCount).toBe(2);
expect(persisted.main.memoryFlushAt).toBe(1_700_000_000_000);
});
});

View File

@@ -45,6 +45,31 @@ import { refreshQueuedFollowupSession, type FollowupRun } from "./queue.js";
import type { ReplyOperation } from "./reply-run-registry.js";
import { incrementCompactionCount } from "./session-updates.js";
const memoryDeps = {
runWithModelFallback,
runEmbeddedPiAgent,
registerAgentRunContext,
refreshQueuedFollowupSession,
incrementCompactionCount,
updateSessionStoreEntry,
randomUUID: () => crypto.randomUUID(),
now: () => Date.now(),
};
export function setAgentRunnerMemoryTestDeps(overrides?: Partial<typeof memoryDeps>): void {
Object.assign(memoryDeps, {
runWithModelFallback,
runEmbeddedPiAgent,
registerAgentRunContext,
refreshQueuedFollowupSession,
incrementCompactionCount,
updateSessionStoreEntry,
randomUUID: () => crypto.randomUUID(),
now: () => Date.now(),
...overrides,
});
}
export function estimatePromptTokensForMemoryFlush(prompt?: string): number | undefined {
const trimmed = normalizeOptionalString(prompt);
if (!trimmed) {
@@ -663,15 +688,15 @@ export async function runMemoryFlushIfNeeded(params: {
activeSessionEntry?.systemPromptReport ??
(params.sessionKey ? activeSessionStore?.[params.sessionKey]?.systemPromptReport : undefined),
);
const flushRunId = crypto.randomUUID();
const flushRunId = memoryDeps.randomUUID();
if (params.sessionKey) {
registerAgentRunContext(flushRunId, {
memoryDeps.registerAgentRunContext(flushRunId, {
sessionKey: params.sessionKey,
verboseLevel: params.resolvedVerboseLevel,
});
}
let memoryCompactionCompleted = false;
const memoryFlushNowMs = Date.now();
const memoryFlushNowMs = memoryDeps.now();
const activeMemoryFlushPlan =
resolveMemoryFlushPlan({
cfg: params.cfg,
@@ -686,7 +711,7 @@ export async function runMemoryFlushIfNeeded(params: {
.join("\n\n");
let postCompactionSessionId: string | undefined;
try {
await runWithModelFallback({
await memoryDeps.runWithModelFallback({
...resolveModelFallbackOptions(params.followupRun.run),
runId: flushRunId,
run: async (provider, model, runOptions) => {
@@ -699,7 +724,7 @@ export async function runMemoryFlushIfNeeded(params: {
runId: flushRunId,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
});
const result = await runEmbeddedPiAgent({
const result = await memoryDeps.runEmbeddedPiAgent({
...embeddedContext,
...senderContext,
...runBaseParams,
@@ -738,7 +763,7 @@ export async function runMemoryFlushIfNeeded(params: {
0;
if (memoryCompactionCompleted) {
const previousSessionId = activeSessionEntry?.sessionId ?? params.followupRun.run.sessionId;
const nextCount = await incrementCompactionCount({
const nextCount = await memoryDeps.incrementCompactionCount({
cfg: params.cfg,
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
@@ -756,7 +781,7 @@ export async function runMemoryFlushIfNeeded(params: {
}
const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey;
if (queueKey) {
refreshQueuedFollowupSession({
memoryDeps.refreshQueuedFollowupSession({
key: queueKey,
previousSessionId,
nextSessionId: updatedEntry.sessionId,
@@ -770,11 +795,11 @@ export async function runMemoryFlushIfNeeded(params: {
}
if (params.storePath && params.sessionKey) {
try {
const updatedEntry = await updateSessionStoreEntry({
const updatedEntry = await memoryDeps.updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
update: async () => ({
memoryFlushAt: Date.now(),
memoryFlushAt: memoryDeps.now(),
memoryFlushCompactionCount,
}),
});