fix: prevent duplicate memory flush retry writes

This commit is contained in:
Josh Lehman
2026-03-21 09:49:00 -07:00
parent d5590bdc31
commit 80f972b5d3
3 changed files with 131 additions and 1 deletions

View File

@@ -120,6 +120,7 @@ Docs: https://docs.openclaw.ai
- Onboarding/custom providers: store Azure OpenAI and Azure AI Foundry custom endpoints with the Responses API config shape, normalized `/openai/v1` base URLs, and Azure-safe defaults so TUI and agent runs work after setup. (#49543) Thanks @kunalk16.
- Docker/live tests: mount external CLI auth homes into writable container copies, derive Codex OAuth expiry from JWT `exp`, refresh synced CLI creds instead of trusting stale cached expiry, and make gateway live probes wait on transcript output so `pnpm test:docker:all` stays green in Linux.
- Plugins/install precedence: keep bundled plugins ahead of auto-discovered globals by default, but let an explicitly installed plugin record win its own duplicate-id tie so installed channel plugins load from `~/.openclaw/extensions` after `openclaw plugins install`. (#46722) Thanks @Takhoffman.
- Agents/memory flush: keep transcript-hash dedup active across memory-flush fallback retries so a write-then-throw flush attempt cannot append duplicate `MEMORY.md` entries before the fallback cycle completes. (#34222) Thanks @lml2468.
- Control UI/logging: make browser-safe logger imports avoid eager temp-dir resolution so the bundled Control UI no longer crashes to a blank screen when logging reaches `tmp-openclaw-dir`. (#48469) Fixes #48062. Thanks @7inspire.
- Plugins/scoped ids: preserve scoped plugin ids during install and config keying, and keep bundled plugins ahead of discovered duplicate ids by default so `@scope/name` plugins no longer collide with unscoped installs. (#47413) Thanks @vincentkoc.
- Gateway/watch mode: restart on bundled-plugin package and manifest metadata changes, rebuild `dist` for extension source and `tsdown.config.ts` changes, and still ignore extension docs. (#47571) Thanks @gumadeiras.

View File

@@ -507,6 +507,7 @@ export async function runMemoryFlushIfNeeded(params: {
});
}
let memoryCompactionCompleted = false;
let fallbackFlushAttemptedForCurrentHash = false;
const memoryFlushNowMs = Date.now();
const memoryFlushWritePath = resolveMemoryFlushRelativePathForRun({
cfg: params.cfg,
@@ -523,6 +524,16 @@ export async function runMemoryFlushIfNeeded(params: {
...resolveModelFallbackOptions(params.followupRun.run),
runId: flushRunId,
run: async (provider, model, runOptions) => {
if (contextHashBeforeFlush && fallbackFlushAttemptedForCurrentHash) {
logVerbose(
`memoryFlush fallback candidate skipped (context hash already attempted): sessionKey=${params.sessionKey} hash=${contextHashBeforeFlush} provider=${provider} model=${model}`,
);
// A prior candidate already attempted this exact flush context. Be
// conservative and skip later candidates so a write-then-throw failure
// cannot append the same memory twice during a single fallback cycle.
return { payloads: [], meta: {} };
}
fallbackFlushAttemptedForCurrentHash = Boolean(contextHashBeforeFlush);
const { embeddedContext, senderContext, runBaseParams } = buildEmbeddedRunExecutionParams({
run: params.followupRun.run,
sessionCtx: params.sessionCtx,

View File

@@ -31,7 +31,10 @@ type EmbeddedRunParams = {
memoryFlushWritePath?: string;
bootstrapPromptWarningSignaturesSeen?: string[];
bootstrapPromptWarningSignature?: string;
onAgentEvent?: (evt: { stream?: string; data?: { phase?: string; willRetry?: boolean } }) => void;
onAgentEvent?: (evt: {
stream?: string;
data?: { phase?: string; willRetry?: boolean; completed?: boolean };
}) => void;
};
const state = vi.hoisted(() => ({
@@ -2032,6 +2035,121 @@ describe("runReplyAgent memory flush", () => {
});
});
it("skips duplicate memory writes across memory-flush fallback retries", async () => {
await withTempStore(async (storePath) => {
const sessionKey = "main";
const sessionFile = "session-relative.jsonl";
const fixtureDir = path.dirname(storePath);
const transcriptPath = path.join(fixtureDir, sessionFile);
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
await fs.writeFile(
transcriptPath,
[
JSON.stringify({ message: { role: "user", content: "Remember alpha." } }),
JSON.stringify({ message: { role: "assistant", content: "Stored alpha." } }),
].join("\n") + "\n",
"utf-8",
);
const sessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
sessionFile,
totalTokens: 80_000,
compactionCount: 1,
};
await seedSessionStore({ storePath, sessionKey, entry: sessionEntry });
let flushAttemptCount = 0;
let memoryFilePath: string | undefined;
const prompts: string[] = [];
state.runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => {
prompts.push(params.prompt ?? "");
if (params.prompt?.includes("Pre-compaction memory flush.")) {
flushAttemptCount += 1;
memoryFilePath = path.join(fixtureDir, params.memoryFlushWritePath ?? "memory/flush.md");
await fs.mkdir(path.dirname(memoryFilePath), { recursive: true });
await fs.appendFile(memoryFilePath, "remember alpha\n", "utf-8");
if (flushAttemptCount === 1) {
throw new Error("flush failed after write");
}
return { payloads: [], meta: {} };
}
return {
payloads: [{ text: "ok" }],
meta: { agentMeta: { usage: { input: 1, output: 1 } } },
};
});
const fallbackSpy = vi
.spyOn(modelFallbackModule, "runWithModelFallback")
.mockImplementationOnce(
async ({
provider,
model,
run,
}: {
provider: string;
model: string;
run: (provider: string, model: string) => Promise<unknown>;
}) => {
try {
await run(provider, model);
} catch {
// Simulate advancing to the next fallback candidate after the first
// memory flush attempt already wrote and then failed.
}
return {
result: await run("openai", "gpt-5.4"),
provider: "openai",
model: "gpt-5.4",
attempts: [
{
provider,
model,
error: "flush failed after write",
reason: "unknown",
},
],
};
},
);
try {
const baseRun = createBaseRun({
storePath,
sessionEntry,
runOverrides: {
sessionFile,
workspaceDir: fixtureDir,
},
});
await runReplyAgentWithBase({
baseRun,
storePath,
sessionKey,
sessionEntry,
commandBody: "hello",
});
} finally {
fallbackSpy.mockRestore();
}
expect(flushAttemptCount).toBe(1);
expect(
prompts.filter((prompt) => prompt.includes("Pre-compaction memory flush.")),
).toHaveLength(1);
expect(memoryFilePath).toBeDefined();
await expect(fs.readFile(memoryFilePath!, "utf-8")).resolves.toBe("remember alpha\n");
const stored = JSON.parse(await fs.readFile(storePath, "utf-8"));
expect(stored[sessionKey].memoryFlushAt).toBeTypeOf("number");
expect(stored[sessionKey].memoryFlushContextHash).toMatch(/^[0-9a-f]{16}$/);
});
});
it("increments compaction count when flush compaction completes", async () => {
await withTempStore(async (storePath) => {
const sessionKey = "main";