mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 20:34:45 +00:00
fix(memory): stop watcher write-polling fd pressure (#81802)
Merged via squash.
Prepared head SHA: 623874619b
Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com>
Co-authored-by: frankekn <4488090+frankekn@users.noreply.github.com>
Reviewed-by: @frankekn
This commit is contained in:
@@ -2,6 +2,14 @@
|
||||
|
||||
Docs: https://docs.openclaw.ai
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Changes
|
||||
|
||||
### Fixes
|
||||
|
||||
- Memory search: stop using chokidar write-stability polling for memory and QMD watchers so large Markdown extraPath trees no longer build up regular file descriptors; changed files now settle through the existing debounced sync queue. Fixes #77327 and #78224. (#81802) Thanks @frankekn, @loyur, and @JanPlessow.
|
||||
|
||||
## 2026.5.14
|
||||
|
||||
### Changes
|
||||
|
||||
@@ -59,6 +59,12 @@ import {
|
||||
resolveMemorySourceExistingHash,
|
||||
} from "./manager-source-state.js";
|
||||
import { runMemoryTargetedSessionSync } from "./manager-targeted-sync.js";
|
||||
import {
|
||||
recordMemoryWatchEventPath,
|
||||
settleMemoryWatchEventPaths,
|
||||
type MemoryWatchEventStats,
|
||||
type MemoryWatchSettleQueue,
|
||||
} from "./watch-settle.js";
|
||||
|
||||
type MemorySyncProgressState = {
|
||||
completed: number;
|
||||
@@ -192,6 +198,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
protected intervalTimer: NodeJS.Timeout | null = null;
|
||||
protected closed = false;
|
||||
protected dirty = false;
|
||||
protected pendingWatchPaths: MemoryWatchSettleQueue = new Map();
|
||||
protected sessionsDirty = false;
|
||||
protected sessionsDirtyFiles = new Set<string>();
|
||||
protected sessionPendingFiles = new Set<string>();
|
||||
@@ -450,12 +457,9 @@ export abstract class MemoryManagerSyncOps {
|
||||
ignoreInitial: true,
|
||||
ignored: (watchPath, stats) =>
|
||||
shouldIgnoreMemoryWatchPath(watchPath, stats, this.settings.multimodal),
|
||||
awaitWriteFinish: {
|
||||
stabilityThreshold: this.settings.sync.watchDebounceMs,
|
||||
pollInterval: 100,
|
||||
},
|
||||
});
|
||||
const markDirty = () => {
|
||||
const markDirty = (watchPath?: string, stats?: MemoryWatchEventStats) => {
|
||||
recordMemoryWatchEventPath(this.pendingWatchPaths, watchPath, stats);
|
||||
this.dirty = true;
|
||||
this.scheduleWatchSync();
|
||||
};
|
||||
@@ -709,7 +713,21 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
this.watchTimer = setTimeout(() => {
|
||||
this.watchTimer = null;
|
||||
runDetachedMemorySync(() => this.sync({ reason: "watch" }), "watch");
|
||||
runDetachedMemorySync(async () => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
if (!(await settleMemoryWatchEventPaths(this.pendingWatchPaths))) {
|
||||
if (!this.closed) {
|
||||
this.scheduleWatchSync();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
await this.sync({ reason: "watch" });
|
||||
}, "watch");
|
||||
}, this.settings.sync.watchDebounceMs);
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ type WatchIgnoredFn = (watchPath: string, stats?: { isDirectory?: () => boolean
|
||||
|
||||
const { createdWatchers, memoryLoggerWarn, watchMock } = vi.hoisted(() => {
|
||||
type WatchEvent = "add" | "change" | "unlink" | "unlinkDir" | "error";
|
||||
type WatchCallback = (value?: unknown) => void;
|
||||
type WatchCallback = (...args: unknown[]) => void;
|
||||
function createMockWatcher() {
|
||||
const handlers = new Map<WatchEvent, WatchCallback[]>();
|
||||
const watcher = {
|
||||
@@ -20,9 +20,9 @@ const { createdWatchers, memoryLoggerWarn, watchMock } = vi.hoisted(() => {
|
||||
return watcher;
|
||||
}),
|
||||
close: vi.fn(async () => undefined),
|
||||
emit: (event: WatchEvent, value?: unknown) => {
|
||||
emit: (event: WatchEvent, ...args: unknown[]) => {
|
||||
for (const callback of handlers.get(event) ?? []) {
|
||||
callback(value);
|
||||
callback(...args);
|
||||
}
|
||||
},
|
||||
};
|
||||
@@ -172,7 +172,7 @@ describe("memory watcher config", () => {
|
||||
]);
|
||||
expect(watchedPaths.filter((watchedPath) => watchedPath.includes("*"))).toEqual([]);
|
||||
expect(options.ignoreInitial).toBe(true);
|
||||
expect(options.awaitWriteFinish).toEqual({ stabilityThreshold: 25, pollInterval: 100 });
|
||||
expect(options).not.toHaveProperty("awaitWriteFinish");
|
||||
|
||||
const ignored = options.ignored as WatchIgnoredFn | undefined;
|
||||
expect(ignored).toBeTypeOf("function");
|
||||
@@ -261,6 +261,37 @@ describe("memory watcher config", () => {
|
||||
},
|
||||
);
|
||||
|
||||
it("settles changed file stats before running watch sync", async () => {
|
||||
await setupWatcherWorkspace({ name: "notes.md", contents: "hello" });
|
||||
const cfg = createWatcherConfig();
|
||||
|
||||
await expectWatcherManager(cfg);
|
||||
vi.useFakeTimers();
|
||||
const notesPath = path.join(extraDir, "notes.md");
|
||||
const initialStats = await fs.stat(notesPath);
|
||||
const syncSpy = vi
|
||||
.spyOn(
|
||||
manager as unknown as {
|
||||
sync: (params?: { reason?: string }) => Promise<void>;
|
||||
},
|
||||
"sync",
|
||||
)
|
||||
.mockResolvedValue(undefined);
|
||||
|
||||
createdWatchers[0]?.emit("change", notesPath, {
|
||||
size: initialStats.size,
|
||||
mtimeMs: initialStats.mtimeMs,
|
||||
isDirectory: () => false,
|
||||
});
|
||||
await fs.writeFile(notesPath, "hello updated");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
expect(syncSpy).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
expect(syncSpy).toHaveBeenCalledWith({ reason: "watch" });
|
||||
});
|
||||
|
||||
it("attaches a logging non-throwing watcher error listener", async () => {
|
||||
await setupWatcherWorkspace({ name: "notes.md", contents: "hello" });
|
||||
const cfg = createWatcherConfig();
|
||||
|
||||
@@ -493,6 +493,7 @@ describe("QmdMemoryManager", () => {
|
||||
const initialUpdateCalls = spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update");
|
||||
expect(initialUpdateCalls).toHaveLength(0);
|
||||
const watchOptions = firstWatchOptions();
|
||||
expect(watchOptions).not.toHaveProperty("awaitWriteFinish");
|
||||
expect(watchOptions.ignored?.(path.join(workspaceDir, "node_modules", "pkg", "note.md"))).toBe(
|
||||
true,
|
||||
);
|
||||
@@ -502,7 +503,14 @@ describe("QmdMemoryManager", () => {
|
||||
expect(watchOptions.ignored?.(path.join(workspaceDir, "build", "note.md"))).toBe(true);
|
||||
expect(watchOptions.ignored?.(path.join(workspaceDir, "notes.md"))).toBe(false);
|
||||
|
||||
watcher.emit("change", path.join(workspaceDir, "notes.md"));
|
||||
const notesPath = path.join(workspaceDir, "notes.md");
|
||||
await fs.writeFile(notesPath, "hello");
|
||||
const initialStats = await fs.stat(notesPath);
|
||||
watcher.emit("change", notesPath, {
|
||||
size: initialStats.size,
|
||||
mtimeMs: initialStats.mtimeMs,
|
||||
isDirectory: () => false,
|
||||
});
|
||||
expect(manager.status().dirty).toBe(true);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
@@ -514,6 +522,55 @@ describe("QmdMemoryManager", () => {
|
||||
await manager.close();
|
||||
});
|
||||
|
||||
it("delays qmd watch sync until changed file stats settle", async () => {
|
||||
vi.useFakeTimers();
|
||||
cfg = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "mock-embed",
|
||||
store: { path: path.join(workspaceDir, "index.sqlite"), vector: { enabled: false } },
|
||||
sync: { watch: true, watchDebounceMs: 25, onSessionStart: false, onSearch: false },
|
||||
},
|
||||
},
|
||||
list: [{ id: agentId, default: true, workspace: workspaceDir }],
|
||||
},
|
||||
memory: {
|
||||
backend: "qmd",
|
||||
qmd: {
|
||||
includeDefaultMemory: false,
|
||||
update: { interval: "0s", debounceMs: 0, onBoot: false },
|
||||
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
|
||||
},
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
const notesPath = path.join(workspaceDir, "notes.md");
|
||||
await fs.writeFile(notesPath, "hello");
|
||||
const initialStats = await fs.stat(notesPath);
|
||||
const { manager } = await createManager({ mode: "full" });
|
||||
const watcher = watchMock.mock.results[0]?.value as {
|
||||
emit: (event: string, ...args: unknown[]) => boolean;
|
||||
};
|
||||
|
||||
watcher.emit("change", notesPath, {
|
||||
size: initialStats.size,
|
||||
mtimeMs: initialStats.mtimeMs,
|
||||
isDirectory: () => false,
|
||||
});
|
||||
await fs.writeFile(notesPath, "hello updated");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
expect(spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update")).toHaveLength(0);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
expect(spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update")).toHaveLength(1);
|
||||
|
||||
await manager.close();
|
||||
});
|
||||
|
||||
it("runs boot update in background by default", async () => {
|
||||
cfg = {
|
||||
...cfg,
|
||||
|
||||
@@ -55,6 +55,12 @@ import {
|
||||
} from "openclaw/plugin-sdk/string-coerce-runtime";
|
||||
import { asRecord } from "../dreaming-shared.js";
|
||||
import { resolveQmdCollectionPatternFlags, type QmdCollectionPatternFlag } from "./qmd-compat.js";
|
||||
import {
|
||||
recordMemoryWatchEventPath,
|
||||
settleMemoryWatchEventPaths,
|
||||
type MemoryWatchEventStats,
|
||||
type MemoryWatchSettleQueue,
|
||||
} from "./watch-settle.js";
|
||||
|
||||
type SqliteDatabase = import("node:sqlite").DatabaseSync;
|
||||
|
||||
@@ -62,7 +68,6 @@ const log = createSubsystemLogger("memory");
|
||||
|
||||
const SNIPPET_HEADER_RE = /@@\s*-([0-9]+),([0-9]+)/;
|
||||
const SEARCH_PENDING_UPDATE_WAIT_MS = 500;
|
||||
const QMD_WATCH_STABILITY_MS = 200;
|
||||
const MAX_QMD_OUTPUT_CHARS = 200_000;
|
||||
const NUL_MARKER_RE = /(?:\^@|\\0|\\x00|\\u0000|null\s*byte|nul\s*byte)/i;
|
||||
const QMD_EMBED_BACKOFF_BASE_MS = 60_000;
|
||||
@@ -324,6 +329,7 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
private embedTimer: NodeJS.Timeout | null = null;
|
||||
private watcher: FSWatcher | null = null;
|
||||
private watchTimer: NodeJS.Timeout | null = null;
|
||||
private readonly pendingWatchPaths: MemoryWatchSettleQueue = new Map();
|
||||
private pendingUpdate: Promise<void> | null = null;
|
||||
private queuedForcedUpdate: Promise<void> | null = null;
|
||||
private queuedForcedRuns = 0;
|
||||
@@ -1560,12 +1566,9 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
this.watcher = chokidar.watch(watchPathList, {
|
||||
ignoreInitial: true,
|
||||
ignored: (watchPath) => shouldIgnoreMemoryWatchPath(watchPath),
|
||||
awaitWriteFinish: {
|
||||
stabilityThreshold: QMD_WATCH_STABILITY_MS,
|
||||
pollInterval: 100,
|
||||
},
|
||||
});
|
||||
const markDirty = () => {
|
||||
const markDirty = (watchPath?: string, stats?: MemoryWatchEventStats) => {
|
||||
recordMemoryWatchEventPath(this.pendingWatchPaths, watchPath, stats);
|
||||
this.dirty = true;
|
||||
this.scheduleWatchSync();
|
||||
};
|
||||
@@ -1592,7 +1595,21 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
}
|
||||
this.watchTimer = setTimeout(() => {
|
||||
this.watchTimer = null;
|
||||
void this.sync({ reason: "watch" }).catch((err) => {
|
||||
void (async () => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
if (!(await settleMemoryWatchEventPaths(this.pendingWatchPaths))) {
|
||||
if (!this.closed) {
|
||||
this.scheduleWatchSync();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
await this.sync({ reason: "watch" });
|
||||
})().catch((err) => {
|
||||
log.warn(`qmd watch sync failed: ${String(err)}`);
|
||||
});
|
||||
}, this.syncSettings.watchDebounceMs);
|
||||
|
||||
102
extensions/memory-core/src/memory/watch-settle.ts
Normal file
102
extensions/memory-core/src/memory/watch-settle.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
import fsSync from "node:fs";
|
||||
import path from "node:path";
|
||||
|
||||
export type MemoryWatchEventStats = {
|
||||
isDirectory?: () => boolean;
|
||||
size?: number;
|
||||
mtimeMs?: number;
|
||||
};
|
||||
|
||||
type WatchPathSnapshot = {
|
||||
size: number;
|
||||
mtimeMs: number;
|
||||
};
|
||||
|
||||
export type MemoryWatchSettleQueue = Map<string, WatchPathSnapshot | null>;
|
||||
|
||||
const MEMORY_WATCH_SETTLE_RECHECK_MS = 100;
|
||||
|
||||
function snapshotFromStats(stats?: MemoryWatchEventStats): WatchPathSnapshot | null {
|
||||
if (!stats || stats.isDirectory?.()) {
|
||||
return null;
|
||||
}
|
||||
if (typeof stats.size !== "number" || typeof stats.mtimeMs !== "number") {
|
||||
return null;
|
||||
}
|
||||
return { size: stats.size, mtimeMs: stats.mtimeMs };
|
||||
}
|
||||
|
||||
function snapshotsMatch(left: WatchPathSnapshot | null, right: WatchPathSnapshot | null): boolean {
|
||||
if (left === null || right === null) {
|
||||
return left === right;
|
||||
}
|
||||
return left.size === right.size && left.mtimeMs === right.mtimeMs;
|
||||
}
|
||||
|
||||
function snapshotPath(filePath: string): WatchPathSnapshot | null {
|
||||
try {
|
||||
const stats = fsSync.statSync(filePath);
|
||||
if (stats.isDirectory()) {
|
||||
return null;
|
||||
}
|
||||
return { size: stats.size, mtimeMs: stats.mtimeMs };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async function delay(ms: number): Promise<void> {
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
|
||||
export function recordMemoryWatchEventPath(
|
||||
queue: MemoryWatchSettleQueue,
|
||||
watchPath?: string,
|
||||
stats?: MemoryWatchEventStats,
|
||||
): void {
|
||||
if (!watchPath) {
|
||||
return;
|
||||
}
|
||||
const trimmed = watchPath.trim();
|
||||
if (!trimmed) {
|
||||
return;
|
||||
}
|
||||
queue.set(path.resolve(trimmed), snapshotFromStats(stats));
|
||||
}
|
||||
|
||||
export async function settleMemoryWatchEventPaths(queue: MemoryWatchSettleQueue): Promise<boolean> {
|
||||
if (queue.size === 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const entries = Array.from(queue.entries());
|
||||
queue.clear();
|
||||
const missingBaseline: Array<{ filePath: string; snapshot: WatchPathSnapshot }> = [];
|
||||
|
||||
for (const [filePath, previousSnapshot] of entries) {
|
||||
const currentSnapshot = snapshotPath(filePath);
|
||||
if (previousSnapshot === null) {
|
||||
if (currentSnapshot !== null) {
|
||||
missingBaseline.push({ filePath, snapshot: currentSnapshot });
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (!snapshotsMatch(previousSnapshot, currentSnapshot)) {
|
||||
queue.set(filePath, currentSnapshot);
|
||||
}
|
||||
}
|
||||
|
||||
if (missingBaseline.length > 0) {
|
||||
await delay(MEMORY_WATCH_SETTLE_RECHECK_MS);
|
||||
for (const entry of missingBaseline) {
|
||||
const currentSnapshot = snapshotPath(entry.filePath);
|
||||
if (!snapshotsMatch(entry.snapshot, currentSnapshot)) {
|
||||
queue.set(entry.filePath, currentSnapshot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queue.size === 0;
|
||||
}
|
||||
Reference in New Issue
Block a user