// Patch summary: track consecutive memory-flush failures on the session entry and stop
// retrying once MAX_FLUSH_FAILURES (3) is reached. Touches agent-runner-memory.ts, its vitest
// suite, the SessionEntry type, and the reserved session-entry slot key list.
//
// Behaviour shown by the hunks below:
// - Successful flush: the store update also writes memoryFlushFailureCount: 0 and sets
//   memoryFlushLastFailedAt / memoryFlushLastFailureError to undefined.
// - Non-abort failure with both storePath and sessionKey present: the store update increments
//   memoryFlushFailureCount, records memoryDeps.now() as memoryFlushLastFailedAt and stores the
//   error text cut to MAX_FLUSH_ERROR_LENGTH (200) characters, then emits a "lifecycle" agent
//   event with phase "memory_flush_failed".
// - When the resulting count is >= MAX_FLUSH_FAILURES: emits phase "memory_flush_exhausted",
//   writes memoryFlushAt and memoryFlushCompactionCount (entry.compactionCount ?? 0), and passes
//   a visible error payload to onVisibleErrorPayloads.
// - Abort errors, or a missing storePath/sessionKey: only the pre-existing logVerbose line runs.
// - An exception thrown while persisting failure metadata is caught and logged via logVerbose.
//
// NOTE(review): the exhaustion path does not reset memoryFlushFailureCount (the test asserts it
//   stays at the maximum). Presumably the first failure in a later compaction cycle then already
//   satisfies `failureCount >= MAX_FLUSH_FAILURES`; confirm that a single attempt is intended.
// NOTE(review): after the exhaustion payload is sent, control still reaches the existing
//   buildMemoryFlushErrorPayload(err) call, so two visible error payloads may be delivered for
//   the same failure; confirm.
// NOTE(review): "memory_flush_failed" is emitted even when the store update returns no entry,
//   in which case `attempt` is 0.
// NOTE(review): TEST_MAX_FLUSH_FAILURES mirrors the non-exported MAX_FLUSH_FAILURES and has to
//   be kept in sync by hand. The last two added tests both make two calls and assert two
//   runWithModelFallback invocations, so they overlap heavily.
// NOTE(review): this copy of the patch has its line breaks collapsed and appears to have lost
//   some angle-bracket spans (e.g. `Partial crypto.randomUUID()`, `Record;`); verify against the
//   original patch before applying.
diff --git a/src/auto-reply/reply/agent-runner-memory.test.ts b/src/auto-reply/reply/agent-runner-memory.test.ts index ddcdf542cfd..a6dbc53dabe 100644 --- a/src/auto-reply/reply/agent-runner-memory.test.ts +++ b/src/auto-reply/reply/agent-runner-memory.test.ts @@ -10,6 +10,7 @@ import { type MemoryFlushPlanResolver, } from "../../plugins/memory-state.js"; import type { TemplateContext } from "../templating.js"; +import type { ReplyPayload } from "../types.js"; import { runMemoryFlushIfNeeded, runPreflightCompactionIfNeeded, @@ -25,6 +26,8 @@ const refreshQueuedFollowupSessionMock = vi.fn(); const incrementCompactionCountMock = vi.fn(); const ensureSelectedAgentHarnessPluginMock = vi.fn(); const ensureMemoryFlushTargetFileMock = vi.fn(); +const emitAgentEventMock = vi.fn(); +const TEST_MAX_FLUSH_FAILURES = 3; function registerMemoryFlushPlanResolverForTest(resolver: MemoryFlushPlanResolver): void { registerMemoryCapability("memory-core", { flushPlanResolver: resolver }); @@ -171,6 +174,7 @@ describe("runMemoryFlushIfNeeded", () => { refreshQueuedFollowupSessionMock.mockReset(); ensureMemoryFlushTargetFileMock.mockReset().mockResolvedValue(undefined); ensureSelectedAgentHarnessPluginMock.mockReset().mockResolvedValue(undefined); + emitAgentEventMock.mockReset(); incrementCompactionCountMock.mockReset().mockImplementation(async (params) => { const sessionKey = String(params.sessionKey ?? 
""); if (!sessionKey || !params.sessionStore?.[sessionKey]) { @@ -208,6 +212,7 @@ describe("runMemoryFlushIfNeeded", () => { incrementCompactionCount: incrementCompactionCountMock as never, ensureSelectedAgentHarnessPlugin: ensureSelectedAgentHarnessPluginMock as never, registerAgentRunContext: vi.fn() as never, + emitAgentEvent: emitAgentEventMock as never, randomUUID: () => "00000000-0000-0000-0000-000000000001", now: () => 1_700_000_000_000, }); @@ -489,6 +494,236 @@ describe("runMemoryFlushIfNeeded", () => { expect(visibleErrorPayloads).toEqual([]); }); + it("increments memoryFlushFailureCount on non-abort flush failure", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + await writeTestSessionStore(storePath, "main", sessionEntry); + runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider crashed during flush")); + + await runMemoryFlushIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun(), + sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext, + defaultModel: "anthropic/claude-opus-4-7", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath, + isHeartbeat: false, + replyOperation: createReplyOperation(), + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry }; + expect(persisted.main.memoryFlushFailureCount).toBe(1); + expect(persisted.main.memoryFlushLastFailedAt).toBe(1_700_000_000_000); + expect(persisted.main.memoryFlushLastFailureError).toContain("provider crashed during flush"); + expect(emitAgentEventMock).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "lifecycle", + data: expect.objectContaining({ + phase: "memory_flush_failed", + attempt: 1, 
+ maxAttempts: TEST_MAX_FLUSH_FAILURES, + }), + }), + ); + }); + + it("does not track failure on abort error", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + memoryFlushFailureCount: 0, + }; + await writeTestSessionStore(storePath, "main", sessionEntry); + const abortErr = new Error("operation aborted by user"); + abortErr.name = "AbortError"; + runWithModelFallbackMock.mockRejectedValueOnce(abortErr); + + await runMemoryFlushIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun(), + sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext, + defaultModel: "anthropic/claude-opus-4-7", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath, + isHeartbeat: false, + replyOperation: createReplyOperation(), + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry }; + expect(persisted.main.memoryFlushFailureCount).toBe(0); + expect(persisted.main.memoryFlushLastFailedAt).toBeUndefined(); + expect(persisted.main.memoryFlushLastFailureError).toBeUndefined(); + }); + + it("clears failure counters on successful flush", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + memoryFlushFailureCount: 2, + memoryFlushLastFailedAt: 1_699_999_999_000, + memoryFlushLastFailureError: "provider crashed during flush", + }; + await writeTestSessionStore(storePath, "main", sessionEntry); + + await runMemoryFlushIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun(), + sessionCtx: { Provider: "whatsapp" } 
as unknown as TemplateContext, + defaultModel: "anthropic/claude-opus-4-7", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath, + isHeartbeat: false, + replyOperation: createReplyOperation(), + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry }; + expect(persisted.main.memoryFlushFailureCount).toBe(0); + expect(persisted.main.memoryFlushLastFailedAt).toBeUndefined(); + expect(persisted.main.memoryFlushLastFailureError).toBeUndefined(); + }); + + it("marks flush as completed after MAX_FLUSH_FAILURES to break retry loop", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + memoryFlushFailureCount: TEST_MAX_FLUSH_FAILURES - 1, + }; + await writeTestSessionStore(storePath, "main", sessionEntry); + runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider crashed during flush")); + + const visibleErrorPayloads: ReplyPayload[] = []; + await runMemoryFlushIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun(), + sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext, + defaultModel: "anthropic/claude-opus-4-7", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath, + isHeartbeat: false, + replyOperation: createReplyOperation(), + onVisibleErrorPayloads: (payloads) => { + visibleErrorPayloads.push(...payloads); + }, + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry }; + expect(persisted.main.memoryFlushCompactionCount).toBe(1); + expect(persisted.main.memoryFlushFailureCount).toBe(TEST_MAX_FLUSH_FAILURES); + 
expect(emitAgentEventMock).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "lifecycle", + data: expect.objectContaining({ + phase: "memory_flush_exhausted", + attempt: TEST_MAX_FLUSH_FAILURES, + maxAttempts: TEST_MAX_FLUSH_FAILURES, + }), + }), + ); + expect(visibleErrorPayloads[0]).toEqual( + expect.objectContaining({ + text: expect.stringContaining("skipping for this cycle"), + isError: true, + }), + ); + }); + + it("retries flush on subsequent messages until MAX_FLUSH_FAILURES", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + await writeTestSessionStore(storePath, "main", sessionEntry); + runWithModelFallbackMock.mockRejectedValue(new Error("provider crashed during flush")); + + const params = { + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun(), + sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext, + defaultModel: "anthropic/claude-opus-4-7", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off" as const, + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath, + isHeartbeat: false, + replyOperation: createReplyOperation(), + }; + + await runMemoryFlushIfNeeded(params); + await runMemoryFlushIfNeeded({ ...params, replyOperation: createReplyOperation() }); + + expect(runWithModelFallbackMock).toHaveBeenCalledTimes(2); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { main: SessionEntry }; + expect(persisted.main.memoryFlushFailureCount).toBe(2); + }); + + it("next message retries flush after failure", async () => { + const storePath = path.join(rootDir, "sessions.json"); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + await writeTestSessionStore(storePath, 
"main", sessionEntry); + runWithModelFallbackMock.mockRejectedValueOnce(new Error("provider crashed during flush")); + + const params = { + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun(), + sessionCtx: { Provider: "whatsapp" } as unknown as TemplateContext, + defaultModel: "anthropic/claude-opus-4-7", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off" as const, + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath, + isHeartbeat: false, + replyOperation: createReplyOperation(), + }; + + await runMemoryFlushIfNeeded(params); + await runMemoryFlushIfNeeded({ ...params, replyOperation: createReplyOperation() }); + + expect(runWithModelFallbackMock).toHaveBeenCalledTimes(2); + }); + it("runs memory flush on the configured maintenance model without active fallbacks", async () => { registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 6e653ff169f..08edcd4b373 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -37,7 +37,7 @@ import { import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js"; import { logVerbose } from "../../globals.js"; -import { registerAgentRunContext } from "../../infra/agent-events.js"; +import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { isAbortError } from "../../infra/unhandled-rejections.js"; import { resolveMemoryFlushPlan } from "../../plugins/memory-state.js"; @@ -67,6 +67,8 @@ import { incrementCompactionCount } from "./session-updates.js"; type EmbeddedAgentRuntime = typeof import("../../agents/embedded-agent.js"); const MAX_VISIBLE_MEMORY_FLUSH_ERROR_CHARS = 
600; +const MAX_FLUSH_FAILURES = 3; +const MAX_FLUSH_ERROR_LENGTH = 200; const embeddedAgentRuntimeLoader = createLazyImportLoader( () => import("../../agents/embedded-agent.js"), @@ -126,6 +128,7 @@ const memoryDeps = { refreshQueuedFollowupSession, incrementCompactionCount, updateSessionStoreEntry, + emitAgentEvent, randomUUID: () => crypto.randomUUID(), now: () => Date.now(), }; @@ -141,6 +144,7 @@ export function setAgentRunnerMemoryTestDeps(overrides?: Partial crypto.randomUUID(), now: () => Date.now(), ...overrides, @@ -322,6 +326,13 @@ function buildMemoryFlushErrorPayload(err: unknown): ReplyPayload | undefined { }; } +function truncateMemoryFlushErrorMessage(err: unknown): string { + const message = normalizeOptionalString(formatErrorMessage(err)) || String(err); + return message.length > MAX_FLUSH_ERROR_LENGTH + ? `${message.slice(0, MAX_FLUSH_ERROR_LENGTH - 1)}…` + : message; +} + export type SessionTranscriptUsageSnapshot = { promptTokens?: number; outputTokens?: number; @@ -1327,6 +1338,9 @@ export async function runMemoryFlushIfNeeded(params: { update: async () => ({ memoryFlushAt: memoryDeps.now(), memoryFlushCompactionCount: flushedCompactionCount, + memoryFlushFailureCount: 0, + memoryFlushLastFailedAt: undefined, + memoryFlushLastFailureError: undefined, }), }); if (updatedEntry) { @@ -1342,7 +1356,87 @@ export async function runMemoryFlushIfNeeded(params: { } } } catch (err) { - logVerbose(`memory flush run failed: ${String(err)}`); + const truncatedError = truncateMemoryFlushErrorMessage(err); + if (!isAbortError(err) && params.storePath && params.sessionKey) { + try { + const failedAt = memoryDeps.now(); + const failedEntry = await memoryDeps.updateSessionStoreEntry({ + storePath: params.storePath, + sessionKey: params.sessionKey, + skipMaintenance: true, + takeCacheOwnership: true, + update: async (entry) => ({ + memoryFlushFailureCount: Math.max(0, entry.memoryFlushFailureCount ?? 
0) + 1, + memoryFlushLastFailedAt: failedAt, + memoryFlushLastFailureError: truncatedError, + }), + }); + if (failedEntry) { + activeSessionEntry = failedEntry; + if (activeSessionStore) { + activeSessionStore[params.sessionKey] = failedEntry; + } + } + const failureCount = Math.max(0, failedEntry?.memoryFlushFailureCount ?? 0); + logVerbose( + `memory flush failed (attempt ${failureCount}/${MAX_FLUSH_FAILURES}): ${truncatedError}`, + ); + memoryDeps.emitAgentEvent({ + runId: flushRunId, + stream: "lifecycle", + sessionKey: params.sessionKey, + sessionId: activeSessionEntry?.sessionId, + data: { + phase: "memory_flush_failed", + attempt: failureCount, + maxAttempts: MAX_FLUSH_FAILURES, + error: truncatedError, + }, + }); + if (failedEntry && failureCount >= MAX_FLUSH_FAILURES) { + logVerbose( + `memory flush exhausted: skipping flush for this compaction cycle after ${failureCount} consecutive failures`, + ); + memoryDeps.emitAgentEvent({ + runId: flushRunId, + stream: "lifecycle", + sessionKey: params.sessionKey, + sessionId: failedEntry.sessionId, + data: { + phase: "memory_flush_exhausted", + attempt: failureCount, + maxAttempts: MAX_FLUSH_FAILURES, + }, + }); + const exhaustedEntry = await memoryDeps.updateSessionStoreEntry({ + storePath: params.storePath, + sessionKey: params.sessionKey, + skipMaintenance: true, + takeCacheOwnership: true, + update: async (entry) => ({ + memoryFlushAt: memoryDeps.now(), + memoryFlushCompactionCount: entry.compactionCount ?? 0, + }), + }); + if (exhaustedEntry) { + activeSessionEntry = exhaustedEntry; + if (activeSessionStore) { + activeSessionStore[params.sessionKey] = exhaustedEntry; + } + } + params.onVisibleErrorPayloads?.([ + { + text: `⚠️ Memory flush failed after ${MAX_FLUSH_FAILURES} attempts; skipping for this cycle. 
It will retry after the next compaction.`, + isError: true, + }, + ]); + } + } catch (persistErr) { + logVerbose(`failed to persist memory flush failure metadata: ${String(persistErr)}`); + } + } else { + logVerbose(`memory flush run failed: ${String(err)}`); + } const visibleErrorPayload = buildMemoryFlushErrorPayload(err); if (visibleErrorPayload) { params.onVisibleErrorPayloads?.([visibleErrorPayload]); diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 7522145a956..bdff055e417 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -376,6 +376,12 @@ export type SessionEntry = { memoryFlushAt?: number; memoryFlushCompactionCount?: number; memoryFlushContextHash?: string; + /** Consecutive memory flush failures since the last successful flush. */ + memoryFlushFailureCount?: number; + /** Timestamp (ms) of the last failed memory flush attempt. */ + memoryFlushLastFailedAt?: number; + /** Last memory flush failure error message, truncated for durable metadata. */ + memoryFlushLastFailureError?: string; cliSessionIds?: Record; cliSessionBindings?: Record; claudeCliSessionId?: string; diff --git a/src/plugins/session-entry-slot-keys.ts b/src/plugins/session-entry-slot-keys.ts index aa39a0a2e07..ff010b1f8e1 100644 --- a/src/plugins/session-entry-slot-keys.ts +++ b/src/plugins/session-entry-slot-keys.ts @@ -101,6 +101,9 @@ const SESSION_ENTRY_RESERVED_SLOT_KEY_LIST = [ "memoryFlushAt", "memoryFlushCompactionCount", "memoryFlushContextHash", + "memoryFlushFailureCount", + "memoryFlushLastFailedAt", + "memoryFlushLastFailureError", "cliSessionIds", "cliSessionBindings", "claudeCliSessionId",