From b04e42812eca1752ef5e8213346ae5cda7779d9b Mon Sep 17 00:00:00 2001 From: Frank Yang Date: Fri, 15 May 2026 18:12:29 +0800 Subject: [PATCH] fix(memory): stop watcher write-polling fd pressure (#81802) Merged via squash. Prepared head SHA: 623874619b8b32815095a25a901e3ecc2c8b301f Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com> Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com> Reviewed-by: @frankekn --- CHANGELOG.md | 8 ++ .../src/memory/manager-sync-ops.ts | 30 ++++-- .../src/memory/manager.watcher-config.test.ts | 39 ++++++- .../src/memory/qmd-manager.test.ts | 59 +++++++++- .../memory-core/src/memory/qmd-manager.ts | 31 ++++-- .../memory-core/src/memory/watch-settle.ts | 102 ++++++++++++++++++ 6 files changed, 251 insertions(+), 18 deletions(-) create mode 100644 extensions/memory-core/src/memory/watch-settle.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ef008ee2209..5cece31dfa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ Docs: https://docs.openclaw.ai +## Unreleased + +### Changes + +### Fixes + +- Memory search: stop using chokidar write-stability polling for memory and QMD watchers so large Markdown extraPath trees no longer build up regular file descriptors; changed files now settle through the existing debounced sync queue. Fixes #77327 and #78224. (#81802) Thanks @frankekn, @loyur, and @JanPlessow. + ## 2026.5.14 ### Changes diff --git a/extensions/memory-core/src/memory/manager-sync-ops.ts b/extensions/memory-core/src/memory/manager-sync-ops.ts index 3e7a660cac4..72b2b58c074 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.ts @@ -59,6 +59,12 @@ import { resolveMemorySourceExistingHash, } from "./manager-source-state.js"; import { runMemoryTargetedSessionSync } from "./manager-targeted-sync.js"; +import { + recordMemoryWatchEventPath, + settleMemoryWatchEventPaths, + type MemoryWatchEventStats, + type MemoryWatchSettleQueue, +} from "./watch-settle.js"; type MemorySyncProgressState = { completed: number; @@ -192,6 +198,7 @@ export abstract class MemoryManagerSyncOps { protected intervalTimer: NodeJS.Timeout | null = null; protected closed = false; protected dirty = false; + protected pendingWatchPaths: MemoryWatchSettleQueue = new Map(); protected sessionsDirty = false; protected sessionsDirtyFiles = new Set(); protected sessionPendingFiles = new Set(); @@ -450,12 +457,9 @@ export abstract class MemoryManagerSyncOps { ignoreInitial: true, ignored: (watchPath, stats) => shouldIgnoreMemoryWatchPath(watchPath, stats, this.settings.multimodal), - awaitWriteFinish: { - stabilityThreshold: this.settings.sync.watchDebounceMs, - pollInterval: 100, - }, }); - const markDirty = () => { + const markDirty = (watchPath?: string, stats?: MemoryWatchEventStats) => { + recordMemoryWatchEventPath(this.pendingWatchPaths, watchPath, stats); this.dirty = true; this.scheduleWatchSync(); }; @@ -709,7 +713,21 @@ export abstract class MemoryManagerSyncOps { } this.watchTimer = setTimeout(() => { this.watchTimer = null; - runDetachedMemorySync(() => this.sync({ reason: "watch" }), "watch"); + runDetachedMemorySync(async () => { + if (this.closed) { + return; + } + if (!(await settleMemoryWatchEventPaths(this.pendingWatchPaths))) { + if (!this.closed) { + this.scheduleWatchSync(); + } + return; + } + if (this.closed) { + return; + } + await this.sync({ reason: "watch" }); + }, "watch"); }, this.settings.sync.watchDebounceMs); } diff --git a/extensions/memory-core/src/memory/manager.watcher-config.test.ts b/extensions/memory-core/src/memory/manager.watcher-config.test.ts index 5f083409a41..aa9c71c3e51 100644 --- a/extensions/memory-core/src/memory/manager.watcher-config.test.ts +++ b/extensions/memory-core/src/memory/manager.watcher-config.test.ts @@ -11,7 +11,7 @@ type WatchIgnoredFn = (watchPath: string, stats?: { isDirectory?: () => boolean const { createdWatchers, memoryLoggerWarn, watchMock } = vi.hoisted(() => { type WatchEvent = "add" | "change" | "unlink" | "unlinkDir" | "error"; - type WatchCallback = (value?: unknown) => void; + type WatchCallback = (...args: unknown[]) => void; function createMockWatcher() { const handlers = new Map(); const watcher = { @@ -20,9 +20,9 @@ const { createdWatchers, memoryLoggerWarn, watchMock } = vi.hoisted(() => { return watcher; }), close: vi.fn(async () => undefined), - emit: (event: WatchEvent, value?: unknown) => { + emit: (event: WatchEvent, ...args: unknown[]) => { for (const callback of handlers.get(event) ?? []) { - callback(value); + callback(...args); } }, }; @@ -172,7 +172,7 @@ describe("memory watcher config", () => { ]); expect(watchedPaths.filter((watchedPath) => watchedPath.includes("*"))).toEqual([]); expect(options.ignoreInitial).toBe(true); - expect(options.awaitWriteFinish).toEqual({ stabilityThreshold: 25, pollInterval: 100 }); + expect(options).not.toHaveProperty("awaitWriteFinish"); const ignored = options.ignored as WatchIgnoredFn | undefined; expect(ignored).toBeTypeOf("function"); @@ -261,6 +261,37 @@ describe("memory watcher config", () => { }, ); + it("settles changed file stats before running watch sync", async () => { + await setupWatcherWorkspace({ name: "notes.md", contents: "hello" }); + const cfg = createWatcherConfig(); + + await expectWatcherManager(cfg); + vi.useFakeTimers(); + const notesPath = path.join(extraDir, "notes.md"); + const initialStats = await fs.stat(notesPath); + const syncSpy = vi + .spyOn( + manager as unknown as { + sync: (params?: { reason?: string }) => Promise; + }, + "sync", + ) + .mockResolvedValue(undefined); + + createdWatchers[0]?.emit("change", notesPath, { + size: initialStats.size, + mtimeMs: initialStats.mtimeMs, + isDirectory: () => false, + }); + await fs.writeFile(notesPath, "hello updated"); + + await vi.advanceTimersByTimeAsync(25); + expect(syncSpy).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(25); + expect(syncSpy).toHaveBeenCalledWith({ reason: "watch" }); + }); + it("attaches a logging non-throwing watcher error listener", async () => { await setupWatcherWorkspace({ name: "notes.md", contents: "hello" }); const cfg = createWatcherConfig(); diff --git a/extensions/memory-core/src/memory/qmd-manager.test.ts b/extensions/memory-core/src/memory/qmd-manager.test.ts index 4afdf560342..ab3ba09721c 100644 --- a/extensions/memory-core/src/memory/qmd-manager.test.ts +++ b/extensions/memory-core/src/memory/qmd-manager.test.ts @@ -493,6 +493,7 @@ describe("QmdMemoryManager", () => { const initialUpdateCalls = spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update"); expect(initialUpdateCalls).toHaveLength(0); const watchOptions = firstWatchOptions(); + expect(watchOptions).not.toHaveProperty("awaitWriteFinish"); expect(watchOptions.ignored?.(path.join(workspaceDir, "node_modules", "pkg", "note.md"))).toBe( true, ); @@ -502,7 +503,14 @@ describe("QmdMemoryManager", () => { expect(watchOptions.ignored?.(path.join(workspaceDir, "build", "note.md"))).toBe(true); expect(watchOptions.ignored?.(path.join(workspaceDir, "notes.md"))).toBe(false); - watcher.emit("change", path.join(workspaceDir, "notes.md")); + const notesPath = path.join(workspaceDir, "notes.md"); + await fs.writeFile(notesPath, "hello"); + const initialStats = await fs.stat(notesPath); + watcher.emit("change", notesPath, { + size: initialStats.size, + mtimeMs: initialStats.mtimeMs, + isDirectory: () => false, + }); expect(manager.status().dirty).toBe(true); await vi.advanceTimersByTimeAsync(25); @@ -514,6 +522,55 @@ describe("QmdMemoryManager", () => { await manager.close(); }); + it("delays qmd watch sync until changed file stats settle", async () => { + vi.useFakeTimers(); + cfg = { + agents: { + defaults: { + workspace: workspaceDir, + memorySearch: { + provider: "openai", + model: "mock-embed", + store: { path: path.join(workspaceDir, "index.sqlite"), vector: { enabled: false } }, + sync: { watch: true, watchDebounceMs: 25, onSessionStart: false, onSearch: false }, + }, + }, + list: [{ id: agentId, default: true, workspace: workspaceDir }], + }, + memory: { + backend: "qmd", + qmd: { + includeDefaultMemory: false, + update: { interval: "0s", debounceMs: 0, onBoot: false }, + paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }], + }, + }, + } as OpenClawConfig; + + const notesPath = path.join(workspaceDir, "notes.md"); + await fs.writeFile(notesPath, "hello"); + const initialStats = await fs.stat(notesPath); + const { manager } = await createManager({ mode: "full" }); + const watcher = watchMock.mock.results[0]?.value as { + emit: (event: string, ...args: unknown[]) => boolean; + }; + + watcher.emit("change", notesPath, { + size: initialStats.size, + mtimeMs: initialStats.mtimeMs, + isDirectory: () => false, + }); + await fs.writeFile(notesPath, "hello updated"); + + await vi.advanceTimersByTimeAsync(25); + expect(spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update")).toHaveLength(0); + + await vi.advanceTimersByTimeAsync(25); + expect(spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update")).toHaveLength(1); + + await manager.close(); + }); + it("runs boot update in background by default", async () => { cfg = { ...cfg, diff --git a/extensions/memory-core/src/memory/qmd-manager.ts b/extensions/memory-core/src/memory/qmd-manager.ts index 928e5cf445e..3944854c51a 100644 --- a/extensions/memory-core/src/memory/qmd-manager.ts +++ b/extensions/memory-core/src/memory/qmd-manager.ts @@ -55,6 +55,12 @@ import { } from "openclaw/plugin-sdk/string-coerce-runtime"; import { asRecord } from "../dreaming-shared.js"; import { resolveQmdCollectionPatternFlags, type QmdCollectionPatternFlag } from "./qmd-compat.js"; +import { + recordMemoryWatchEventPath, + settleMemoryWatchEventPaths, + type MemoryWatchEventStats, + type MemoryWatchSettleQueue, +} from "./watch-settle.js"; type SqliteDatabase = import("node:sqlite").DatabaseSync; @@ -62,7 +68,6 @@ const log = createSubsystemLogger("memory"); const SNIPPET_HEADER_RE = /@@\s*-([0-9]+),([0-9]+)/; const SEARCH_PENDING_UPDATE_WAIT_MS = 500; -const QMD_WATCH_STABILITY_MS = 200; const MAX_QMD_OUTPUT_CHARS = 200_000; const NUL_MARKER_RE = /(?:\^@|\\0|\\x00|\\u0000|null\s*byte|nul\s*byte)/i; const QMD_EMBED_BACKOFF_BASE_MS = 60_000; @@ -324,6 +329,7 @@ export class QmdMemoryManager implements MemorySearchManager { private embedTimer: NodeJS.Timeout | null = null; private watcher: FSWatcher | null = null; private watchTimer: NodeJS.Timeout | null = null; + private readonly pendingWatchPaths: MemoryWatchSettleQueue = new Map(); private pendingUpdate: Promise | null = null; private queuedForcedUpdate: Promise | null = null; private queuedForcedRuns = 0; @@ -1560,12 +1566,9 @@ export class QmdMemoryManager implements MemorySearchManager { this.watcher = chokidar.watch(watchPathList, { ignoreInitial: true, ignored: (watchPath) => shouldIgnoreMemoryWatchPath(watchPath), - awaitWriteFinish: { - stabilityThreshold: QMD_WATCH_STABILITY_MS, - pollInterval: 100, - }, }); - const markDirty = () => { + const markDirty = (watchPath?: string, stats?: MemoryWatchEventStats) => { + recordMemoryWatchEventPath(this.pendingWatchPaths, watchPath, stats); this.dirty = true; this.scheduleWatchSync(); }; @@ -1592,7 +1595,21 @@ export class QmdMemoryManager implements MemorySearchManager { } this.watchTimer = setTimeout(() => { this.watchTimer = null; - void this.sync({ reason: "watch" }).catch((err) => { + void (async () => { + if (this.closed) { + return; + } + if (!(await settleMemoryWatchEventPaths(this.pendingWatchPaths))) { + if (!this.closed) { + this.scheduleWatchSync(); + } + return; + } + if (this.closed) { + return; + } + await this.sync({ reason: "watch" }); + })().catch((err) => { log.warn(`qmd watch sync failed: ${String(err)}`); }); }, this.syncSettings.watchDebounceMs); diff --git a/extensions/memory-core/src/memory/watch-settle.ts b/extensions/memory-core/src/memory/watch-settle.ts new file mode 100644 index 00000000000..effb802e228 --- /dev/null +++ b/extensions/memory-core/src/memory/watch-settle.ts @@ -0,0 +1,102 @@ +import fsSync from "node:fs"; +import path from "node:path"; + +export type MemoryWatchEventStats = { + isDirectory?: () => boolean; + size?: number; + mtimeMs?: number; +}; + +type WatchPathSnapshot = { + size: number; + mtimeMs: number; +}; + +export type MemoryWatchSettleQueue = Map; + +const MEMORY_WATCH_SETTLE_RECHECK_MS = 100; + +function snapshotFromStats(stats?: MemoryWatchEventStats): WatchPathSnapshot | null { + if (!stats || stats.isDirectory?.()) { + return null; + } + if (typeof stats.size !== "number" || typeof stats.mtimeMs !== "number") { + return null; + } + return { size: stats.size, mtimeMs: stats.mtimeMs }; +} + +function snapshotsMatch(left: WatchPathSnapshot | null, right: WatchPathSnapshot | null): boolean { + if (left === null || right === null) { + return left === right; + } + return left.size === right.size && left.mtimeMs === right.mtimeMs; +} + +function snapshotPath(filePath: string): WatchPathSnapshot | null { + try { + const stats = fsSync.statSync(filePath); + if (stats.isDirectory()) { + return null; + } + return { size: stats.size, mtimeMs: stats.mtimeMs }; + } catch { + return null; + } +} + +async function delay(ms: number): Promise { + await new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +export function recordMemoryWatchEventPath( + queue: MemoryWatchSettleQueue, + watchPath?: string, + stats?: MemoryWatchEventStats, +): void { + if (!watchPath) { + return; + } + const trimmed = watchPath.trim(); + if (!trimmed) { + return; + } + queue.set(path.resolve(trimmed), snapshotFromStats(stats)); +} + +export async function settleMemoryWatchEventPaths(queue: MemoryWatchSettleQueue): Promise { + if (queue.size === 0) { + return true; + } + + const entries = Array.from(queue.entries()); + queue.clear(); + const missingBaseline: Array<{ filePath: string; snapshot: WatchPathSnapshot }> = []; + + for (const [filePath, previousSnapshot] of entries) { + const currentSnapshot = snapshotPath(filePath); + if (previousSnapshot === null) { + if (currentSnapshot !== null) { + missingBaseline.push({ filePath, snapshot: currentSnapshot }); + } + continue; + } + if (!snapshotsMatch(previousSnapshot, currentSnapshot)) { + queue.set(filePath, currentSnapshot); + } + } + + if (missingBaseline.length > 0) { + await delay(MEMORY_WATCH_SETTLE_RECHECK_MS); + for (const entry of missingBaseline) { + const currentSnapshot = snapshotPath(entry.filePath); + if (!snapshotsMatch(entry.snapshot, currentSnapshot)) { + queue.set(entry.filePath, currentSnapshot); + } + } + } + + return queue.size === 0; +}