fix(memory-lancedb): skip processed auto-capture messages safely (#72663)

This commit is contained in:
Vincent Koc
2026-04-26 23:51:04 -07:00
committed by GitHub
parent 9ced682a9d
commit 69c30e37d9
4 changed files with 348 additions and 65 deletions

View File

@@ -114,6 +114,7 @@ Docs: https://docs.openclaw.ai
- Cron/context engine: run isolated cron jobs under run-scoped context-engine session keys so prior runs of the same job are not inherited unless the job is explicitly session-bound. (#72292) Thanks @jalehman.
- Control UI: localize command palette labels, categories, skill shortcuts, footer hints, and connect-command copy labels while preserving localized command palette search matching. (#61130, #61119) Thanks @rubensfox20.
- Plugins/memory-lancedb: request float embedding responses from OpenAI-compatible servers so local providers that default SDK requests to base64 no longer return dimension-mismatched LanceDB vectors while preserving configured dimensions. Fixes #45982. (#59048, #46069, #45986) Thanks @deep-introspection, @xiaokhkh, @caicongyang, and @thiswind.
- Plugins/memory-lancedb: advance auto-capture cursors per session only after messages are processed or intentionally skipped, retry failed messages, survive compacted histories, and clear cursor state on session end. Fixes #71349; carries forward #42083. Thanks @as775116191.
- Plugins/memory-core: respect configured memory-search embedding concurrency during non-batch indexing so local Ollama embedding backends can serialize indexing instead of flooding the server. Fixes #66822. (#66931) Thanks @oliviareid-svg and @LyraInTheFlesh.
- Docker/update smoke: keep the package-derived update-channel fixture on package-shipped files and make its UI build stub create the asset the updater verifies. Thanks @vincentkoc.
- Gateway/models: repair legacy `models.providers.*.api = "openai"` config values to `openai-completions`, and skip providers with future stale API enum values during startup instead of bricking the gateway. Fixes #72477. (#72542) Thanks @JooyoungChoi14 and @obviyus.

View File

@@ -1271,6 +1271,232 @@ describe("memory plugin e2e", () => {
}
});
// Builds a fully mocked memory-lancedb plugin instance for the auto-capture
// cursor tests: stubs the OpenAI embeddings client, the LanceDB runtime, and
// the runtime-env proxy hook, registers the plugin against a spy API, and
// returns the captured `agent_end` / `session_end` hook handlers plus the
// underlying spies so tests can assert on embedding and storage calls.
async function setupAutoCaptureCursorHarness(overrides?: {
  // Replacement `embeddings.create` spy (e.g. to inject transient failures).
  embeddingsCreate?: ReturnType<typeof vi.fn>;
  // Rows returned by the mocked vector search (drives duplicate detection).
  searchResults?: Array<Record<string, unknown>>;
}) {
  // Default embedding spy resolves to a fixed 3-dimensional vector.
  const embeddingsCreate =
    overrides?.embeddingsCreate ??
    vi.fn(async () => ({
      data: [{ embedding: [0.1, 0.2, 0.3] }],
    }));
  const ensureGlobalUndiciEnvProxyDispatcher = vi.fn();
  const add = vi.fn(async () => undefined);
  // Mocked `vectorSearch(...).limit(...).toArray()` chain; defaults to "no
  // matches" so every captured text is treated as new (no dedupe hit).
  const toArray = vi.fn(async () => overrides?.searchResults ?? []);
  const limit = vi.fn(() => ({ toArray }));
  const vectorSearch = vi.fn(() => ({ limit }));
  const openTable = vi.fn(async () => ({
    vectorSearch,
    countRows: vi.fn(async () => 0),
    add,
    delete: vi.fn(async () => undefined),
  }));
  const loadLanceDbModule = vi.fn(async () => ({
    connect: vi.fn(async () => ({
      tableNames: vi.fn(async () => ["memories"]),
      openTable,
    })),
  }));
  // Reset the module registry BEFORE installing mocks so the dynamic
  // `import("./index.js")` below resolves against the mocked dependencies.
  vi.resetModules();
  vi.doMock("openclaw/plugin-sdk/runtime-env", () => ({
    ensureGlobalUndiciEnvProxyDispatcher,
  }));
  vi.doMock("openai", () => ({
    default: class MockOpenAI {
      embeddings = { create: embeddingsCreate };
    },
  }));
  vi.doMock("./lancedb-runtime.js", () => ({
    loadLanceDbModule,
  }));
  const { default: dynamicMemoryPlugin } = await import("./index.js");
  const on = vi.fn();
  const logger = {
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    debug: vi.fn(),
  };
  // Minimal plugin API surface. autoRecall is disabled so only the
  // auto-capture code paths register the hooks under test.
  const mockApi = {
    id: "memory-lancedb",
    name: "Memory (LanceDB)",
    source: "test",
    config: {},
    pluginConfig: {
      embedding: {
        apiKey: OPENAI_API_KEY,
        model: "text-embedding-3-small",
      },
      dbPath: getDbPath(),
      autoCapture: true,
      autoRecall: false,
    },
    runtime: {},
    logger,
    registerTool: vi.fn(),
    registerCli: vi.fn(),
    registerService: vi.fn(),
    on,
    resolvePath: (p: string) => p,
  };
  dynamicMemoryPlugin.register(mockApi as any);
  // Pull the registered hook handlers back out of the `on` spy.
  const agentEnd = on.mock.calls.find(([hookName]) => hookName === "agent_end")?.[1];
  const sessionEnd = on.mock.calls.find(([hookName]) => hookName === "session_end")?.[1];
  expect(agentEnd).toBeTypeOf("function");
  expect(sessionEnd).toBeTypeOf("function");
  return {
    add,
    agentEnd,
    embeddingsCreate,
    ensureGlobalUndiciEnvProxyDispatcher,
    loadLanceDbModule,
    logger,
    sessionEnd,
  };
}
// Tears down every module mock installed by the harness, then resets the
// module registry so later imports receive fresh, unmocked module instances.
async function cleanupAutoCaptureCursorHarness() {
  const mockedModuleIds = ["openclaw/plugin-sdk/runtime-env", "openai", "./lancedb-runtime.js"];
  for (const moduleId of mockedModuleIds) {
    vi.doUnmock(moduleId);
  }
  vi.resetModules();
}
test("skips already-processed auto-capture messages by session cursor", async () => {
  const harness = await setupAutoCaptureCursorHarness();
  try {
    const helixTurn = { role: "user", content: "I prefer Helix for editing code every day." };
    const fishTurn = { role: "user", content: "I prefer Fish for shell commands every day." };
    const sessionCtx = { sessionKey: "session-a" };
    // First run captures the Helix message.
    await harness.agentEnd?.({ success: true, messages: [helixTurn] }, sessionCtx);
    // Second run replays full history; only the new Fish message should embed.
    await harness.agentEnd?.(
      { success: true, messages: [helixTurn, fishTurn] },
      sessionCtx,
    );
    expect(harness.embeddingsCreate).toHaveBeenCalledTimes(2);
    expect(harness.embeddingsCreate).toHaveBeenNthCalledWith(1, {
      model: "text-embedding-3-small",
      input: "I prefer Helix for editing code every day.",
      encoding_format: "float",
    });
    expect(harness.embeddingsCreate).toHaveBeenNthCalledWith(2, {
      model: "text-embedding-3-small",
      input: "I prefer Fish for shell commands every day.",
      encoding_format: "float",
    });
    expect(harness.add).toHaveBeenCalledTimes(2);
  } finally {
    await cleanupAutoCaptureCursorHarness();
  }
});
test("does not advance auto-capture cursor when message processing fails", async () => {
  // First embed attempt fails transiently; the second succeeds.
  const embeddingsCreate = vi
    .fn()
    .mockRejectedValueOnce(new Error("temporary embedding failure"))
    .mockResolvedValueOnce({ data: [{ embedding: [0.1, 0.2, 0.3] }] });
  const harness = await setupAutoCaptureCursorHarness({ embeddingsCreate });
  try {
    const turnEvent = {
      success: true,
      messages: [{ role: "user", content: "I prefer Helix for editing code every day." }],
    };
    const sessionCtx = { sessionKey: "session-failure" };
    // Replaying the identical event must retry the failed message: the embed
    // spy is hit twice, but only the successful attempt stores a memory.
    await harness.agentEnd?.(turnEvent, sessionCtx);
    await harness.agentEnd?.(turnEvent, sessionCtx);
    expect(embeddingsCreate).toHaveBeenCalledTimes(2);
    expect(harness.add).toHaveBeenCalledTimes(1);
    expect(harness.logger.warn).toHaveBeenCalledWith(
      expect.stringContaining("memory-lancedb: capture failed:"),
    );
  } finally {
    await cleanupAutoCaptureCursorHarness();
  }
});
test("does not lose new auto-capture messages after history compaction rewrites prior turns", async () => {
  const harness = await setupAutoCaptureCursorHarness();
  const sessionCtx = { sessionKey: "session-compacted" };
  try {
    // First run: two user preferences are captured.
    await harness.agentEnd?.(
      {
        success: true,
        messages: [
          { role: "user", content: "I prefer Helix for editing code every day." },
          { role: "user", content: "I prefer Fish for shell commands every day." },
        ],
      },
      sessionCtx,
    );
    // Second run: compaction rewrote history — earlier turns are gone and a
    // single new user turn appears after a synthetic assistant summary.
    await harness.agentEnd?.(
      {
        success: true,
        messages: [
          { role: "assistant", content: "Earlier history was compacted." },
          { role: "user", content: "I prefer Deno for small scripts every day." },
        ],
      },
      sessionCtx,
    );
    // Three embeddings total; the third must be the post-compaction message.
    expect(harness.embeddingsCreate).toHaveBeenCalledTimes(3);
    expect(harness.embeddingsCreate).toHaveBeenNthCalledWith(3, {
      model: "text-embedding-3-small",
      input: "I prefer Deno for small scripts every day.",
      encoding_format: "float",
    });
    expect(harness.add).toHaveBeenCalledTimes(3);
  } finally {
    await cleanupAutoCaptureCursorHarness();
  }
});
test("evicts auto-capture cursor state on session end", async () => {
  const harness = await setupAutoCaptureCursorHarness();
  try {
    const turnEvent = {
      success: true,
      messages: [{ role: "user", content: "I prefer Helix for editing code every day." }],
    };
    const hookCtx = { sessionKey: "session-ended" };
    await harness.agentEnd?.(turnEvent, hookCtx);
    // Ending the session must clear the cursor for its key…
    await harness.sessionEnd?.(
      {
        sessionId: "session-id",
        sessionKey: "session-ended",
        messageCount: 1,
        reason: "deleted",
      },
      { sessionId: "session-id", sessionKey: "session-ended" },
    );
    // …so replaying the same history re-captures the message from scratch.
    await harness.agentEnd?.(turnEvent, hookCtx);
    expect(harness.embeddingsCreate).toHaveBeenCalledTimes(2);
    expect(harness.add).toHaveBeenCalledTimes(2);
  } finally {
    await cleanupAutoCaptureCursorHarness();
  }
});
test("passes configured dimensions to OpenAI embeddings API", async () => {
const embeddingsCreate = vi.fn(async () => ({
data: [{ embedding: [0.1, 0.2, 0.3] }],

View File

@@ -41,12 +41,78 @@ type MemorySearchResult = {
score: number;
};
// Per-session bookmark for auto-capture progress: the index to resume from,
// plus a fingerprint of the last handled message so the resume point can be
// re-located if compaction rewrites earlier turns in the transcript.
type AutoCaptureCursor = {
  // Index of the next unprocessed message in the session's message list.
  nextIndex: number;
  // Fingerprint (role + content) of the last processed/skipped message.
  lastMessageFingerprint?: string;
};
// Narrows an unknown value to a plain string-keyed record. Returns undefined
// for null, primitives, and arrays, which callers use as a "not an object
// message" signal.
function asRecord(value: unknown): Record<string, unknown> | undefined {
  if (!value || typeof value !== "object" || Array.isArray(value)) {
    return undefined;
  }
  return value as Record<string, unknown>;
}
// Collects the plain-text payload of a user-role message. Returns an empty
// array for non-objects, non-user roles, and content shapes it does not
// recognize; a string content yields a single-element array, and an array of
// content blocks yields the text of every `{ type: "text" }` block in order.
function extractUserTextContent(message: unknown): string[] {
  if (!message || typeof message !== "object" || Array.isArray(message)) {
    return [];
  }
  const record = message as Record<string, unknown>;
  if (record.role !== "user") {
    return [];
  }
  const content = record.content;
  if (typeof content === "string") {
    return [content];
  }
  if (!Array.isArray(content)) {
    return [];
  }
  // Keep only well-formed text blocks; ignore images, tool results, etc.
  return content.flatMap((entry) => {
    if (!entry || typeof entry !== "object" || Array.isArray(entry)) {
      return [];
    }
    const block = entry as Record<string, unknown>;
    return block.type === "text" && typeof block.text === "string" ? [block.text] : [];
  });
}
// Produces a stable string identity for a message so cursor state can
// recognize it later. Object messages are fingerprinted as JSON of their
// role + content; non-objects (and arrays) fall back to a typeof/String pair,
// and JSON failures (e.g. circular content) fall back to role:content text.
function messageFingerprint(message: unknown): string {
  if (!message || typeof message !== "object" || Array.isArray(message)) {
    return `${typeof message}:${String(message)}`;
  }
  const record = message as Record<string, unknown>;
  try {
    return JSON.stringify({
      role: record.role,
      content: record.content,
    });
  } catch {
    return `${String(record.role)}:${String(record.content)}`;
  }
}
function resolveAutoCaptureStartIndex(
messages: unknown[],
cursor: AutoCaptureCursor | undefined,
): number {
if (!cursor) {
return 0;
}
if (cursor.lastMessageFingerprint && cursor.nextIndex > 0) {
for (let index = messages.length - 1; index >= 0; index--) {
if (messageFingerprint(messages[index]) === cursor.lastMessageFingerprint) {
return index + 1;
}
}
return 0;
}
if (cursor.nextIndex <= messages.length) {
return cursor.nextIndex;
}
return 0;
}
// ============================================================================
// LanceDB Provider
// ============================================================================
@@ -312,6 +378,7 @@ export default definePluginEntry({
const vectorDim = dimensions ?? vectorDimsForModel(model);
const db = new MemoryDB(resolvedDbPath, vectorDim, cfg.storageOptions);
const embeddings = new Embeddings(apiKey, model, baseUrl, dimensions);
const autoCaptureCursors = new Map<string, AutoCaptureCursor>();
const resolveCurrentHookConfig = () => {
const runtimePluginConfig = resolveLivePluginConfigObject(
api.runtime.config?.loadConfig,
@@ -611,7 +678,7 @@ export default definePluginEntry({
});
// Auto-capture: analyze and store important information after agent ends
api.on("agent_end", async (event) => {
api.on("agent_end", async (event, ctx) => {
const currentCfg = resolveCurrentHookConfig();
if (!currentCfg.autoCapture) {
return;
@@ -621,75 +688,55 @@ export default definePluginEntry({
}
try {
// Extract text content from messages (handling unknown[] type)
const texts: string[] = [];
for (const msg of event.messages) {
// Type guard for message object
if (!msg || typeof msg !== "object") {
continue;
}
const msgObj = msg as Record<string, unknown>;
const cursorKey = ctx.sessionKey ?? ctx.sessionId;
const startIndex = resolveAutoCaptureStartIndex(
event.messages,
cursorKey ? autoCaptureCursors.get(cursorKey) : undefined,
);
let stored = 0;
let capturableSeen = 0;
for (let index = startIndex; index < event.messages.length; index++) {
const message = event.messages[index];
let messageProcessed = false;
// Only process user messages to avoid self-poisoning from model output
const role = msgObj.role;
if (role !== "user") {
continue;
}
const content = msgObj.content;
// Handle string content directly
if (typeof content === "string") {
texts.push(content);
continue;
}
// Handle array content (content blocks)
if (Array.isArray(content)) {
for (const block of content) {
if (
block &&
typeof block === "object" &&
"type" in block &&
(block as Record<string, unknown>).type === "text" &&
"text" in block &&
typeof (block as Record<string, unknown>).text === "string"
) {
texts.push((block as Record<string, unknown>).text as string);
try {
for (const text of extractUserTextContent(message)) {
if (!text || !shouldCapture(text, { maxChars: currentCfg.captureMaxChars })) {
continue;
}
capturableSeen++;
if (capturableSeen > 3) {
continue;
}
const category = detectCategory(text);
const vector = await embeddings.embed(text);
// Check for duplicates (high similarity threshold)
const existing = await db.search(vector, 1, 0.95);
if (existing.length > 0) {
continue;
}
await db.store({
text,
vector,
importance: 0.7,
category,
});
stored++;
}
messageProcessed = true;
} finally {
if (messageProcessed && cursorKey) {
autoCaptureCursors.set(cursorKey, {
nextIndex: index + 1,
lastMessageFingerprint: messageFingerprint(message),
});
}
}
}
// Filter for capturable content
const toCapture = texts.filter(
(text) => text && shouldCapture(text, { maxChars: currentCfg.captureMaxChars }),
);
if (toCapture.length === 0) {
return;
}
// Store each capturable piece (limit to 3 per conversation)
let stored = 0;
for (const text of toCapture.slice(0, 3)) {
const category = detectCategory(text);
const vector = await embeddings.embed(text);
// Check for duplicates (high similarity threshold)
const existing = await db.search(vector, 1, 0.95);
if (existing.length > 0) {
continue;
}
await db.store({
text,
vector,
importance: 0.7,
category,
});
stored++;
}
if (stored > 0) {
api.logger.info(`memory-lancedb: auto-captured ${stored} memories`);
}
@@ -698,6 +745,15 @@ export default definePluginEntry({
}
});
// Drop per-session auto-capture cursor state once a session ends so a stale
// cursor cannot suppress capture in a future session that reuses the key.
api.on("session_end", (event, ctx) => {
  // Prefer the hook-context identifiers; fall back to the event payload
  // fields for callers that only populate one of the two.
  const cursorKey = ctx.sessionKey ?? event.sessionKey ?? ctx.sessionId ?? event.sessionId;
  autoCaptureCursors.delete(cursorKey);
  // NOTE(review): nextSessionKey/nextSessionId presumably identify a
  // follow-up session created by this end event — its cursor slot is
  // cleared too so it starts fresh; confirm semantics against the hook API.
  const nextCursorKey = event.nextSessionKey ?? event.nextSessionId;
  if (nextCursorKey) {
    autoCaptureCursors.delete(nextCursorKey);
  }
});
// ========================================================================
// Service
// ========================================================================

View File

@@ -41,7 +41,7 @@ const BUNDLED_TYPED_HOOK_REGISTRATION_GUARDS = {
"subagent_spawning",
],
"extensions/memory-core/src/dreaming.ts": ["before_agent_reply", "gateway_start"],
"extensions/memory-lancedb/index.ts": ["agent_end", "before_prompt_build"],
"extensions/memory-lancedb/index.ts": ["agent_end", "before_prompt_build", "session_end"],
"extensions/skill-workshop/index.ts": ["agent_end", "before_prompt_build"],
"extensions/thread-ownership/index.ts": ["message_received", "message_sending"],
} as const satisfies Record<