fix(auto-reply): track memory flush failure exhaustion

Add durable memoryFlush failure metadata and lifecycle events so provider failures during memory flush no longer leave a session with no recorded recovery state.

After three consecutive non-abort flush failures, mark the current compaction cycle as exhausted so later messages can proceed without deleting transcript history. Successful flushes clear the failure metadata, and plugin session-entry slot reservations now protect the new fields.

Release-note: memoryFlush sessions can now fail open after repeated provider-side flush failures instead of retrying indefinitely before normal replies.

Refs #85645

Co-authored-by: 忻役 <xinyi@mininglamp.com>
This commit is contained in:
Jerry-Xin
2026-06-01 00:47:12 +08:00
committed by GitHub
parent 5bce222b0c
commit 4e84d0eaa5
4 changed files with 340 additions and 2 deletions

View File

@@ -10,6 +10,7 @@ import {
type MemoryFlushPlanResolver,
} from "../../plugins/memory-state.js";
import type { TemplateContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import {
runMemoryFlushIfNeeded,
runPreflightCompactionIfNeeded,
@@ -25,6 +26,8 @@ const refreshQueuedFollowupSessionMock = vi.fn();
const incrementCompactionCountMock = vi.fn();
const ensureSelectedAgentHarnessPluginMock = vi.fn();
const ensureMemoryFlushTargetFileMock = vi.fn();
const emitAgentEventMock = vi.fn();
const TEST_MAX_FLUSH_FAILURES = 3;
function registerMemoryFlushPlanResolverForTest(resolver: MemoryFlushPlanResolver): void {
registerMemoryCapability("memory-core", { flushPlanResolver: resolver });
@@ -171,6 +174,7 @@ describe("runMemoryFlushIfNeeded", () => {
refreshQueuedFollowupSessionMock.mockReset();
ensureMemoryFlushTargetFileMock.mockReset().mockResolvedValue(undefined);
ensureSelectedAgentHarnessPluginMock.mockReset().mockResolvedValue(undefined);
emitAgentEventMock.mockReset();
incrementCompactionCountMock.mockReset().mockImplementation(async (params) => {
const sessionKey = String(params.sessionKey ?? "");
if (!sessionKey || !params.sessionStore?.[sessionKey]) {
@@ -208,6 +212,7 @@ describe("runMemoryFlushIfNeeded", () => {
incrementCompactionCount: incrementCompactionCountMock as never,
ensureSelectedAgentHarnessPlugin: ensureSelectedAgentHarnessPluginMock as never,
registerAgentRunContext: vi.fn() as never,
emitAgentEvent: emitAgentEventMock as never,
randomUUID: () => "00000000-0000-0000-0000-000000000001",
now: () => 1_700_000_000_000,
});
@@ -489,6 +494,236 @@ describe("runMemoryFlushIfNeeded", () => {
expect(visibleErrorPayloads).toEqual([]);
});
it("increments memoryFlushFailureCount on non-abort flush failure", async () => {
const storePath = path.join(rootDir, "sessions.json");
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await writeTestSessionStore(storePath, "main", sessionEntry);
runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider crashed during flush"));
await runMemoryFlushIfNeeded({
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun(),
sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext,
defaultModel: "anthropic/claude-opus-4-7",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
sessionEntry,
sessionStore: { main: sessionEntry },
sessionKey: "main",
storePath,
isHeartbeat: false,
replyOperation: createReplyOperation(),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry };
expect(persisted.main.memoryFlushFailureCount).toBe(1);
expect(persisted.main.memoryFlushLastFailedAt).toBe(1_700_000_000_000);
expect(persisted.main.memoryFlushLastFailureError).toContain("provider crashed during flush");
expect(emitAgentEventMock).toHaveBeenCalledWith(
expect.objectContaining({
stream: "lifecycle",
data: expect.objectContaining({
phase: "memory_flush_failed",
attempt: 1,
maxAttempts: TEST_MAX_FLUSH_FAILURES,
}),
}),
);
});
it("does not track failure on abort error", async () => {
const storePath = path.join(rootDir, "sessions.json");
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
memoryFlushFailureCount: 0,
};
await writeTestSessionStore(storePath, "main", sessionEntry);
const abortErr = new Error("operation aborted by user");
abortErr.name = "AbortError";
runWithModelFallbackMock.mockRejectedValueOnce(abortErr);
await runMemoryFlushIfNeeded({
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun(),
sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext,
defaultModel: "anthropic/claude-opus-4-7",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
sessionEntry,
sessionStore: { main: sessionEntry },
sessionKey: "main",
storePath,
isHeartbeat: false,
replyOperation: createReplyOperation(),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry };
expect(persisted.main.memoryFlushFailureCount).toBe(0);
expect(persisted.main.memoryFlushLastFailedAt).toBeUndefined();
expect(persisted.main.memoryFlushLastFailureError).toBeUndefined();
});
it("clears failure counters on successful flush", async () => {
const storePath = path.join(rootDir, "sessions.json");
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
memoryFlushFailureCount: 2,
memoryFlushLastFailedAt: 1_699_999_999_000,
memoryFlushLastFailureError: "provider crashed during flush",
};
await writeTestSessionStore(storePath, "main", sessionEntry);
await runMemoryFlushIfNeeded({
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun(),
sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext,
defaultModel: "anthropic/claude-opus-4-7",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
sessionEntry,
sessionStore: { main: sessionEntry },
sessionKey: "main",
storePath,
isHeartbeat: false,
replyOperation: createReplyOperation(),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry };
expect(persisted.main.memoryFlushFailureCount).toBe(0);
expect(persisted.main.memoryFlushLastFailedAt).toBeUndefined();
expect(persisted.main.memoryFlushLastFailureError).toBeUndefined();
});
it("marks flush as completed after MAX_FLUSH_FAILURES to break retry loop", async () => {
const storePath = path.join(rootDir, "sessions.json");
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
memoryFlushFailureCount: TEST_MAX_FLUSH_FAILURES - 1,
};
await writeTestSessionStore(storePath, "main", sessionEntry);
runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider crashed during flush"));
const visibleErrorPayloads: ReplyPayload[] = [];
await runMemoryFlushIfNeeded({
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun(),
sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext,
defaultModel: "anthropic/claude-opus-4-7",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off",
sessionEntry,
sessionStore: { main: sessionEntry },
sessionKey: "main",
storePath,
isHeartbeat: false,
replyOperation: createReplyOperation(),
onVisibleErrorPayloads: (payloads) => {
visibleErrorPayloads.push(...payloads);
},
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry };
expect(persisted.main.memoryFlushCompactionCount).toBe(1);
expect(persisted.main.memoryFlushFailureCount).toBe(TEST_MAX_FLUSH_FAILURES);
expect(emitAgentEventMock).toHaveBeenCalledWith(
expect.objectContaining({
stream: "lifecycle",
data: expect.objectContaining({
phase: "memory_flush_exhausted",
attempt: TEST_MAX_FLUSH_FAILURES,
maxAttempts: TEST_MAX_FLUSH_FAILURES,
}),
}),
);
expect(visibleErrorPayloads[0]).toEqual(
expect.objectContaining({
text: expect.stringContaining("skipping for this cycle"),
isError: true,
}),
);
});
it("retries flush on subsequent messages until MAX_FLUSH_FAILURES", async () => {
const storePath = path.join(rootDir, "sessions.json");
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await writeTestSessionStore(storePath, "main", sessionEntry);
runWithModelFallbackMock.mockRejectedValue(new Error("provider crashed during flush"));
const params = {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun(),
sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext,
defaultModel: "anthropic/claude-opus-4-7",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off" as const,
sessionEntry,
sessionStore: { main: sessionEntry },
sessionKey: "main",
storePath,
isHeartbeat: false,
replyOperation: createReplyOperation(),
};
await runMemoryFlushIfNeeded(params);
await runMemoryFlushIfNeeded({ ...params, replyOperation: createReplyOperation() });
expect(runWithModelFallbackMock).toHaveBeenCalledTimes(2);
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry };
expect(persisted.main.memoryFlushFailureCount).toBe(2);
});
it("next message retries flush after failure", async () => {
const storePath = path.join(rootDir, "sessions.json");
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
totalTokens: 80_000,
compactionCount: 1,
};
await writeTestSessionStore(storePath, "main", sessionEntry);
runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider crashed during flush"));
const params = {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun(),
sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext,
defaultModel: "anthropic/claude-opus-4-7",
agentCfgContextTokens: 100_000,
resolvedVerboseLevel: "off" as const,
sessionEntry,
sessionStore: { main: sessionEntry },
sessionKey: "main",
storePath,
isHeartbeat: false,
replyOperation: createReplyOperation(),
};
await runMemoryFlushIfNeeded(params);
await runMemoryFlushIfNeeded({ ...params, replyOperation: createReplyOperation() });
expect(runWithModelFallbackMock).toHaveBeenCalledTimes(2);
});
it("runs memory flush on the configured maintenance model without active fallbacks", async () => {
registerMemoryFlushPlanResolverForTest(() => ({
softThresholdTokens: 4_000,

View File

@@ -37,7 +37,7 @@ import {
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { isAbortError } from "../../infra/unhandled-rejections.js";
import { resolveMemoryFlushPlan } from "../../plugins/memory-state.js";
@@ -67,6 +67,8 @@ import { incrementCompactionCount } from "./session-updates.js";
type EmbeddedAgentRuntime = typeof import("../../agents/embedded-agent.js");
const MAX_VISIBLE_MEMORY_FLUSH_ERROR_CHARS = 600;
const MAX_FLUSH_FAILURES = 3;
const MAX_FLUSH_ERROR_LENGTH = 200;
const embeddedAgentRuntimeLoader = createLazyImportLoader<EmbeddedAgentRuntime>(
() => import("../../agents/embedded-agent.js"),
@@ -126,6 +128,7 @@ const memoryDeps = {
refreshQueuedFollowupSession,
incrementCompactionCount,
updateSessionStoreEntry,
emitAgentEvent,
randomUUID: () => crypto.randomUUID(),
now: () => Date.now(),
};
@@ -141,6 +144,7 @@ export function setAgentRunnerMemoryTestDeps(overrides?: Partial<typeof memoryDe
refreshQueuedFollowupSession,
incrementCompactionCount,
updateSessionStoreEntry,
emitAgentEvent,
randomUUID: () => crypto.randomUUID(),
now: () => Date.now(),
...overrides,
@@ -322,6 +326,13 @@ function buildMemoryFlushErrorPayload(err: unknown): ReplyPayload | undefined {
};
}
function truncateMemoryFlushErrorMessage(err: unknown): string {
const message = normalizeOptionalString(formatErrorMessage(err)) || String(err);
return message.length > MAX_FLUSH_ERROR_LENGTH
? `${message.slice(0, MAX_FLUSH_ERROR_LENGTH - 1)}`
: message;
}
export type SessionTranscriptUsageSnapshot = {
promptTokens?: number;
outputTokens?: number;
@@ -1327,6 +1338,9 @@ export async function runMemoryFlushIfNeeded(params: {
update: async () => ({
memoryFlushAt: memoryDeps.now(),
memoryFlushCompactionCount: flushedCompactionCount,
memoryFlushFailureCount: 0,
memoryFlushLastFailedAt: undefined,
memoryFlushLastFailureError: undefined,
}),
});
if (updatedEntry) {
@@ -1342,7 +1356,87 @@ export async function runMemoryFlushIfNeeded(params: {
}
}
} catch (err) {
logVerbose(`memory flush run failed: ${String(err)}`);
const truncatedError = truncateMemoryFlushErrorMessage(err);
if (!isAbortError(err) && params.storePath && params.sessionKey) {
try {
const failedAt = memoryDeps.now();
const failedEntry = await memoryDeps.updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
update: async (entry) => ({
memoryFlushFailureCount: Math.max(0, entry.memoryFlushFailureCount ?? 0) + 1,
memoryFlushLastFailedAt: failedAt,
memoryFlushLastFailureError: truncatedError,
}),
});
if (failedEntry) {
activeSessionEntry = failedEntry;
if (activeSessionStore) {
activeSessionStore[params.sessionKey] = failedEntry;
}
}
const failureCount = Math.max(0, failedEntry?.memoryFlushFailureCount ?? 0);
logVerbose(
`memory flush failed (attempt ${failureCount}/${MAX_FLUSH_FAILURES}): ${truncatedError}`,
);
memoryDeps.emitAgentEvent({
runId: flushRunId,
stream: "lifecycle",
sessionKey: params.sessionKey,
sessionId: activeSessionEntry?.sessionId,
data: {
phase: "memory_flush_failed",
attempt: failureCount,
maxAttempts: MAX_FLUSH_FAILURES,
error: truncatedError,
},
});
if (failedEntry && failureCount >= MAX_FLUSH_FAILURES) {
logVerbose(
`memory flush exhausted: skipping flush for this compaction cycle after ${failureCount} consecutive failures`,
);
memoryDeps.emitAgentEvent({
runId: flushRunId,
stream: "lifecycle",
sessionKey: params.sessionKey,
sessionId: failedEntry.sessionId,
data: {
phase: "memory_flush_exhausted",
attempt: failureCount,
maxAttempts: MAX_FLUSH_FAILURES,
},
});
const exhaustedEntry = await memoryDeps.updateSessionStoreEntry({
storePath: params.storePath,
sessionKey: params.sessionKey,
skipMaintenance: true,
takeCacheOwnership: true,
update: async (entry) => ({
memoryFlushAt: memoryDeps.now(),
memoryFlushCompactionCount: entry.compactionCount ?? 0,
}),
});
if (exhaustedEntry) {
activeSessionEntry = exhaustedEntry;
if (activeSessionStore) {
activeSessionStore[params.sessionKey] = exhaustedEntry;
}
}
params.onVisibleErrorPayloads?.([
{
text: `⚠️ Memory flush failed after ${MAX_FLUSH_FAILURES} attempts; skipping for this cycle. It will retry after the next compaction.`,
isError: true,
},
]);
}
} catch (persistErr) {
logVerbose(`failed to persist memory flush failure metadata: ${String(persistErr)}`);
}
} else {
logVerbose(`memory flush run failed: ${String(err)}`);
}
const visibleErrorPayload = buildMemoryFlushErrorPayload(err);
if (visibleErrorPayload) {
params.onVisibleErrorPayloads?.([visibleErrorPayload]);

View File

@@ -376,6 +376,12 @@ export type SessionEntry = {
memoryFlushAt?: number;
memoryFlushCompactionCount?: number;
memoryFlushContextHash?: string;
/** Consecutive memory flush failures since the last successful flush. */
memoryFlushFailureCount?: number;
/** Timestamp (ms) of the last failed memory flush attempt. */
memoryFlushLastFailedAt?: number;
/** Last memory flush failure error message, truncated for durable metadata. */
memoryFlushLastFailureError?: string;
cliSessionIds?: Record<string, string>;
cliSessionBindings?: Record<string, CliSessionBinding>;
claudeCliSessionId?: string;

View File

@@ -101,6 +101,9 @@ const SESSION_ENTRY_RESERVED_SLOT_KEY_LIST = [
"memoryFlushAt",
"memoryFlushCompactionCount",
"memoryFlushContextHash",
"memoryFlushFailureCount",
"memoryFlushLastFailedAt",
"memoryFlushLastFailureError",
"cliSessionIds",
"cliSessionBindings",
"claudeCliSessionId",