mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
fix: keep local embedding batches from flooding providers
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
0b0d796bceddfb9e2929518ba84af626da7f5d75c392a217041f36e850c4e74f config-baseline.json
|
||||
271fdf1d6652927e0fc160a6f25276bf6dccb8f1b27fab15e0fc2620e8cacab4 config-baseline.core.json
|
||||
3b9a8841973205560a5396e7a18d301852941a95a561900984ad618e69a99d05 config-baseline.json
|
||||
089ab9493c8482687f19da89d37e069fc402543696c92e6e3be86072c1e48c68 config-baseline.core.json
|
||||
7cd9c908f066c143eab2a201efbc9640f483ab28bba92ddeca1d18cc2b528bc3 config-baseline.channel.json
|
||||
17eb3f8887193579ff32e35f9bd520ba2bd6049e52ab18855c5d41fcbf195d83 config-baseline.plugin.json
|
||||
|
||||
@@ -135,6 +135,11 @@ earlier conversations. This is opt-in via
|
||||
**Only keyword matches?** Your embedding provider may not be configured. Check
|
||||
`openclaw memory status --deep`.
|
||||
|
||||
**Local embeddings time out?** `ollama`, `lmstudio`, and `local` use a longer
|
||||
inline batch timeout by default. If the host is simply slow, set
|
||||
`agents.defaults.memorySearch.sync.embeddingBatchTimeoutSeconds` and rerun
|
||||
`openclaw memory index --force`.
|
||||
|
||||
**CJK text not found?** Rebuild the FTS index with
|
||||
`openclaw memory index --force`.
|
||||
|
||||
|
||||
@@ -219,6 +219,17 @@ to an existing local file. `hf:` and HTTP(S) model references can still be used
|
||||
explicitly with `provider: "local"`, but they do not make `auto` select local
|
||||
before the model is available on disk.
|
||||
|
||||
### Inline embedding timeout
|
||||
|
||||
| Key | Type | Default | Description |
|
||||
| ----------------------------------- | -------- | ---------------- | ------------------------------------------------------------------------ |
|
||||
| `sync.embeddingBatchTimeoutSeconds` | `number` | provider default | Override the timeout for inline embedding batches during memory indexing |
|
||||
|
||||
Unset uses the provider default: 600 seconds for local/self-hosted providers
|
||||
such as `local`, `ollama`, and `lmstudio`, and 120 seconds for hosted providers.
|
||||
|
||||
Increase this when local CPU-bound embedding batches are healthy but slow.
|
||||
|
||||
---
|
||||
|
||||
## Hybrid search config
|
||||
@@ -347,6 +358,10 @@ Prevents re-embedding unchanged text during reindex or transcript updates.
|
||||
Available for `openai`, `gemini`, and `voyage`. OpenAI batch is typically
|
||||
fastest and cheapest for large backfills.
|
||||
|
||||
This is separate from `sync.embeddingBatchTimeoutSeconds`, which controls inline
|
||||
embedding calls used by local/self-hosted providers and hosted providers when
|
||||
provider batch APIs are not active.
|
||||
|
||||
---
|
||||
|
||||
## Session memory search (experimental)
|
||||
|
||||
@@ -23,6 +23,7 @@ export const lmstudioMemoryEmbeddingProviderAdapter: MemoryEmbeddingProviderAdap
|
||||
provider,
|
||||
runtime: {
|
||||
id: "lmstudio",
|
||||
inlineBatchTimeoutMs: 10 * 60_000,
|
||||
cacheKeyData: {
|
||||
provider: "lmstudio",
|
||||
baseUrl: client.baseUrl,
|
||||
|
||||
@@ -4,6 +4,7 @@ import {
|
||||
enforceEmbeddingMaxInputTokens,
|
||||
hasNonTextEmbeddingParts,
|
||||
type EmbeddingInput,
|
||||
type MemoryEmbeddingProviderRuntime,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-embeddings";
|
||||
import { createSubsystemLogger } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
import { type SessionFileEntry } from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
|
||||
@@ -54,6 +55,38 @@ const EMBEDDING_BATCH_TIMEOUT_LOCAL_MS = 10 * 60_000;
|
||||
|
||||
const log = createSubsystemLogger("memory");
|
||||
|
||||
/**
 * Resolves the timeout, in milliseconds, for an inline embedding call.
 *
 * For `kind: "query"` the lookup order is:
 *   1. the provider runtime's `inlineQueryTimeoutMs`,
 *   2. the built-in query default (the local one when `providerId` is `"local"`,
 *      the remote one otherwise).
 * The configured batch timeout is intentionally not consulted for queries.
 *
 * For `kind: "batch"` the lookup order is:
 *   1. `configuredBatchTimeoutSeconds`, converted to milliseconds,
 *   2. the provider runtime's `inlineBatchTimeoutMs`,
 *   3. the built-in batch default (local vs. remote, as above).
 *
 * A candidate is only accepted when it is a finite number greater than zero;
 * anything else (`undefined`, `0`, negatives, `NaN`, `Infinity`) is treated as
 * unset and the next candidate is tried.
 *
 * @param params.kind - Whether the timeout guards a single query or a batch.
 * @param params.providerId - Id of the active embedding provider, if any.
 * @param params.providerRuntime - Timeout hints advertised by the provider adapter.
 * @param params.configuredBatchTimeoutSeconds - User override, in seconds (batch only).
 * @returns The timeout to apply, in milliseconds.
 */
export function resolveEmbeddingTimeoutMs(params: {
  kind: "query" | "batch";
  providerId?: string;
  providerRuntime?: Pick<
    MemoryEmbeddingProviderRuntime,
    "inlineQueryTimeoutMs" | "inlineBatchTimeoutMs"
  >;
  configuredBatchTimeoutSeconds?: number;
}): number {
  // Single definition of "usable" so every candidate is vetted the same way.
  // Non-finite values are rejected because they cannot be used as a timer delay.
  const isUsableTimeout = (value: number | undefined): value is number =>
    typeof value === "number" && Number.isFinite(value) && value > 0;

  const isBuiltinLocal = params.providerId === "local";

  if (params.kind === "query") {
    const runtimeQueryTimeoutMs = params.providerRuntime?.inlineQueryTimeoutMs;
    if (isUsableTimeout(runtimeQueryTimeoutMs)) {
      return runtimeQueryTimeoutMs;
    }
    return isBuiltinLocal ? EMBEDDING_QUERY_TIMEOUT_LOCAL_MS : EMBEDDING_QUERY_TIMEOUT_REMOTE_MS;
  }

  // An explicit user setting wins over whatever the provider advertises.
  const configuredTimeoutSeconds = params.configuredBatchTimeoutSeconds;
  if (isUsableTimeout(configuredTimeoutSeconds)) {
    // NOTE(review): very large values can exceed the maximum delay accepted by
    // JavaScript timers — confirm how the caller applies this value.
    return configuredTimeoutSeconds * 1000;
  }

  const runtimeBatchTimeoutMs = params.providerRuntime?.inlineBatchTimeoutMs;
  if (isUsableTimeout(runtimeBatchTimeoutMs)) {
    return runtimeBatchTimeoutMs;
  }

  return isBuiltinLocal ? EMBEDDING_BATCH_TIMEOUT_LOCAL_MS : EMBEDDING_BATCH_TIMEOUT_REMOTE_MS;
}
|
||||
|
||||
export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
|
||||
protected abstract batchFailureCount: number;
|
||||
protected abstract batchFailureLastError?: string;
|
||||
@@ -305,11 +338,12 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
/**
 * Resolves the inline embedding timeout (in milliseconds) for this manager.
 *
 * Delegates to `resolveEmbeddingTimeoutMs`, passing the active provider id, the
 * provider runtime and the configured `sync.embeddingBatchTimeoutSeconds`, so
 * the precedence rules live in one place.
 *
 * @param kind - Whether the timeout guards a single query or a batch.
 * @returns The timeout to apply, in milliseconds.
 */
private resolveEmbeddingTimeout(kind: "query" | "batch"): number {
  // The earlier `isLocal`-only branch returned before this call could run, which
  // made the provider runtime hints and the configured override unreachable.
  return resolveEmbeddingTimeoutMs({
    kind,
    providerId: this.provider?.id,
    providerRuntime: this.providerRuntime,
    configuredBatchTimeoutSeconds: this.settings.sync.embeddingBatchTimeoutSeconds,
  });
}
|
||||
|
||||
protected async embedQueryWithTimeout(text: string): Promise<number[]> {
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { resolveEmbeddingTimeoutMs } from "./manager-embedding-ops.js";
|
||||
|
||||
// Covers the precedence rules of resolveEmbeddingTimeoutMs: configured batch
// override > provider runtime hint > built-in local/remote default.
describe("memory embedding timeout resolution", () => {
  it("uses hosted defaults for inline embedding calls", () => {
    expect(resolveEmbeddingTimeoutMs({ kind: "query", providerId: "openai" })).toBe(60_000);
    expect(resolveEmbeddingTimeoutMs({ kind: "batch", providerId: "openai" })).toBe(120_000);
  });

  it("uses local defaults for the builtin local provider", () => {
    expect(resolveEmbeddingTimeoutMs({ kind: "query", providerId: "local" })).toBe(300_000);
    expect(resolveEmbeddingTimeoutMs({ kind: "batch", providerId: "local" })).toBe(600_000);
  });

  it("uses runtime batch defaults for local-server providers", () => {
    expect(
      resolveEmbeddingTimeoutMs({
        kind: "batch",
        providerId: "ollama",
        providerRuntime: { inlineBatchTimeoutMs: 600_000 },
      }),
    ).toBe(600_000);
  });

  it("uses runtime query defaults when the provider advertises one", () => {
    expect(
      resolveEmbeddingTimeoutMs({
        kind: "query",
        providerId: "ollama",
        providerRuntime: { inlineQueryTimeoutMs: 90_000 },
      }),
    ).toBe(90_000);
  });

  it("lets configured batch timeout override provider defaults", () => {
    expect(
      resolveEmbeddingTimeoutMs({
        kind: "batch",
        providerId: "ollama",
        providerRuntime: { inlineBatchTimeoutMs: 600_000 },
        configuredBatchTimeoutSeconds: 45,
      }),
    ).toBe(45_000);
  });

  it("ignores the configured batch timeout for query calls", () => {
    // The override is batch-only; queries must keep their own default.
    expect(
      resolveEmbeddingTimeoutMs({
        kind: "query",
        providerId: "openai",
        configuredBatchTimeoutSeconds: 45,
      }),
    ).toBe(60_000);
  });

  it("falls through when an override is not a positive number", () => {
    // A zero configured value is treated as unset, so the runtime hint applies.
    expect(
      resolveEmbeddingTimeoutMs({
        kind: "batch",
        providerId: "ollama",
        providerRuntime: { inlineBatchTimeoutMs: 600_000 },
        configuredBatchTimeoutSeconds: 0,
      }),
    ).toBe(600_000);
    // A zero runtime hint is treated as unset, so the hosted default applies.
    expect(
      resolveEmbeddingTimeoutMs({
        kind: "batch",
        providerId: "openai",
        providerRuntime: { inlineBatchTimeoutMs: 0 },
      }),
    ).toBe(120_000);
  });
});
|
||||
@@ -101,6 +101,8 @@ const localAdapter: MemoryEmbeddingProviderAdapter = {
|
||||
provider,
|
||||
runtime: {
|
||||
id: "local",
|
||||
inlineQueryTimeoutMs: 5 * 60_000,
|
||||
inlineBatchTimeoutMs: 10 * 60_000,
|
||||
cacheKeyData: {
|
||||
provider: "local",
|
||||
model: provider.model,
|
||||
|
||||
@@ -18,9 +18,11 @@ vi.mock("openclaw/plugin-sdk/ssrf-runtime", () => ({
|
||||
}));
|
||||
|
||||
let createOllamaEmbeddingProvider: typeof import("./embedding-provider.js").createOllamaEmbeddingProvider;
|
||||
let ollamaMemoryEmbeddingProviderAdapter: typeof import("./memory-embedding-adapter.js").ollamaMemoryEmbeddingProviderAdapter;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ createOllamaEmbeddingProvider } = await import("./embedding-provider.js"));
|
||||
({ ollamaMemoryEmbeddingProviderAdapter } = await import("./memory-embedding-adapter.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
@@ -147,4 +149,49 @@ describe("ollama embedding provider", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("serializes batch embeddings to avoid flooding local Ollama", async () => {
  // Bookkeeping for the fake server: prompts in arrival order, plus how many
  // requests are open right now and the highest number seen at once.
  const prompts: string[] = [];
  let active = 0;
  let peakActive = 0;

  const fetchMock = vi.fn(async (_url: string, init?: RequestInit) => {
    active += 1;
    peakActive = Math.max(peakActive, active);

    const rawBody = typeof init?.body === "string" ? init.body : "{}";
    const { prompt } = JSON.parse(rawBody) as { prompt?: string };
    prompts.push(prompt ?? "");

    // Yield to the timer queue so any overlapping request would be counted.
    await new Promise((resolve) => setTimeout(resolve, 0));
    active -= 1;

    return new Response(JSON.stringify({ embedding: [1, 0] }), {
      status: 200,
      headers: { "content-type": "application/json" },
    });
  });
  vi.stubGlobal("fetch", fetchMock);

  const { provider } = await createOllamaEmbeddingProvider({
    config: {} as OpenClawConfig,
    provider: "ollama",
    model: "nomic-embed-text",
    fallback: "none",
    remote: { baseUrl: "http://127.0.0.1:11434" },
  });

  const inputs = ["a", "bb", "ccc"];
  await expect(provider.embedBatch(inputs)).resolves.toHaveLength(3);

  // One request per input, issued in input order, never more than one at a time.
  expect(fetchMock).toHaveBeenCalledTimes(3);
  expect(prompts).toEqual(["a", "bb", "ccc"]);
  expect(peakActive).toBe(1);
});
|
||||
|
||||
it("marks inline memory batches as local-server timeout work", async () => {
  // The adapter is expected to advertise a 10-minute inline batch timeout.
  const { runtime } = await ollamaMemoryEmbeddingProviderAdapter.create({
    config: {} as OpenClawConfig,
    provider: "ollama",
    model: "nomic-embed-text",
    fallback: "none",
    remote: { baseUrl: "http://127.0.0.1:11434" },
  });

  expect(runtime?.inlineBatchTimeoutMs).toBe(600_000);
});
|
||||
});
|
||||
|
||||
@@ -48,6 +48,7 @@ export type OllamaEmbeddingClient = {
|
||||
type OllamaEmbeddingClientConfig = Omit<OllamaEmbeddingClient, "embedBatch">;
|
||||
|
||||
export const DEFAULT_OLLAMA_EMBEDDING_MODEL = "nomic-embed-text";
|
||||
const OLLAMA_EMBEDDING_BATCH_CONCURRENCY = 1;
|
||||
|
||||
function sanitizeAndNormalizeEmbedding(vec: number[]): number[] {
|
||||
const sanitized = vec.map((value) => (Number.isFinite(value) ? value : 0));
|
||||
@@ -172,7 +173,12 @@ export async function createOllamaEmbeddingProvider(
|
||||
model: client.model,
|
||||
embedQuery: embedOne,
|
||||
embedBatch: async (texts) => {
  // Send requests in windows of OLLAMA_EMBEDDING_BATCH_CONCURRENCY instead of
  // firing one request per text all at once. Each window is awaited before the
  // next one starts, and results are appended in input order.
  const embeddings: number[][] = [];
  for (let index = 0; index < texts.length; index += OLLAMA_EMBEDDING_BATCH_CONCURRENCY) {
    const batch = texts.slice(index, index + OLLAMA_EMBEDDING_BATCH_CONCURRENCY);
    embeddings.push(...(await Promise.all(batch.map(embedOne))));
  }
  return embeddings;
},
|
||||
};
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ export const ollamaMemoryEmbeddingProviderAdapter: MemoryEmbeddingProviderAdapte
|
||||
provider,
|
||||
runtime: {
|
||||
id: "ollama",
|
||||
inlineBatchTimeoutMs: 10 * 60_000,
|
||||
cacheKeyData: {
|
||||
provider: "ollama",
|
||||
model: client.model,
|
||||
|
||||
@@ -225,6 +225,7 @@ describe("memory search config", () => {
|
||||
watch: false,
|
||||
watchDebounceMs: 25,
|
||||
intervalMinutes: 3,
|
||||
embeddingBatchTimeoutSeconds: undefined,
|
||||
sessions: {
|
||||
deltaBytes: 321,
|
||||
deltaMessages: 7,
|
||||
@@ -233,6 +234,23 @@ describe("memory search config", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("uses configured embeddingBatchTimeoutSeconds when set", () => {
  // Only the sync override is set; the resolved sync config should carry it through.
  const sync = { embeddingBatchTimeoutSeconds: 600 };
  const cfg = asConfig({
    agents: {
      defaults: {
        memorySearch: { provider: "openai", sync },
      },
    },
  });

  const resolved = resolveMemorySearchSyncConfig(cfg, "main");

  expect(resolved?.embeddingBatchTimeoutSeconds).toBe(600);
});
|
||||
|
||||
it("merges defaults and overrides", () => {
|
||||
const cfg = asConfig({
|
||||
agents: {
|
||||
|
||||
@@ -62,6 +62,7 @@ export type ResolvedMemorySearchConfig = {
|
||||
watch: boolean;
|
||||
watchDebounceMs: number;
|
||||
intervalMinutes: number;
|
||||
embeddingBatchTimeoutSeconds: number | undefined;
|
||||
sessions: {
|
||||
deltaBytes: number;
|
||||
deltaMessages: number;
|
||||
@@ -360,6 +361,8 @@ function resolveSyncConfig(
|
||||
defaults?.sync?.watchDebounceMs ??
|
||||
DEFAULT_WATCH_DEBOUNCE_MS,
|
||||
intervalMinutes: overrides?.sync?.intervalMinutes ?? defaults?.sync?.intervalMinutes ?? 0,
|
||||
embeddingBatchTimeoutSeconds:
|
||||
overrides?.sync?.embeddingBatchTimeoutSeconds ?? defaults?.sync?.embeddingBatchTimeoutSeconds,
|
||||
sessions: {
|
||||
deltaBytes:
|
||||
overrides?.sync?.sessions?.deltaBytes ??
|
||||
|
||||
@@ -4479,6 +4479,14 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
minimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
embeddingBatchTimeoutSeconds: {
|
||||
type: "integer",
|
||||
exclusiveMinimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
title: "Embedding Batch Timeout (s)",
|
||||
description:
|
||||
"Overrides the timeout for inline embedding batches during memory indexing. Leave unset to use provider defaults: 600 seconds for local/self-hosted providers such as local, Ollama, and LM Studio, and 120 seconds for hosted providers.",
|
||||
},
|
||||
sessions: {
|
||||
type: "object",
|
||||
properties: {
|
||||
@@ -6361,6 +6369,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
minimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
embeddingBatchTimeoutSeconds: {
|
||||
type: "integer",
|
||||
exclusiveMinimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
sessions: {
|
||||
type: "object",
|
||||
properties: {
|
||||
@@ -25577,6 +25590,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
help: "Debounce window in milliseconds for coalescing rapid file-watch events before reindex runs. Increase to reduce churn on frequently-written files, or lower for faster freshness.",
|
||||
tags: ["performance", "automation"],
|
||||
},
|
||||
"agents.defaults.memorySearch.sync.embeddingBatchTimeoutSeconds": {
|
||||
label: "Embedding Batch Timeout (s)",
|
||||
help: "Overrides the timeout for inline embedding batches during memory indexing. Leave unset to use provider defaults: 600 seconds for local/self-hosted providers such as local, Ollama, and LM Studio, and 120 seconds for hosted providers.",
|
||||
tags: ["performance"],
|
||||
},
|
||||
"agents.defaults.memorySearch.sync.sessions.deltaBytes": {
|
||||
label: "Session Delta Bytes",
|
||||
help: "Requires at least this many newly appended bytes before session transcript changes trigger reindex (default: 100000). Increase to reduce frequent small reindexes, or lower for faster transcript freshness.",
|
||||
|
||||
@@ -108,6 +108,7 @@ const TARGET_KEYS = [
|
||||
"agents.defaults.memorySearch.cache.maxEntries",
|
||||
"agents.defaults.memorySearch.sync.onSearch",
|
||||
"agents.defaults.memorySearch.sync.watch",
|
||||
"agents.defaults.memorySearch.sync.embeddingBatchTimeoutSeconds",
|
||||
"agents.defaults.memorySearch.sync.sessions.deltaBytes",
|
||||
"agents.defaults.memorySearch.sync.sessions.deltaMessages",
|
||||
"models.mode",
|
||||
|
||||
@@ -1113,6 +1113,8 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"Watches memory files and schedules index updates from file-change events (chokidar). Enable for near-real-time freshness; disable on very large workspaces if watch churn is too noisy.",
|
||||
"agents.defaults.memorySearch.sync.watchDebounceMs":
|
||||
"Debounce window in milliseconds for coalescing rapid file-watch events before reindex runs. Increase to reduce churn on frequently-written files, or lower for faster freshness.",
|
||||
"agents.defaults.memorySearch.sync.embeddingBatchTimeoutSeconds":
|
||||
"Overrides the timeout for inline embedding batches during memory indexing. Leave unset to use provider defaults: 600 seconds for local/self-hosted providers such as local, Ollama, and LM Studio, and 120 seconds for hosted providers.",
|
||||
"agents.defaults.memorySearch.sync.sessions.deltaBytes":
|
||||
"Requires at least this many newly appended bytes before session transcript changes trigger reindex (default: 100000). Increase to reduce frequent small reindexes, or lower for faster transcript freshness.",
|
||||
"agents.defaults.memorySearch.sync.sessions.deltaMessages":
|
||||
|
||||
@@ -427,6 +427,7 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"agents.defaults.memorySearch.sync.onSearch": "Index on Search (Lazy)",
|
||||
"agents.defaults.memorySearch.sync.watch": "Watch Memory Files",
|
||||
"agents.defaults.memorySearch.sync.watchDebounceMs": "Memory Watch Debounce (ms)",
|
||||
"agents.defaults.memorySearch.sync.embeddingBatchTimeoutSeconds": "Embedding Batch Timeout (s)",
|
||||
"agents.defaults.memorySearch.sync.sessions.deltaBytes": "Session Delta Bytes",
|
||||
"agents.defaults.memorySearch.sync.sessions.deltaMessages": "Session Delta Messages",
|
||||
"agents.defaults.memorySearch.sync.sessions.postCompactionForce":
|
||||
|
||||
@@ -433,6 +433,11 @@ export type MemorySearchConfig = {
|
||||
watch?: boolean;
|
||||
watchDebounceMs?: number;
|
||||
intervalMinutes?: number;
|
||||
/**
|
||||
* Timeout in seconds for inline embedding batches during memory indexing.
|
||||
* Unset uses provider defaults: 600s for local/self-hosted providers, 120s for hosted providers.
|
||||
*/
|
||||
embeddingBatchTimeoutSeconds?: number;
|
||||
sessions?: {
|
||||
/** Minimum appended bytes before session transcripts are reindexed. */
|
||||
deltaBytes?: number;
|
||||
|
||||
@@ -722,6 +722,7 @@ export const MemorySearchSchema = z
|
||||
watch: z.boolean().optional(),
|
||||
watchDebounceMs: z.number().int().nonnegative().optional(),
|
||||
intervalMinutes: z.number().int().nonnegative().optional(),
|
||||
embeddingBatchTimeoutSeconds: z.number().int().positive().optional(),
|
||||
sessions: z
|
||||
.object({
|
||||
deltaBytes: z.number().int().nonnegative().optional(),
|
||||
|
||||
@@ -20,6 +20,8 @@ export type MemoryEmbeddingBatchOptions = {
|
||||
/**
 * Runtime metadata that a memory embedding provider adapter returns alongside
 * the provider itself.
 */
export type MemoryEmbeddingProviderRuntime = {
  /** Provider identifier, e.g. `"local"`, `"ollama"` or `"lmstudio"`. */
  id: string;
  /**
   * Provider-specific values (adapters supply fields such as provider, model
   * and base URL). Presumably folded into the embedding cache key — confirm
   * against the cache implementation.
   */
  cacheKeyData?: Record<string, unknown>;
  /**
   * Timeout, in milliseconds, for a single inline query embedding. When it is a
   * positive number it replaces the built-in query default.
   */
  inlineQueryTimeoutMs?: number;
  /**
   * Timeout, in milliseconds, for an inline embedding batch. When it is a
   * positive number it replaces the built-in batch default, unless the user has
   * configured `sync.embeddingBatchTimeoutSeconds`.
   */
  inlineBatchTimeoutMs?: number;
  /**
   * Optional batch embedding hook. Resolves to one vector per input or `null`.
   * NOTE(review): the meaning of `null` is not visible here — confirm with callers.
   */
  batchEmbed?: (options: MemoryEmbeddingBatchOptions) => Promise<number[][] | null>;
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user