fix(memory): close local embedding providers on timeout (#84048)

Summary:
- The branch adds a close lifecycle for local memory embedding providers, scoped memory search/index teardown for one agent, Active Memory timeout cleanup, focused tests, and a changelog entry.
- Reproducibility: yes. The linked issue gives a concrete OpenClaw 2026.5.18 Telegram Active Memory timeout pa ... current-main source inspection confirms there is no timeout cleanup for that local embedding provider path.

Automerge notes:
- The PR branch already contained a follow-up commit before automerge: fix(memory): close local embedding providers on timeout

Validation:
- ClawSweeper review passed for head 8e2e369b5c.
- Required merge gates passed before the squash merge.

Prepared head SHA: 8e2e369b5c
Review: https://github.com/openclaw/openclaw/pull/84048#issuecomment-4485705481

Co-authored-by: brokemac79 <martin_cleary@yahoo.co.uk>
Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com>
Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com>
Approved-by: hxy91819
Co-authored-by: hxy91819 <8814856+hxy91819@users.noreply.github.com>
This commit is contained in:
clawsweeper[bot]
2026-05-19 09:19:09 +00:00
committed by GitHub
parent aef93881af
commit 1c1c75df72
25 changed files with 750 additions and 33 deletions

View File

@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Installer/Windows: launch `install.ps1` onboarding as an attached child process so fresh native Windows installs do not freeze visibly at `Starting setup...` or corrupt the wizard's terminal rendering.
- Memory/search: close local embedding providers when active-memory searches time out so pending local model loads and embedding contexts are aborted and released. (#83858) Thanks @brokemac79.
- Agents: include bounded trajectory queued-writer diagnostics in `pi-trajectory-flush` timeout warnings so flush stalls show pending writes, queued bytes, and append state. Fixes #82961. (#82962) Thanks @galiniliev.
- Agents/subagents: recover stale completion announces by retrying unsupported transcript-wait wakes without transcript waiting and forcing a message-tool handoff when the requester run is already stale. Fixes #83699. (#83700) Thanks @galiniliev.
- Agents/subagents: skip stale embedded-run wake probes for dormant completion requesters, so late subagent completions go straight to requester-agent/direct handoff instead of producing `reason=no_active_run` queue noise. (#82964) Thanks @galiniliev.

View File

@@ -27,6 +27,7 @@ const hoisted = vi.hoisted(() => {
},
};
return {
closeActiveMemorySearchManager: vi.fn(async () => {}),
sessionStore,
updateSessionStore: vi.fn(
async (_storePath: string, updater: (store: Record<string, unknown>) => void) => {
@@ -36,6 +37,10 @@ const hoisted = vi.hoisted(() => {
};
});
vi.mock("openclaw/plugin-sdk/memory-host-search", () => ({
closeActiveMemorySearchManager: hoisted.closeActiveMemorySearchManager,
}));
vi.mock("openclaw/plugin-sdk/session-store-runtime", async () => {
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/session-store-runtime")>(
"openclaw/plugin-sdk/session-store-runtime",
@@ -2811,6 +2816,37 @@ describe("active-memory plugin", () => {
expectLinesNotToContain(infoLines, " cached ");
});
it("releases memory search managers after active-memory timeouts", async () => {
testing.setMinimumTimeoutMsForTests(1);
testing.setSetupGraceTimeoutMsForTests(0);
api.pluginConfig = {
agents: ["main"],
timeoutMs: 1,
logging: true,
};
plugin.register(api as unknown as OpenClawPluginApi);
runEmbeddedPiAgent.mockImplementationOnce(() => new Promise<never>(() => {}));
const result = await hooks.before_prompt_build(
{ prompt: "what wings should i order? cleanup timeout", messages: [] },
{
agentId: "main",
trigger: "user",
sessionKey: "agent:main:cleanup-timeout",
messageProvider: "webchat",
},
);
expect(result).toBeUndefined();
await vi.waitFor(() => {
expect(hoisted.closeActiveMemorySearchManager).toHaveBeenCalledTimes(1);
});
expect(hoisted.closeActiveMemorySearchManager).toHaveBeenCalledWith({
cfg: configFile,
agentId: "main",
});
});
it("does not share cached recall results across session-id-only contexts", async () => {
api.pluginConfig = {
agents: ["main"],

View File

@@ -12,6 +12,7 @@ import {
resolveDefaultModelForAgent,
} from "openclaw/plugin-sdk/agent-runtime";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { closeActiveMemorySearchManager } from "openclaw/plugin-sdk/memory-host-search";
import {
resolveLivePluginConfigObject,
resolvePluginConfigObject,
@@ -980,6 +981,39 @@ function applyActiveMemoryRuntimeConfigSnapshot(
};
}
/**
 * Picks the config object to hand to memory-search cleanup.
 *
 * Prefers whatever `api.runtime.config.current()` returns (when that accessor
 * exists and yields a non-nullish value) and otherwise uses `api.config`.
 * Any exception raised while reading the runtime config also results in
 * `api.config` being returned.
 */
function resolveActiveMemoryCleanupConfig(api: OpenClawPluginApi): OpenClawConfig | undefined {
  const readStaticConfig = (): OpenClawConfig | undefined =>
    api.config as OpenClawConfig | undefined;
  try {
    const liveConfig = api.runtime.config?.current?.() as OpenClawConfig | undefined;
    return liveConfig ?? readStaticConfig();
  } catch {
    // Reading the runtime snapshot failed; fall back to the registration-time config.
    return readStaticConfig();
  }
}
/**
 * Queues a fire-and-forget release of one agent's memory search managers.
 *
 * The cleanup config is resolved immediately, but the actual
 * `closeActiveMemorySearchManager` call is deferred to a zero-delay timer so
 * the caller is not held up by it. Success is reported at debug level and a
 * rejection at warn level; neither outcome is surfaced to the caller.
 *
 * @param api Plugin API used for config lookup and logging.
 * @param logPrefix Prefix prepended to both log lines.
 * @param agentId Agent whose managers should be released.
 */
function scheduleMemorySearchCleanupAfterTimeout(
  api: OpenClawPluginApi,
  logPrefix: string,
  agentId: string,
): void {
  const resolvedConfig = resolveActiveMemoryCleanupConfig(api);

  const reportReleased = (): void => {
    api.logger.debug?.(`${logPrefix} released memory search managers after timeout`);
  };

  const reportFailure = (error: unknown): void => {
    const rawMessage = error instanceof Error ? error.message : String(error);
    const message = toSingleLineLogValue(rawMessage);
    api.logger.warn?.(
      `${logPrefix} failed to release memory search managers after timeout: ${message}`,
    );
  };

  const releaseManagers = (): void => {
    // `api.config` is only consulted here, at fire time, when no config was resolved above.
    void closeActiveMemorySearchManager({ cfg: resolvedConfig ?? api.config, agentId })
      .then(reportReleased)
      .catch(reportFailure);
  };

  setTimeout(releaseManagers, 0);
}
function resolveThinkingLevel(thinking: unknown): ActiveMemoryThinkingLevel {
if (
thinking === "off" ||
@@ -2755,6 +2789,7 @@ async function maybeResolveActiveRecall(params: {
searchDebug: result.searchDebug,
});
recordCircuitBreakerTimeout(cbKey);
scheduleMemorySearchCleanupAfterTimeout(params.api, logPrefix, params.agentId);
return result;
}
@@ -2864,6 +2899,7 @@ async function maybeResolveActiveRecall(params: {
searchDebug: result.searchDebug,
});
recordCircuitBreakerTimeout(cbKey);
scheduleMemorySearchCleanupAfterTimeout(params.api, logPrefix, params.agentId);
return result;
}
const message = toSingleLineLogValue(error instanceof Error ? error.message : String(error));

View File

@@ -1,5 +1,7 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { describe, expect, it } from "vitest";
import type { MemoryPluginRuntime } from "openclaw/plugin-sdk/memory-core-host-runtime-core";
import { createTestPluginApi } from "openclaw/plugin-sdk/plugin-test-api";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
buildMemoryFlushPlan,
DEFAULT_MEMORY_FLUSH_FORCE_TRANSCRIPT_BYTES,
@@ -8,6 +10,33 @@ import {
} from "./src/flush-plan.js";
import { buildPromptSection } from "./src/prompt-section.js";
const closeMemorySearchManagerMock = vi.hoisted(() => vi.fn(async () => {}));
vi.mock("./src/runtime-provider.js", () => ({
memoryRuntime: {
closeAllMemorySearchManagers: vi.fn(async () => {}),
closeMemorySearchManager: closeMemorySearchManagerMock,
getMemorySearchManager: vi.fn(async () => null),
},
}));
import plugin from "./index.js";
/**
 * Registers the plugin against a test API and returns the memory runtime it
 * hands to `registerMemoryCapability`.
 *
 * @throws Error when registration finishes without a runtime being captured.
 */
function registerMemoryCoreRuntime(): MemoryPluginRuntime {
  const captured: { runtime: MemoryPluginRuntime | undefined } = { runtime: undefined };
  const testApi = createTestPluginApi({
    registerMemoryCapability(capability) {
      // Last registration wins if the plugin registers more than once.
      captured.runtime = capability.runtime;
    },
  });
  plugin.register(testApi);

  const { runtime } = captured;
  if (runtime) {
    return runtime;
  }
  throw new Error("expected memory-core to register a memory runtime");
}
describe("buildPromptSection", () => {
it("returns empty when no memory tools are available", () => {
expect(buildPromptSection({ availableTools: new Set() })).toStrictEqual([]);
@@ -53,6 +82,21 @@ describe("buildPromptSection", () => {
});
});
describe("memory-core plugin runtime registration", () => {
  // Start every case with clean mock call history.
  beforeEach(() => {
    vi.clearAllMocks();
  });

  it("wires scoped memory search cleanup through the lazy runtime", async () => {
    const closeParams = { cfg: {} as OpenClawConfig, agentId: "main" };
    const registeredRuntime = registerMemoryCoreRuntime();

    await registeredRuntime.closeMemorySearchManager?.(closeParams);

    // The registered runtime must forward the scoped params to the mocked provider module.
    expect(closeMemorySearchManagerMock).toHaveBeenCalledWith(closeParams);
  });
});
describe("buildMemoryFlushPlan", () => {
const cfg = {
agents: {

View File

@@ -166,6 +166,10 @@ const memoryRuntime: MemoryPluginRuntime = {
const { memoryRuntime: runtime } = await loadRuntimeProviderModule();
await runtime.closeAllMemorySearchManagers?.();
},
// Scoped teardown: load the runtime provider module on demand and forward the params.
async closeMemorySearchManager(params) {
  const providerModule = await loadRuntimeProviderModule();
  // Optional call: a provider runtime without scoped close makes this a no-op.
  await providerModule.memoryRuntime.closeMemorySearchManager?.(params);
},
};
export default definePluginEntry({
id: "memory-core",

View File

@@ -1 +1,5 @@
export { closeAllMemoryIndexManagers, MemoryIndexManager } from "./src/memory/manager-runtime.js";
export {
closeAllMemoryIndexManagers,
closeMemoryIndexManagersForAgent,
MemoryIndexManager,
} from "./src/memory/manager-runtime.js";

View File

@@ -12,7 +12,7 @@ import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi }
import "./test-runtime-mocks.js";
import type { MemoryIndexManager } from "./index.js";
import { closeAllMemorySearchManagers, getMemorySearchManager } from "./index.js";
import { EMBEDDING_PROBE_CACHE_TTL_MS } from "./manager.js";
import { closeMemoryIndexManagersForAgent, EMBEDDING_PROBE_CACHE_TTL_MS } from "./manager.js";
import {
DEFAULT_LOCAL_MODEL,
registerBuiltInMemoryEmbeddingProviders,
@@ -28,6 +28,9 @@ afterAll(() => {
let embedBatchCalls = 0;
let embedBatchInputCalls = 0;
let providerCloseCalls = 0;
let providerCloseFailuresRemaining = 0;
let providerCloseGate: Promise<void> | null = null;
let providerCalls: Array<{ provider?: string; model?: string; outputDimensionality?: number }> = [];
let forceNoProvider = false;
@@ -65,6 +68,14 @@ vi.mock("./embeddings.js", () => {
provider: {
id: providerId,
model,
close: async () => {
providerCloseCalls += 1;
await providerCloseGate;
if (providerCloseFailuresRemaining > 0) {
providerCloseFailuresRemaining -= 1;
throw new Error("provider close failed");
}
},
embedQuery: async (text: string) => embedText(text),
embedBatch: async (texts: string[]) => {
embedBatchCalls += 1;
@@ -188,6 +199,9 @@ describe("memory index", () => {
registerBuiltInMemoryEmbeddingProviders({ registerMemoryEmbeddingProvider: registerAdapter });
embedBatchCalls = 0;
embedBatchInputCalls = 0;
providerCloseCalls = 0;
providerCloseFailuresRemaining = 0;
providerCloseGate = null;
providerCalls = [];
forceNoProvider = false;
@@ -354,6 +368,96 @@ describe("memory index", () => {
}
});
it("closes embedding providers when memory index managers close", async () => {
const cfg = createCfg({
storePath: indexMainPath,
hybrid: { enabled: true, vectorWeight: 0.5, textWeight: 0.5 },
});
const manager = await getFreshManager(cfg);
await manager.probeEmbeddingAvailability();
expect(providerCloseCalls).toBe(0);
await manager.close();
await manager.close();
expect(providerCloseCalls).toBe(1);
});
// Checks two orderings of manager.close(): the embedding provider's close() is
// invoked while a sync promise is still pending, and close() itself does not
// settle until that sync promise resolves.
it("closes embedding providers before waiting for pending sync to settle", async () => {
const cfg = createCfg({
storePath: indexMainPath,
hybrid: { enabled: true, vectorWeight: 0.5, textWeight: 0.5 },
});
const manager = await getFreshManager(cfg);
await manager.probeEmbeddingAvailability();
let resolveSync: () => void = () => {};
// Overwrite the manager's `syncing` field (reached via a cast) with a promise this
// test controls. NOTE(review): presumably this is the field close() treats as
// pending sync work — confirm against the manager implementation.
(manager as unknown as { syncing: Promise<void> }).syncing = new Promise<void>((resolve) => {
resolveSync = resolve;
});
// Start closing without awaiting so the intermediate state can be observed.
const closePromise = manager.close();
await vi.waitFor(() => {
expect(providerCloseCalls).toBe(1);
});
let closeSettled = false;
void closePromise.then(() => {
closeSettled = true;
});
// Yield one microtask turn so the `then` callback above would have run had
// closePromise already been fulfilled.
await Promise.resolve();
expect(closeSettled).toBe(false);
// Release the injected sync promise; close() is now expected to finish.
resolveSync();
await closePromise;
});
// Checks that closeMemoryIndexManagersForAgent removes the cached manager before its
// close() finishes: a lookup made while the provider close is still blocked yields a
// different manager, and that newer manager is still the one returned after the
// scoped close completes.
it("evicts scoped memory index managers before close settles", async () => {
let releaseProviderClose: () => void = () => {};
// The mocked provider's close() awaits this gate, which keeps the close in flight
// until the test releases it.
providerCloseGate = new Promise<void>((resolve) => {
releaseProviderClose = resolve;
});
const cfg = createCfg({
storePath: indexMainPath,
hybrid: { enabled: true, vectorWeight: 0.5, textWeight: 0.5 },
});
const first = requireManager(await getMemorySearchManager({ cfg, agentId: "main" }));
managersForCleanup.add(first);
await first.probeEmbeddingAvailability();
// Start the scoped close without awaiting it.
const closePromise = closeMemoryIndexManagersForAgent({ cfg, agentId: "main" });
let second: MemoryIndexManager | null = null;
try {
// Wait until the provider close has started (it is now blocked on the gate).
await vi.waitFor(() => {
expect(providerCloseCalls).toBe(1);
});
second = requireManager(await getMemorySearchManager({ cfg, agentId: "main" }));
managersForCleanup.add(second);
expect(second).not.toBe(first);
} finally {
// Always unblock the provider close and clear the gate, even if an assertion
// above failed, so the pending close can settle.
releaseProviderClose();
providerCloseGate = null;
}
await closePromise;
// After the old manager finishes closing, the replacement must still be cached.
const third = requireManager(await getMemorySearchManager({ cfg, agentId: "main" }));
managersForCleanup.add(third);
expect(third).toBe(second);
});
it("retries embedding provider close before releasing the manager", async () => {
providerCloseFailuresRemaining = 1;
const cfg = createCfg({
storePath: indexMainPath,
hybrid: { enabled: true, vectorWeight: 0.5, textWeight: 0.5 },
});
const manager = await getFreshManager(cfg);
await manager.probeEmbeddingAvailability();
await manager.close();
expect(providerCloseCalls).toBe(2);
});
it("indexes multimodal image and audio files from extra paths with Gemini structured inputs", async () => {
const mediaDir = path.join(workspaceDir, "media-memory");
await fs.mkdir(mediaDir, { recursive: true });

View File

@@ -6,6 +6,7 @@ export type {
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
export {
closeAllMemorySearchManagers,
closeMemorySearchManager,
getMemorySearchManager,
type MemorySearchManagerPurpose,
type MemorySearchManagerResult,

View File

@@ -1 +1,5 @@
export { closeAllMemoryIndexManagers, MemoryIndexManager } from "./manager.js";
export {
closeAllMemoryIndexManagers,
closeMemoryIndexManagersForAgent,
MemoryIndexManager,
} from "./manager.js";

View File

@@ -86,6 +86,32 @@ export async function closeAllMemoryIndexManagers(): Promise<void> {
});
}
/**
 * Closes the cached memory index manager for a single agent, if there is one.
 *
 * Does nothing when no memory search settings resolve for the agent. If a
 * manager creation is pending under the same cache key, it is allowed to
 * settle first (its outcome is ignored). The manager is removed from the
 * cache before `close()` is awaited; a failing `close()` is logged as a
 * warning rather than thrown.
 *
 * Only the cache key ending in `:default` is targeted.
 * NOTE(review): the key format presumably mirrors the one used where managers
 * are cached — confirm they stay in sync.
 *
 * @param params.cfg Config used to resolve search settings and the workspace dir.
 * @param params.agentId Agent whose index manager should be closed.
 */
export async function closeMemoryIndexManagersForAgent(params: {
  cfg: OpenClawConfig;
  agentId: string;
}): Promise<void> {
  const { cfg, agentId } = params;
  const settings = resolveMemorySearchConfig(cfg, agentId);
  if (!settings) {
    return;
  }

  const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId);
  const cacheKey = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}:default`;

  const inFlightCreate = INDEX_CACHE_PENDING.get(cacheKey);
  if (inFlightCreate) {
    // allSettled: a failed creation must not abort the teardown.
    await Promise.allSettled([inFlightCreate]);
  }

  const cachedManager = INDEX_CACHE.get(cacheKey);
  if (cachedManager) {
    // Evict first so lookups made while close() is running do not get this manager.
    INDEX_CACHE.delete(cacheKey);
    try {
      await cachedManager.close();
    } catch (err) {
      log.warn(`failed to close memory index manager for agent ${agentId}: ${String(err)}`);
    }
  }
}
export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements MemorySearchManager {
private readonly cacheKey: string;
protected readonly cfg: OpenClawConfig;
@@ -933,8 +959,36 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
this.sessionUnsubscribe();
this.sessionUnsubscribe = null;
}
await awaitPendingManagerWork({ pendingSync, pendingProviderInit });
closeMemoryDatabase(this.db);
INDEX_CACHE.delete(this.cacheKey);
const closeErrors = new Map<EmbeddingProvider, unknown>();
const closeCurrentProvider = async () => {
const provider = this.provider;
if (!provider) {
return;
}
try {
await provider.close?.();
closeErrors.delete(provider);
if (this.provider === provider) {
this.provider = null;
}
} catch (err) {
closeErrors.set(provider, err);
}
};
await awaitPendingManagerWork({ pendingProviderInit });
await closeCurrentProvider();
try {
await awaitPendingManagerWork({ pendingSync });
await closeCurrentProvider();
} finally {
closeMemoryDatabase(this.db);
if (INDEX_CACHE.get(this.cacheKey) === this) {
INDEX_CACHE.delete(this.cacheKey);
}
}
const closeError = closeErrors.values().next().value;
if (closeError) {
throw closeError;
}
}
}

View File

@@ -109,6 +109,7 @@ const fallbackManager = vi.hoisted(() => ({
const fallbackSearch = fallbackManager.search;
const mockMemoryIndexGet = vi.hoisted(() => vi.fn(async () => fallbackManager));
const mockCloseAllMemoryIndexManagers = vi.hoisted(() => vi.fn(async () => {}));
const mockCloseMemoryIndexManagersForAgent = vi.hoisted(() => vi.fn(async () => {}));
const checkQmdBinaryAvailability = vi.hoisted(() =>
vi.fn<CheckQmdBinaryAvailability>(async () => ({ available: true })),
);
@@ -128,10 +129,15 @@ vi.mock("../../manager-runtime.js", () => ({
get: mockMemoryIndexGet,
},
closeAllMemoryIndexManagers: mockCloseAllMemoryIndexManagers,
closeMemoryIndexManagersForAgent: mockCloseMemoryIndexManagersForAgent,
}));
import { QmdMemoryManager } from "./qmd-manager.js";
import { closeAllMemorySearchManagers, getMemorySearchManager } from "./search-manager.js";
import {
closeAllMemorySearchManagers,
closeMemorySearchManager,
getMemorySearchManager,
} from "./search-manager.js";
const createQmdManagerMock = vi.mocked(QmdMemoryManager.create);
type QmdManagerInstance = Awaited<ReturnType<typeof QmdMemoryManager.create>>;
@@ -280,6 +286,7 @@ beforeEach(async () => {
fallbackManager.probeVectorAvailability.mockClear();
fallbackManager.close.mockClear();
mockCloseAllMemoryIndexManagers.mockClear();
mockCloseMemoryIndexManagersForAgent.mockClear();
mockMemoryIndexGet.mockClear();
mockMemoryIndexGet.mockResolvedValue(fallbackManager);
checkQmdBinaryAvailability.mockClear();
@@ -383,6 +390,27 @@ describe("getMemorySearchManager caching", () => {
}
});
it("preserves qmd open-failure cooldown when scoped teardown closes no qmd manager", async () => {
const agentId = "qmd-open-cooldown-scoped-close";
const cfg = createQmdCfg(agentId);
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_000);
createQmdManagerMock.mockRejectedValueOnce(new Error("Cannot find package 'chokidar'"));
try {
const first = await getMemorySearchManager({ cfg, agentId });
expect(first.manager).toBe(fallbackManager);
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
await closeMemorySearchManager({ cfg, agentId });
const second = await getMemorySearchManager({ cfg, agentId });
expect(second.manager).toBe(fallbackManager);
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
} finally {
nowSpy.mockRestore();
}
});
it("lets status probes bypass and clear a full qmd open-failure cooldown", async () => {
const agentId = "qmd-open-status-bypass";
const cfg = createQmdCfg(agentId);
@@ -930,6 +958,46 @@ describe("getMemorySearchManager caching", () => {
expect(createQmdManagerMock.mock.calls).toHaveLength(2);
});
it("closes only the requested agent qmd manager on scoped teardown", async () => {
const mainCfg = createQmdCfg("main");
const otherPrimary = createQmdManagerInstanceMock();
createQmdManagerMock.mockImplementationOnce(
async () => mockPrimary as unknown as QmdManagerInstance,
);
createQmdManagerMock.mockImplementationOnce(
async () => otherPrimary as unknown as QmdManagerInstance,
);
const main = await getMemorySearchManager({ cfg: mainCfg, agentId: "main" });
const other = await getMemorySearchManager({ cfg: createQmdCfg("other"), agentId: "other" });
const mainManager = requireManager(main);
const otherManager = requireManager(other);
await closeMemorySearchManager({ cfg: mainCfg, agentId: "main" });
expect(mockPrimary.close).toHaveBeenCalledTimes(1);
expect(otherPrimary.close).not.toHaveBeenCalled();
const nextMain = await getMemorySearchManager({ cfg: mainCfg, agentId: "main" });
const nextOther = await getMemorySearchManager({
cfg: createQmdCfg("other"),
agentId: "other",
});
expect(nextMain.manager).not.toBe(mainManager);
expect(nextOther.manager).toBe(otherManager);
});
it("closes the requested agent builtin index manager on scoped teardown", async () => {
const cfg = createBuiltinCfg("main");
await getMemorySearchManager({ cfg, agentId: "main" });
await closeMemorySearchManager({ cfg, agentId: "main" });
expect(mockCloseMemoryIndexManagersForAgent).toHaveBeenCalledWith({
cfg,
agentId: "main",
});
});
it("waits for pending full qmd manager creation during global teardown", async () => {
const agentId = "teardown-pending-qmd";
const cfg = createQmdCfg(agentId);

View File

@@ -400,6 +400,32 @@ export async function closeAllMemorySearchManagers(): Promise<void> {
}
}
/**
 * Tears down the memory search managers that belong to one agent.
 *
 * Steps, in order:
 * 1. Let any pending qmd manager creation for the agent's scope settle
 *    (its outcome is ignored).
 * 2. If a qmd manager is cached for that scope, drop it from the cache, clear
 *    the scope's open-failure record, and close it; a failing close is logged
 *    as a warning. When nothing is cached the open-failure record is left
 *    untouched.
 * 3. If the manager runtime has been requested before
 *    (`managerRuntimePromise` is non-null), also close the agent's index
 *    managers through it.
 *
 * @param params.cfg Config forwarded to the index-manager teardown.
 * @param params.agentId Agent id; normalized before use.
 */
export async function closeMemorySearchManager(params: {
  cfg: OpenClawConfig;
  agentId: string;
}): Promise<void> {
  const agentId = normalizeAgentId(params.agentId);
  const scopeKey = buildQmdManagerScopeKey(agentId);

  const inFlightCreate = PENDING_QMD_MANAGER_CREATES.get(scopeKey);
  if (inFlightCreate) {
    await Promise.allSettled([inFlightCreate.promise]);
  }

  const cachedEntry = QMD_MANAGER_CACHE.get(scopeKey);
  if (cachedEntry) {
    QMD_MANAGER_CACHE.delete(scopeKey);
    QMD_MANAGER_OPEN_FAILURES.delete(scopeKey);
    try {
      await cachedEntry.manager.close?.();
    } catch (err) {
      log.warn(`failed to close qmd memory manager for agent ${agentId}: ${String(err)}`);
    }
  }

  if (managerRuntimePromise === null) {
    // The manager runtime was never requested, so it is not loaded just for teardown.
    return;
  }
  const { closeMemoryIndexManagersForAgent: closeIndexManagersForAgent } =
    await loadManagerRuntime();
  await closeIndexManagersForAgent({ cfg: params.cfg, agentId });
}
class FallbackMemoryManager implements MemorySearchManager {
private fallback: Maybe<MemorySearchManager> = null;
private primaryFailed = false;

View File

@@ -1,6 +1,10 @@
import type { MemoryPluginRuntime } from "openclaw/plugin-sdk/memory-core-host-runtime-core";
import { resolveMemoryBackendConfig } from "openclaw/plugin-sdk/memory-core-host-runtime-files";
import { closeAllMemorySearchManagers, getMemorySearchManager } from "./memory/index.js";
import {
closeAllMemorySearchManagers,
closeMemorySearchManager,
getMemorySearchManager,
} from "./memory/index.js";
export const memoryRuntime: MemoryPluginRuntime = {
async getMemorySearchManager(params) {
@@ -16,4 +20,7 @@ export const memoryRuntime: MemoryPluginRuntime = {
async closeAllMemorySearchManagers() {
await closeAllMemorySearchManagers();
},
// Scoped teardown entry of the runtime object: passes the params straight through to the
// closeMemorySearchManager function imported from "./memory/index.js" (the call inside
// the body resolves to that import, not to this method).
async closeMemorySearchManager(params) {
await closeMemorySearchManager(params);
},
};

View File

@@ -17,19 +17,45 @@ afterEach(() => {
vi.resetAllMocks();
});
/**
 * Builds a promise together with the functions that settle it, so a test can
 * resolve or reject it from outside the executor.
 *
 * @returns `{ promise, resolve, reject }` for a fresh pending promise.
 * @throws Error if the executor callbacks were not captured.
 */
function createDeferred<T>() {
  const settle: {
    resolve?: (value: T) => void;
    reject?: (reason?: unknown) => void;
  } = {};
  const promise = new Promise<T>((onResolve, onReject) => {
    settle.resolve = onResolve;
    settle.reject = onReject;
  });
  const { resolve, reject } = settle;
  if (resolve === undefined || reject === undefined) {
    throw new Error("Expected deferred callbacks to be initialized");
  }
  return { promise, resolve, reject };
}
function mockLocalEmbeddingRuntime(vector = new Float32Array([2.35, 3.45, 0.63, 4.3])) {
const disposeContext = vi.fn();
const disposeModel = vi.fn();
const disposeLlama = vi.fn();
const getEmbeddingFor = vi.fn().mockResolvedValue({ vector });
const createEmbeddingContext = vi.fn().mockResolvedValue({ getEmbeddingFor });
const loadModel = vi.fn().mockResolvedValue({ createEmbeddingContext });
const createEmbeddingContext = vi
.fn()
.mockResolvedValue({ getEmbeddingFor, dispose: disposeContext });
const loadModel = vi.fn().mockResolvedValue({ createEmbeddingContext, dispose: disposeModel });
const resolveModelFile = vi.fn(async (modelPath: string) => `/resolved/${modelPath}`);
nodeLlamaMock.importNodeLlamaCpp.mockResolvedValue({
getLlama: async () => ({ loadModel }),
getLlama: async () => ({ loadModel, dispose: disposeLlama }),
resolveModelFile,
LlamaLogLevel: { error: 0 },
} as never);
return { createEmbeddingContext, getEmbeddingFor, loadModel, resolveModelFile };
return {
createEmbeddingContext,
disposeContext,
disposeLlama,
disposeModel,
getEmbeddingFor,
loadModel,
resolveModelFile,
};
}
describe("local embedding provider", () => {
@@ -50,7 +76,16 @@ describe("local embedding provider", () => {
"hf:ggml-org/embeddinggemma-300m-qat-q8_0-GGUF/embeddinggemma-300m-qat-Q8_0.gguf",
);
expect(magnitude).toBeCloseTo(1, 5);
expect(runtime.resolveModelFile).toHaveBeenCalledWith(DEFAULT_LOCAL_MODEL, undefined);
expect(runtime.resolveModelFile).toHaveBeenCalledWith(
DEFAULT_LOCAL_MODEL,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
expect(runtime.loadModel).toHaveBeenCalledWith(
expect.objectContaining({
modelPath: `/resolved/${DEFAULT_LOCAL_MODEL}`,
loadSignal: expect.any(AbortSignal),
}),
);
expect(runtime.getEmbeddingFor).toHaveBeenCalledWith("test query");
});
@@ -66,7 +101,9 @@ describe("local embedding provider", () => {
await provider.embedQuery("context size default test");
expect(runtime.createEmbeddingContext).toHaveBeenCalledWith({ contextSize: 4096 });
expect(runtime.createEmbeddingContext).toHaveBeenCalledWith(
expect.objectContaining({ contextSize: 4096, createSignal: expect.any(AbortSignal) }),
);
});
it("passes configured contextSize to createEmbeddingContext", async () => {
@@ -82,7 +119,9 @@ describe("local embedding provider", () => {
await provider.embedQuery("context size custom test");
expect(runtime.createEmbeddingContext).toHaveBeenCalledWith({ contextSize: 2048 });
expect(runtime.createEmbeddingContext).toHaveBeenCalledWith(
expect.objectContaining({ contextSize: 2048, createSignal: expect.any(AbortSignal) }),
);
});
it('passes "auto" contextSize to createEmbeddingContext when explicitly set', async () => {
@@ -98,7 +137,9 @@ describe("local embedding provider", () => {
await provider.embedQuery("context size auto test");
expect(runtime.createEmbeddingContext).toHaveBeenCalledWith({ contextSize: "auto" });
expect(runtime.createEmbeddingContext).toHaveBeenCalledWith(
expect.objectContaining({ contextSize: "auto", createSignal: expect.any(AbortSignal) }),
);
});
it("trims explicit local model paths and cache directories", async () => {
@@ -118,7 +159,136 @@ describe("local embedding provider", () => {
await provider.embedBatch(["a", "b"]);
expect(provider.model).toBe("/models/embed.gguf");
expect(runtime.resolveModelFile).toHaveBeenCalledWith("/models/embed.gguf", "/cache/models");
expect(runtime.resolveModelFile).toHaveBeenCalledWith(
"/models/embed.gguf",
expect.objectContaining({
directory: "/cache/models",
signal: expect.any(AbortSignal),
}),
);
expect(runtime.getEmbeddingFor).toHaveBeenCalledTimes(2);
});
it("disposes cached local llama resources when closed", async () => {
const runtime = mockLocalEmbeddingRuntime();
const provider = await createLocalEmbeddingProvider({
config: {} as never,
provider: "local",
model: "",
fallback: "none",
});
await provider.embedQuery("load local resources");
await provider.close?.();
await provider.close?.();
expect(runtime.disposeContext).toHaveBeenCalledTimes(1);
expect(runtime.disposeModel).toHaveBeenCalledTimes(1);
expect(runtime.disposeLlama).toHaveBeenCalledTimes(1);
await expect(provider.embedQuery("after close")).rejects.toThrow(
"Local embedding provider has been closed",
);
});
// Checks that close() resolves while getLlama() is still pending, and that once
// getLlama() finally resolves the late llama instance is disposed and the in-flight
// embedQuery rejects with the provider's "closed" error.
it("does not wait for pending local llama initialization before close resolves", async () => {
const disposeLlama = vi.fn();
// Gate that keeps getLlama() pending until the test resolves it.
const getLlamaGate = createDeferred<unknown>();
nodeLlamaMock.importNodeLlamaCpp.mockResolvedValue({
getLlama: async () => (await getLlamaGate.promise) as never,
resolveModelFile: vi.fn(async (modelPath: string) => `/resolved/${modelPath}`),
LlamaLogLevel: { error: 0 },
} as never);
const provider = await createLocalEmbeddingProvider({
config: {} as never,
provider: "local",
model: "",
fallback: "none",
});
// Start an embed without awaiting it; with the mock above it cannot finish
// until getLlamaGate is resolved.
const embedPromise = provider.embedQuery("pending init");
// close() must settle even though getLlamaGate has not been resolved yet.
await expect(provider.close?.()).resolves.toBeUndefined();
// Let the stalled getLlama() finish after the provider was closed.
getLlamaGate.resolve({ loadModel: vi.fn(), dispose: disposeLlama });
await expect(embedPromise).rejects.toThrow("Local embedding provider has been closed");
expect(disposeLlama).toHaveBeenCalledTimes(1);
});
it("aborts pending local llama model loads when closed", async () => {
const loadModelStarted = createDeferred<void>();
const loadModelGate = createDeferred<never>();
const disposeLlama = vi.fn();
let capturedResolveSignal: AbortSignal | undefined;
let capturedLoadSignal: AbortSignal | undefined;
const loadModel = vi.fn(
(params: { modelPath: string; loadSignal?: AbortSignal }): Promise<never> => {
capturedLoadSignal = params.loadSignal;
loadModelStarted.resolve();
return loadModelGate.promise;
},
);
nodeLlamaMock.importNodeLlamaCpp.mockResolvedValue({
getLlama: async () => ({ loadModel, dispose: disposeLlama }),
resolveModelFile: vi.fn(async (_modelPath: string, options?: { signal?: AbortSignal }) => {
capturedResolveSignal = options?.signal;
return "/resolved/model.gguf";
}),
LlamaLogLevel: { error: 0 },
} as never);
const provider = await createLocalEmbeddingProvider({
config: {} as never,
provider: "local",
model: "",
fallback: "none",
});
const embedPromise = provider.embedQuery("pending model load");
await loadModelStarted.promise;
await expect(provider.close?.()).resolves.toBeUndefined();
expect(capturedResolveSignal?.aborted).toBe(true);
expect(capturedLoadSignal?.aborted).toBe(true);
expect(disposeLlama).toHaveBeenCalledTimes(1);
loadModelGate.reject(new Error("load aborted"));
await expect(embedPromise).rejects.toThrow("load aborted");
});
it("aborts pending local llama embedding context creation when closed", async () => {
const createContextStarted = createDeferred<void>();
const createContextGate = createDeferred<never>();
const disposeLlama = vi.fn();
const disposeModel = vi.fn();
let capturedCreateSignal: AbortSignal | undefined;
const createEmbeddingContext = vi.fn(
(options?: { createSignal?: AbortSignal }): Promise<never> => {
capturedCreateSignal = options?.createSignal;
createContextStarted.resolve();
return createContextGate.promise;
},
);
nodeLlamaMock.importNodeLlamaCpp.mockResolvedValue({
getLlama: async () => ({
loadModel: vi.fn(async () => ({ createEmbeddingContext, dispose: disposeModel })),
dispose: disposeLlama,
}),
resolveModelFile: vi.fn(async () => "/resolved/model.gguf"),
LlamaLogLevel: { error: 0 },
} as never);
const provider = await createLocalEmbeddingProvider({
config: {} as never,
provider: "local",
model: "",
fallback: "none",
});
const embedPromise = provider.embedQuery("pending context create");
await createContextStarted.promise;
await expect(provider.close?.()).resolves.toBeUndefined();
expect(capturedCreateSignal?.aborted).toBe(true);
expect(disposeModel).toHaveBeenCalledTimes(1);
expect(disposeLlama).toHaveBeenCalledTimes(1);
createContextGate.reject(new Error("context create aborted"));
await expect(embedPromise).rejects.toThrow("context create aborted");
});
});

View File

@@ -9,6 +9,10 @@ import {
} from "./node-llama.js";
import { normalizeOptionalString } from "./string-utils.js";
/**
 * Minimal shape of something that can be released: an optional `dispose`
 * hook that may be synchronous or return a promise.
 */
type DisposableResource = {
dispose?: () => Promise<void> | void;
};
export type {
EmbeddingProvider,
EmbeddingProviderFallback,
@@ -20,6 +24,22 @@ export type {
export { DEFAULT_LOCAL_MODEL } from "./embedding-defaults.js";
/**
 * Disposes the given resources one after another, in array order.
 *
 * Nullish entries and entries without a `dispose` hook are skipped. A failing
 * `dispose` does not stop the loop: every remaining resource is still
 * disposed, and the first failure is rethrown once all have been attempted.
 *
 * Failure is tracked with a boolean rather than by testing the captured
 * value, so a rejection or throw whose value is falsy (for example
 * `Promise.reject()` with no reason) is still propagated and still counts as
 * the first failure.
 *
 * @param resources Resources to release; `null`/`undefined` entries are allowed.
 * @throws The first value thrown or rejected by any `dispose` call.
 */
async function disposeResources(
  resources: Array<DisposableResource | null | undefined>,
): Promise<void> {
  let failed = false;
  let firstError: unknown;
  for (const resource of resources) {
    try {
      await resource?.dispose?.();
    } catch (err) {
      if (!failed) {
        failed = true;
        firstError = err;
      }
    }
  }
  if (failed) {
    throw firstError;
  }
}
export async function createLocalEmbeddingProvider(
options: EmbeddingProviderOptions,
): Promise<EmbeddingProvider> {
@@ -34,8 +54,26 @@ export async function createLocalEmbeddingProvider(
let embeddingModel: LlamaModel | null = null;
let embeddingContext: LlamaEmbeddingContext | null = null;
let initPromise: Promise<LlamaEmbeddingContext> | null = null;
let initAbortController: AbortController | null = null;
let closePromise: Promise<void> | null = null;
let closed = false;
const throwIfClosed = () => {
if (closed) {
throw new Error("Local embedding provider has been closed");
}
};
const disposeAndThrowIfClosed = async <T extends DisposableResource>(resource: T): Promise<T> => {
if (!closed) {
return resource;
}
await disposeResources([resource]);
throwIfClosed();
return resource;
};
const ensureContext = async (): Promise<LlamaEmbeddingContext> => {
throwIfClosed();
if (embeddingContext) {
return embeddingContext;
}
@@ -43,21 +81,40 @@ export async function createLocalEmbeddingProvider(
return initPromise;
}
initPromise = (async () => {
const abortController = new AbortController();
initAbortController = abortController;
try {
if (!llama) {
llama = await getLlama({ logLevel: LlamaLogLevel.error });
const nextLlama = await getLlama({ logLevel: LlamaLogLevel.error });
llama = await disposeAndThrowIfClosed(nextLlama);
}
if (!embeddingModel) {
const resolved = await resolveModelFile(modelPath, modelCacheDir || undefined);
embeddingModel = await llama.loadModel({ modelPath: resolved });
const resolved = await resolveModelFile(modelPath, {
...(modelCacheDir ? { directory: modelCacheDir } : {}),
signal: abortController.signal,
});
throwIfClosed();
const nextModel = await llama.loadModel({
modelPath: resolved,
loadSignal: abortController.signal,
});
embeddingModel = await disposeAndThrowIfClosed(nextModel);
}
if (!embeddingContext) {
embeddingContext = await embeddingModel.createEmbeddingContext({ contextSize });
const nextContext = await embeddingModel.createEmbeddingContext({
contextSize,
createSignal: abortController.signal,
});
embeddingContext = await disposeAndThrowIfClosed(nextContext);
}
return embeddingContext;
} catch (err) {
initPromise = null;
throw err;
} finally {
if (initAbortController === abortController) {
initAbortController = null;
}
}
})();
return initPromise;
@@ -67,18 +124,23 @@ export async function createLocalEmbeddingProvider(
id: "local",
model: modelPath,
// Embeds a single text. Throws if the provider has been closed or if the
// caller-supplied `options.signal` is already aborted.
embedQuery: async (text, options) => {
// Fail fast before triggering any context initialization.
throwIfClosed();
options?.signal?.throwIfAborted();
const ctx = await ensureContext();
// Re-check after the await: close() or an abort may have happened while the
// context was being initialized.
throwIfClosed();
options?.signal?.throwIfAborted();
const embedding = await ctx.getEmbeddingFor(text);
return sanitizeAndNormalizeEmbedding(Array.from(embedding.vector));
},
embedBatch: async (texts, options) => {
throwIfClosed();
options?.signal?.throwIfAborted();
const ctx = await ensureContext();
throwIfClosed();
options?.signal?.throwIfAborted();
const embeddings = await Promise.all(
texts.map(async (text) => {
throwIfClosed();
options?.signal?.throwIfAborted();
const embedding = await ctx.getEmbeddingFor(text);
return sanitizeAndNormalizeEmbedding(Array.from(embedding.vector));
@@ -86,5 +148,24 @@ export async function createLocalEmbeddingProvider(
);
return embeddings;
},
// Closes the provider: marks it closed, aborts any in-flight initialization,
// detaches the cached context/model/runtime and disposes them in that order.
// Repeated calls share the promise created by the first call.
close: async () => {
  if (closePromise === null) {
    closed = true;
    initAbortController?.abort();
    initAbortController = null;
    // Capture the live handles, then clear the shared state synchronously so
    // nothing else can pick them up while disposal is running.
    const pending = [embeddingContext, embeddingModel, llama];
    embeddingContext = null;
    embeddingModel = null;
    llama = null;
    initPromise = null;
    closePromise = disposeResources(pending);
  }
  return closePromise;
},
};
}

View File

@@ -11,6 +11,7 @@ export type EmbeddingProvider = {
inputs: EmbeddingInput[],
options?: EmbeddingProviderCallOptions,
) => Promise<number[][]>;
close?: () => Promise<void> | void;
};
export type EmbeddingProviderCallOptions = {

View File

@@ -4,16 +4,25 @@ export type LlamaEmbedding = {
/** Embedding context handle: produces an embedding for a text and may expose a teardown hook. */
export type LlamaEmbeddingContext = {
getEmbeddingFor: (text: string) => Promise<LlamaEmbedding>;
// Optional teardown hook; may be synchronous or asynchronous.
dispose?: () => Promise<void> | void;
};
/** Loaded model handle that can create embedding contexts and may expose a teardown hook. */
export type LlamaModel = {
createEmbeddingContext: (options?: {
contextSize?: number | "auto";
// NOTE(review): presumably lets the caller cancel an in-flight context creation —
// confirm against the node-llama-cpp documentation.
createSignal?: AbortSignal;
}) => Promise<LlamaEmbeddingContext>;
// Optional teardown hook; may be synchronous or asynchronous.
dispose?: () => Promise<void> | void;
};
/** Options-object form of the second argument accepted by `resolveModelFile`. */
export type ResolveModelFileOptions = {
// NOTE(review): presumably the directory used to store/look up model files — confirm.
directory?: string;
// Abort signal handed to model file resolution.
signal?: AbortSignal;
};
export type Llama = {
loadModel: (params: { modelPath: string }) => Promise<LlamaModel>;
loadModel: (params: { modelPath: string; loadSignal?: AbortSignal }) => Promise<LlamaModel>;
dispose?: () => Promise<void> | void;
};
export type NodeLlamaCppModule = {
@@ -21,7 +30,10 @@ export type NodeLlamaCppModule = {
error: number;
};
getLlama: (params: { logLevel: number }) => Promise<Llama>;
resolveModelFile: (modelPath: string, cacheDir?: string) => Promise<string>;
resolveModelFile: (
modelPath: string,
optionsOrDirectory?: string | ResolveModelFileOptions,
) => Promise<string>;
};
const NODE_LLAMA_CPP_MODULE = "node-llama-cpp";

View File

@@ -1,4 +1,5 @@
export {
closeActiveMemorySearchManager,
closeActiveMemorySearchManagers,
getActiveMemorySearchManager,
resolveActiveMemoryBackendConfig,

View File

@@ -1,24 +1,30 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import {
closeActiveMemorySearchManager,
closeActiveMemorySearchManagers,
getActiveMemorySearchManager,
} from "./memory-host-search.js";
const { closeActiveMemorySearchManagersMock, getActiveMemorySearchManagerMock } = vi.hoisted(
() => ({
closeActiveMemorySearchManagersMock: vi.fn(),
getActiveMemorySearchManagerMock: vi.fn(),
}),
);
// Mock functions are created through vi.hoisted so they can be referenced from
// the vi.mock factory for "./memory-host-search.runtime.js" below.
const {
closeActiveMemorySearchManagerMock,
closeActiveMemorySearchManagersMock,
getActiveMemorySearchManagerMock,
} = vi.hoisted(() => ({
closeActiveMemorySearchManagerMock: vi.fn(),
closeActiveMemorySearchManagersMock: vi.fn(),
getActiveMemorySearchManagerMock: vi.fn(),
}));
vi.mock("./memory-host-search.runtime.js", () => ({
closeActiveMemorySearchManager: closeActiveMemorySearchManagerMock,
closeActiveMemorySearchManagers: closeActiveMemorySearchManagersMock,
getActiveMemorySearchManager: getActiveMemorySearchManagerMock,
}));
describe("memory-host-search facade", () => {
beforeEach(() => {
closeActiveMemorySearchManagerMock.mockReset();
closeActiveMemorySearchManagersMock.mockReset();
getActiveMemorySearchManagerMock.mockReset();
});
@@ -39,4 +45,12 @@ describe("memory-host-search facade", () => {
expect(closeActiveMemorySearchManagersMock).toHaveBeenCalledWith(cfg);
});
it("delegates scoped runtime cleanup to the lazy runtime module", async () => {
  // The facade should forward the scoped params to the mocked runtime unchanged.
  const params = {
    cfg: { agents: { list: [{ id: "main", default: true }] } } as OpenClawConfig,
    agentId: "main",
  };

  await closeActiveMemorySearchManager(params);

  expect(closeActiveMemorySearchManagerMock).toHaveBeenCalledWith({
    cfg: params.cfg,
    agentId: "main",
  });
});
});

View File

@@ -27,3 +27,11 @@ export async function closeActiveMemorySearchManagers(cfg?: OpenClawConfig): Pro
const runtime = await loadMemoryHostSearchRuntime();
await runtime.closeActiveMemorySearchManagers(cfg);
}
/**
 * Closes the active memory search manager for a single agent.
 *
 * Resolves the runtime through `loadMemoryHostSearchRuntime()` and forwards
 * `params` to its `closeActiveMemorySearchManager` unchanged.
 *
 * @param params - Config and id of the agent whose search manager is closed.
 */
export async function closeActiveMemorySearchManager(params: {
  cfg: OpenClawConfig;
  agentId: string;
}): Promise<void> {
  const searchRuntime = await loadMemoryHostSearchRuntime();
  await searchRuntime.closeActiveMemorySearchManager(params);
}

View File

@@ -42,6 +42,7 @@ export type MemoryEmbeddingProvider = {
inputs: EmbeddingInput[],
options?: MemoryEmbeddingProviderCallOptions,
) => Promise<number[][]>;
close?: () => Promise<void> | void;
};
export type MemoryEmbeddingProviderCreateOptions = {

View File

@@ -45,6 +45,7 @@ vi.mock("./memory-state.js", () => ({
let getActiveMemorySearchManager: typeof import("./memory-runtime.js").getActiveMemorySearchManager;
let resolveActiveMemoryBackendConfig: typeof import("./memory-runtime.js").resolveActiveMemoryBackendConfig;
let closeActiveMemorySearchManager: typeof import("./memory-runtime.js").closeActiveMemorySearchManager;
let closeActiveMemorySearchManagers: typeof import("./memory-runtime.js").closeActiveMemorySearchManagers;
function createMemoryAutoEnableFixture() {
@@ -67,6 +68,7 @@ function createMemoryRuntimeFixture() {
return {
getMemorySearchManager: vi.fn(async () => ({ manager: null, error: "no index" })),
resolveMemoryBackendConfig: vi.fn(() => ({ backend: "builtin" as const })),
closeMemorySearchManager: vi.fn(async () => {}),
};
}
@@ -147,6 +149,7 @@ describe("memory runtime auto-enable loading", () => {
({
getActiveMemorySearchManager,
resolveActiveMemoryBackendConfig,
closeActiveMemorySearchManager,
closeActiveMemorySearchManagers,
} = await import("./memory-runtime.js"));
resolveRuntimePluginRegistryMock.mockReset();
@@ -343,4 +346,18 @@ describe("memory runtime auto-enable loading", () => {
] as const)("$name", async ({ config, setup }) => {
await expectCloseMemoryRuntimeCase({ config, setup });
});
it("delegates scoped cleanup to the loaded memory runtime without reloading plugins", async () => {
  // Arrange: a runtime is already loaded and returned by the mocked getter.
  const cfg = { plugins: {} };
  const loadedRuntime = createMemoryRuntimeFixture();
  getMemoryRuntimeMock.mockReturnValue(loadedRuntime);

  // Act: close the search manager for one agent only.
  await closeActiveMemorySearchManager({ cfg: cfg as never, agentId: "main" });

  // Assert: the params reach the runtime and no plugin bootstrap happened.
  expect(loadedRuntime.closeMemorySearchManager).toHaveBeenCalledWith({ cfg, agentId: "main" });
  expectNoMemoryRuntimeBootstrap();
});
});

View File

@@ -74,3 +74,11 @@ export async function closeActiveMemorySearchManagers(cfg?: OpenClawConfig): Pro
const runtime = getMemoryRuntime();
await runtime?.closeAllMemorySearchManagers?.();
}
/**
 * Closes the memory search manager for a single agent on the current memory runtime.
 *
 * Does nothing when `getMemoryRuntime()` returns no runtime or the runtime
 * does not implement `closeMemorySearchManager`.
 *
 * @param params - Config and id of the agent whose search manager is closed.
 */
export async function closeActiveMemorySearchManager(params: {
  cfg: OpenClawConfig;
  agentId: string;
}): Promise<void> {
  await getMemoryRuntime()?.closeMemorySearchManager?.(params);
}

View File

@@ -107,6 +107,7 @@ export type MemoryPluginRuntime = {
cfg: OpenClawConfig;
agentId: string;
}): MemoryRuntimeBackendConfig;
closeMemorySearchManager?(params: { cfg: OpenClawConfig; agentId: string }): Promise<void>;
closeAllMemorySearchManagers?(): Promise<void>;
};

View File

@@ -7,16 +7,30 @@ declare module "node-llama-cpp" {
/** Embedding context handle: produces an embedding for a text and may expose a teardown hook. */
export type LlamaEmbeddingContext = {
getEmbeddingFor: (text: string) => Promise<LlamaEmbedding>;
// Optional teardown hook; may be synchronous or asynchronous.
dispose?: () => Promise<void> | void;
};
export type LlamaModel = {
createEmbeddingContext: () => Promise<LlamaEmbeddingContext>;
createEmbeddingContext: (options?: {
contextSize?: number | "auto";
createSignal?: AbortSignal;
}) => Promise<LlamaEmbeddingContext>;
dispose?: () => Promise<void> | void;
};
/** Options-object form of the second argument accepted by `resolveModelFile`. */
export type ResolveModelFileOptions = {
// NOTE(review): presumably the directory used to store/look up model files — confirm.
directory?: string;
// Abort signal handed to model file resolution.
signal?: AbortSignal;
};
export type Llama = {
loadModel: (params: { modelPath: string }) => Promise<LlamaModel>;
loadModel: (params: { modelPath: string; loadSignal?: AbortSignal }) => Promise<LlamaModel>;
dispose?: () => Promise<void> | void;
};
export function getLlama(params: { logLevel: LlamaLogLevel }): Promise<Llama>;
export function resolveModelFile(modelPath: string, cacheDir?: string): Promise<string>;
export function resolveModelFile(
modelPath: string,
optionsOrDirectory?: string | ResolveModelFileOptions,
): Promise<string>;
}