mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-02 16:40:24 +00:00
feat(memory): native Voyage AI support (#7078)
* feat(memory): add native Voyage AI embedding support with batching Cherry-picked from PR #2519, resolved conflict in memory-search.ts (hasRemote -> hasRemoteConfig rename + added voyage provider) * fix(memory): optimize voyage batch memory usage with streaming and deduplicate code Cherry-picked from PR #2519. Fixed lint error: changed this.runWithConcurrency to use imported runWithConcurrency function after extraction to internal.ts
This commit is contained in:
@@ -26,14 +26,17 @@ import {
|
||||
type OpenAiBatchRequest,
|
||||
runOpenAiEmbeddingBatches,
|
||||
} from "./batch-openai.js";
|
||||
import { type VoyageBatchRequest, runVoyageEmbeddingBatches } from "./batch-voyage.js";
|
||||
import { DEFAULT_GEMINI_EMBEDDING_MODEL } from "./embeddings-gemini.js";
|
||||
import { DEFAULT_OPENAI_EMBEDDING_MODEL } from "./embeddings-openai.js";
|
||||
import { DEFAULT_VOYAGE_EMBEDDING_MODEL } from "./embeddings-voyage.js";
|
||||
import {
|
||||
createEmbeddingProvider,
|
||||
type EmbeddingProvider,
|
||||
type EmbeddingProviderResult,
|
||||
type GeminiEmbeddingClient,
|
||||
type OpenAiEmbeddingClient,
|
||||
type VoyageEmbeddingClient,
|
||||
} from "./embeddings.js";
|
||||
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
|
||||
import {
|
||||
@@ -47,6 +50,7 @@ import {
|
||||
type MemoryChunk,
|
||||
type MemoryFileEntry,
|
||||
parseEmbedding,
|
||||
runWithConcurrency,
|
||||
} from "./internal.js";
|
||||
import { searchKeyword, searchVector } from "./manager-search.js";
|
||||
import { ensureMemoryIndexSchema } from "./memory-schema.js";
|
||||
@@ -112,11 +116,12 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
private readonly workspaceDir: string;
|
||||
private readonly settings: ResolvedMemorySearchConfig;
|
||||
private provider: EmbeddingProvider;
|
||||
private readonly requestedProvider: "openai" | "local" | "gemini" | "auto";
|
||||
private fallbackFrom?: "openai" | "local" | "gemini";
|
||||
private readonly requestedProvider: "openai" | "local" | "gemini" | "voyage" | "auto";
|
||||
private fallbackFrom?: "openai" | "local" | "gemini" | "voyage";
|
||||
private fallbackReason?: string;
|
||||
private openAi?: OpenAiEmbeddingClient;
|
||||
private gemini?: GeminiEmbeddingClient;
|
||||
private voyage?: VoyageEmbeddingClient;
|
||||
private batch: {
|
||||
enabled: boolean;
|
||||
wait: boolean;
|
||||
@@ -217,6 +222,7 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
this.fallbackReason = params.providerResult.fallbackReason;
|
||||
this.openAi = params.providerResult.openAi;
|
||||
this.gemini = params.providerResult.gemini;
|
||||
this.voyage = params.providerResult.voyage;
|
||||
this.sources = new Set(params.settings.sources);
|
||||
this.db = this.openDatabase();
|
||||
this.providerKey = this.computeProviderKey();
|
||||
@@ -1109,7 +1115,7 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
});
|
||||
}
|
||||
});
|
||||
await this.runWithConcurrency(tasks, this.getIndexConcurrency());
|
||||
await runWithConcurrency(tasks, this.getIndexConcurrency());
|
||||
|
||||
const staleRows = this.db
|
||||
.prepare(`SELECT path FROM files WHERE source = ?`)
|
||||
@@ -1206,7 +1212,7 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
});
|
||||
}
|
||||
});
|
||||
await this.runWithConcurrency(tasks, this.getIndexConcurrency());
|
||||
await runWithConcurrency(tasks, this.getIndexConcurrency());
|
||||
|
||||
const staleRows = this.db
|
||||
.prepare(`SELECT path FROM files WHERE source = ?`)
|
||||
@@ -1346,7 +1352,8 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
const enabled = Boolean(
|
||||
batch?.enabled &&
|
||||
((this.openAi && this.provider.id === "openai") ||
|
||||
(this.gemini && this.provider.id === "gemini")),
|
||||
(this.gemini && this.provider.id === "gemini") ||
|
||||
(this.voyage && this.provider.id === "voyage")),
|
||||
);
|
||||
return {
|
||||
enabled,
|
||||
@@ -1365,14 +1372,16 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
if (this.fallbackFrom) {
|
||||
return false;
|
||||
}
|
||||
const fallbackFrom = this.provider.id as "openai" | "gemini" | "local";
|
||||
const fallbackFrom = this.provider.id as "openai" | "gemini" | "local" | "voyage";
|
||||
|
||||
const fallbackModel =
|
||||
fallback === "gemini"
|
||||
? DEFAULT_GEMINI_EMBEDDING_MODEL
|
||||
: fallback === "openai"
|
||||
? DEFAULT_OPENAI_EMBEDDING_MODEL
|
||||
: this.settings.model;
|
||||
: fallback === "voyage"
|
||||
? DEFAULT_VOYAGE_EMBEDDING_MODEL
|
||||
: this.settings.model;
|
||||
|
||||
const fallbackResult = await createEmbeddingProvider({
|
||||
config: this.cfg,
|
||||
@@ -1389,6 +1398,7 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
this.provider = fallbackResult.provider;
|
||||
this.openAi = fallbackResult.openAi;
|
||||
this.gemini = fallbackResult.gemini;
|
||||
this.voyage = fallbackResult.voyage;
|
||||
this.providerKey = this.computeProviderKey();
|
||||
this.batch = this.resolveBatchConfig();
|
||||
log.warn(`memory embeddings: switched to fallback provider (${fallback})`, { reason });
|
||||
@@ -1865,9 +1875,82 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
if (this.provider.id === "gemini" && this.gemini) {
|
||||
return this.embedChunksWithGeminiBatch(chunks, entry, source);
|
||||
}
|
||||
if (this.provider.id === "voyage" && this.voyage) {
|
||||
return this.embedChunksWithVoyageBatch(chunks, entry, source);
|
||||
}
|
||||
return this.embedChunksInBatches(chunks);
|
||||
}
|
||||
|
||||
private async embedChunksWithVoyageBatch(
|
||||
chunks: MemoryChunk[],
|
||||
entry: MemoryFileEntry | SessionFileEntry,
|
||||
source: MemorySource,
|
||||
): Promise<number[][]> {
|
||||
const voyage = this.voyage;
|
||||
if (!voyage) {
|
||||
return this.embedChunksInBatches(chunks);
|
||||
}
|
||||
if (chunks.length === 0) return [];
|
||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
||||
|
||||
for (let i = 0; i < chunks.length; i += 1) {
|
||||
const chunk = chunks[i];
|
||||
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
|
||||
if (hit && hit.length > 0) {
|
||||
embeddings[i] = hit;
|
||||
} else if (chunk) {
|
||||
missing.push({ index: i, chunk });
|
||||
}
|
||||
}
|
||||
|
||||
if (missing.length === 0) return embeddings;
|
||||
|
||||
const requests: VoyageBatchRequest[] = [];
|
||||
const mapping = new Map<string, { index: number; hash: string }>();
|
||||
for (const item of missing) {
|
||||
const chunk = item.chunk;
|
||||
const customId = hashText(
|
||||
`${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
|
||||
);
|
||||
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
||||
requests.push({
|
||||
custom_id: customId,
|
||||
body: {
|
||||
input: chunk.text,
|
||||
},
|
||||
});
|
||||
}
|
||||
const batchResult = await this.runBatchWithFallback({
|
||||
provider: "voyage",
|
||||
run: async () =>
|
||||
await runVoyageEmbeddingBatches({
|
||||
client: voyage,
|
||||
agentId: this.agentId,
|
||||
requests,
|
||||
wait: this.batch.wait,
|
||||
concurrency: this.batch.concurrency,
|
||||
pollIntervalMs: this.batch.pollIntervalMs,
|
||||
timeoutMs: this.batch.timeoutMs,
|
||||
debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
|
||||
}),
|
||||
fallback: async () => await this.embedChunksInBatches(chunks),
|
||||
});
|
||||
if (Array.isArray(batchResult)) return batchResult;
|
||||
const byCustomId = batchResult;
|
||||
|
||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
||||
for (const [customId, embedding] of byCustomId.entries()) {
|
||||
const mapped = mapping.get(customId);
|
||||
if (!mapped) continue;
|
||||
embeddings[mapped.index] = embedding;
|
||||
toCache.push({ hash: mapped.hash, embedding });
|
||||
}
|
||||
this.upsertEmbeddingCache(toCache);
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
private async embedChunksWithOpenAiBatch(
|
||||
chunks: MemoryChunk[],
|
||||
entry: MemoryFileEntry | SessionFileEntry,
|
||||
@@ -2108,41 +2191,6 @@ export class MemoryIndexManager implements MemorySearchManager {
|
||||
}
|
||||
}
|
||||
|
||||
private async runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
|
||||
if (tasks.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
|
||||
const results: T[] = Array.from({ length: tasks.length });
|
||||
let next = 0;
|
||||
let firstError: unknown = null;
|
||||
|
||||
const workers = Array.from({ length: resolvedLimit }, async () => {
|
||||
while (true) {
|
||||
if (firstError) {
|
||||
return;
|
||||
}
|
||||
const index = next;
|
||||
next += 1;
|
||||
if (index >= tasks.length) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
results[index] = await tasks[index]();
|
||||
} catch (err) {
|
||||
firstError = err;
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.allSettled(workers);
|
||||
if (firstError) {
|
||||
throw firstError;
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private async withBatchFailureLock<T>(fn: () => Promise<T>): Promise<T> {
|
||||
let release: () => void;
|
||||
const wait = this.batchFailureLock;
|
||||
|
||||
Reference in New Issue
Block a user