mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 15:40:44 +00:00
fix(plugins): bound prompt memory recall latency
This commit is contained in:
@@ -602,6 +602,102 @@ describe("memory plugin e2e", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("bounds auto-recall latency during prompt build", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const post = vi.fn(() => new Promise(() => undefined));
|
||||||
|
const ensureGlobalUndiciEnvProxyDispatcher = vi.fn();
|
||||||
|
const loadLanceDbModule = vi.fn(async () => ({
|
||||||
|
connect: vi.fn(async () => ({
|
||||||
|
tableNames: vi.fn(async () => ["memories"]),
|
||||||
|
openTable: vi.fn(async () => ({
|
||||||
|
vectorSearch: vi.fn(() => ({ limit: vi.fn(() => ({ toArray: vi.fn(async () => []) })) })),
|
||||||
|
countRows: vi.fn(async () => 0),
|
||||||
|
add: vi.fn(async () => undefined),
|
||||||
|
delete: vi.fn(async () => undefined),
|
||||||
|
})),
|
||||||
|
})),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.resetModules();
|
||||||
|
vi.doMock("openclaw/plugin-sdk/runtime-env", () => ({
|
||||||
|
ensureGlobalUndiciEnvProxyDispatcher,
|
||||||
|
}));
|
||||||
|
vi.doMock("openai", () => ({
|
||||||
|
default: class MockOpenAI {
|
||||||
|
post = post;
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
vi.doMock("./lancedb-runtime.js", () => ({
|
||||||
|
loadLanceDbModule,
|
||||||
|
}));
|
||||||
|
|
||||||
|
try {
|
||||||
|
const { default: dynamicMemoryPlugin } = await import("./index.js");
|
||||||
|
const on = vi.fn();
|
||||||
|
const logger = {
|
||||||
|
info: vi.fn(),
|
||||||
|
warn: vi.fn(),
|
||||||
|
error: vi.fn(),
|
||||||
|
debug: vi.fn(),
|
||||||
|
};
|
||||||
|
const mockApi = {
|
||||||
|
id: "memory-lancedb",
|
||||||
|
name: "Memory (LanceDB)",
|
||||||
|
source: "test",
|
||||||
|
config: {},
|
||||||
|
pluginConfig: {
|
||||||
|
embedding: {
|
||||||
|
apiKey: OPENAI_API_KEY,
|
||||||
|
model: "text-embedding-3-small",
|
||||||
|
},
|
||||||
|
dbPath: getDbPath(),
|
||||||
|
autoCapture: false,
|
||||||
|
autoRecall: true,
|
||||||
|
},
|
||||||
|
runtime: {},
|
||||||
|
logger,
|
||||||
|
registerTool: vi.fn(),
|
||||||
|
registerCli: vi.fn(),
|
||||||
|
registerService: vi.fn(),
|
||||||
|
on,
|
||||||
|
resolvePath: (p: string) => p,
|
||||||
|
};
|
||||||
|
|
||||||
|
dynamicMemoryPlugin.register(mockApi as any);
|
||||||
|
|
||||||
|
const beforePromptBuild = on.mock.calls.find(
|
||||||
|
([hookName]) => hookName === "before_prompt_build",
|
||||||
|
)?.[1];
|
||||||
|
expect(beforePromptBuild).toBeTypeOf("function");
|
||||||
|
|
||||||
|
const resultPromise = beforePromptBuild?.(
|
||||||
|
{ prompt: "what editor should i use?", messages: [] },
|
||||||
|
{},
|
||||||
|
);
|
||||||
|
await vi.advanceTimersByTimeAsync(15_000);
|
||||||
|
|
||||||
|
await expect(resultPromise).resolves.toBeUndefined();
|
||||||
|
expect(ensureGlobalUndiciEnvProxyDispatcher).toHaveBeenCalledOnce();
|
||||||
|
expect(post).toHaveBeenCalledWith(
|
||||||
|
"/embeddings",
|
||||||
|
expect.objectContaining({
|
||||||
|
maxRetries: 0,
|
||||||
|
timeout: 15_000,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
expect(loadLanceDbModule).not.toHaveBeenCalled();
|
||||||
|
expect(logger.warn).toHaveBeenCalledWith(
|
||||||
|
"memory-lancedb: auto-recall timed out after 15000ms; skipping memory injection to avoid stalling agent startup",
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
vi.doUnmock("openclaw/plugin-sdk/runtime-env");
|
||||||
|
vi.doUnmock("openai");
|
||||||
|
vi.doUnmock("./lancedb-runtime.js");
|
||||||
|
vi.resetModules();
|
||||||
|
vi.useRealTimers();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
test("uses live runtime config to enable auto-recall after startup disable", async () => {
|
test("uses live runtime config to enable auto-recall after startup disable", async () => {
|
||||||
const embeddingsCreate = vi.fn(async () => ({
|
const embeddingsCreate = vi.fn(async () => ({
|
||||||
data: [{ embedding: [0.1, 0.2, 0.3] }],
|
data: [{ embedding: [0.1, 0.2, 0.3] }],
|
||||||
|
|||||||
@@ -149,6 +149,7 @@ function resolveAutoCaptureStartIndex(
|
|||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
const TABLE_NAME = "memories";
|
const TABLE_NAME = "memories";
|
||||||
|
const DEFAULT_AUTO_RECALL_TIMEOUT_MS = 15_000;
|
||||||
|
|
||||||
class MemoryDB {
|
class MemoryDB {
|
||||||
private db: LanceDB.Connection | null = null;
|
private db: LanceDB.Connection | null = null;
|
||||||
@@ -262,7 +263,7 @@ class MemoryDB {
|
|||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
type Embeddings = {
|
type Embeddings = {
|
||||||
embed(text: string): Promise<number[]>;
|
embed(text: string, options?: { timeoutMs?: number }): Promise<number[]>;
|
||||||
};
|
};
|
||||||
|
|
||||||
class OpenAiCompatibleEmbeddings implements Embeddings {
|
class OpenAiCompatibleEmbeddings implements Embeddings {
|
||||||
@@ -277,7 +278,7 @@ class OpenAiCompatibleEmbeddings implements Embeddings {
|
|||||||
this.client = new OpenAI({ apiKey, baseURL: baseUrl });
|
this.client = new OpenAI({ apiKey, baseURL: baseUrl });
|
||||||
}
|
}
|
||||||
|
|
||||||
async embed(text: string): Promise<number[]> {
|
async embed(text: string, options?: { timeoutMs?: number }): Promise<number[]> {
|
||||||
const params: OpenAI.EmbeddingCreateParams = {
|
const params: OpenAI.EmbeddingCreateParams = {
|
||||||
model: this.model,
|
model: this.model,
|
||||||
input: text,
|
input: text,
|
||||||
@@ -292,6 +293,7 @@ class OpenAiCompatibleEmbeddings implements Embeddings {
|
|||||||
// transport and normalize the response ourselves.
|
// transport and normalize the response ourselves.
|
||||||
const response = await this.client.post<EmbeddingCreateResponse>("/embeddings", {
|
const response = await this.client.post<EmbeddingCreateResponse>("/embeddings", {
|
||||||
body: params,
|
body: params,
|
||||||
|
...(options?.timeoutMs ? { timeout: options.timeoutMs, maxRetries: 0 } : {}),
|
||||||
});
|
});
|
||||||
return normalizeEmbeddingVector(response.data?.[0]?.embedding);
|
return normalizeEmbeddingVector(response.data?.[0]?.embedding);
|
||||||
}
|
}
|
||||||
@@ -353,6 +355,32 @@ class ProviderAdapterEmbeddings implements Embeddings {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function runWithTimeout<T>(params: {
|
||||||
|
timeoutMs: number;
|
||||||
|
task: () => Promise<T>;
|
||||||
|
}): Promise<{ status: "ok"; value: T } | { status: "timeout" }> {
|
||||||
|
let timeout: ReturnType<typeof setTimeout> | undefined;
|
||||||
|
const TIMEOUT = Symbol("timeout");
|
||||||
|
const timeoutPromise = new Promise<typeof TIMEOUT>((resolve) => {
|
||||||
|
timeout = setTimeout(() => resolve(TIMEOUT), params.timeoutMs);
|
||||||
|
timeout.unref?.();
|
||||||
|
});
|
||||||
|
const taskPromise = params.task();
|
||||||
|
taskPromise.catch(() => undefined);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await Promise.race([taskPromise, timeoutPromise]);
|
||||||
|
if (result === TIMEOUT) {
|
||||||
|
return { status: "timeout" };
|
||||||
|
}
|
||||||
|
return { status: "ok", value: result };
|
||||||
|
} finally {
|
||||||
|
if (timeout) {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function createEmbeddings(api: OpenClawPluginApi, cfg: MemoryConfig): Embeddings {
|
function createEmbeddings(api: OpenClawPluginApi, cfg: MemoryConfig): Embeddings {
|
||||||
const { provider, model, dimensions, apiKey, baseUrl } = cfg.embedding;
|
const { provider, model, dimensions, apiKey, baseUrl } = cfg.embedding;
|
||||||
if (provider === "openai" && apiKey) {
|
if (provider === "openai" && apiKey) {
|
||||||
@@ -818,8 +846,22 @@ export default definePluginEntry({
|
|||||||
event.prompt,
|
event.prompt,
|
||||||
currentCfg.recallMaxChars,
|
currentCfg.recallMaxChars,
|
||||||
);
|
);
|
||||||
const vector = await embeddings.embed(recallQuery);
|
const recall = await runWithTimeout({
|
||||||
const results = await db.search(vector, 3, 0.3);
|
timeoutMs: DEFAULT_AUTO_RECALL_TIMEOUT_MS,
|
||||||
|
task: async () => {
|
||||||
|
const vector = await embeddings.embed(recallQuery, {
|
||||||
|
timeoutMs: DEFAULT_AUTO_RECALL_TIMEOUT_MS,
|
||||||
|
});
|
||||||
|
return await db.search(vector, 3, 0.3);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
if (recall.status === "timeout") {
|
||||||
|
api.logger.warn?.(
|
||||||
|
`memory-lancedb: auto-recall timed out after ${DEFAULT_AUTO_RECALL_TIMEOUT_MS}ms; skipping memory injection to avoid stalling agent startup`,
|
||||||
|
);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
const results = recall.value;
|
||||||
|
|
||||||
if (results.length === 0) {
|
if (results.length === 0) {
|
||||||
return undefined;
|
return undefined;
|
||||||
|
|||||||
@@ -226,6 +226,44 @@ describe("model override pipeline wiring", () => {
|
|||||||
expectedPrependContext,
|
expectedPrependContext,
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("skips timed-out handlers and continues", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
try {
|
||||||
|
addBeforePromptBuildHook(
|
||||||
|
registry,
|
||||||
|
"slow-plugin",
|
||||||
|
() => new Promise<PluginHookBeforePromptBuildResult>(() => undefined),
|
||||||
|
10,
|
||||||
|
);
|
||||||
|
addBeforePromptBuildHook(registry, "fast-plugin", () => ({ prependContext: "fast" }), 1);
|
||||||
|
const logger = {
|
||||||
|
error: vi.fn(),
|
||||||
|
warn: vi.fn(),
|
||||||
|
info: vi.fn(),
|
||||||
|
debug: vi.fn(),
|
||||||
|
};
|
||||||
|
const runner = createHookRunner(registry, {
|
||||||
|
logger,
|
||||||
|
modifyingHookTimeoutMsByHook: { before_prompt_build: 5 },
|
||||||
|
});
|
||||||
|
|
||||||
|
const resultPromise = runner.runBeforePromptBuild(
|
||||||
|
{ prompt: "test", messages: [] },
|
||||||
|
stubCtx,
|
||||||
|
);
|
||||||
|
await vi.advanceTimersByTimeAsync(5);
|
||||||
|
|
||||||
|
await expect(resultPromise).resolves.toEqual({ prependContext: "fast" });
|
||||||
|
expect(logger.error).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining(
|
||||||
|
"[hooks] before_prompt_build handler from slow-plugin failed: timed out after 5ms",
|
||||||
|
),
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
vi.useRealTimers();
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("graceful degradation + hook detection", () => {
|
describe("graceful degradation + hook detection", () => {
|
||||||
|
|||||||
@@ -160,11 +160,19 @@ export type HookRunnerOptions = {
|
|||||||
* the runner continues, but the plugin's underlying work is not cancelled.
|
* the runner continues, but the plugin's underlying work is not cancelled.
|
||||||
*/
|
*/
|
||||||
voidHookTimeoutMsByHook?: Partial<Record<PluginHookName, number>>;
|
voidHookTimeoutMsByHook?: Partial<Record<PluginHookName, number>>;
|
||||||
|
/**
|
||||||
|
* Optional timeout for modifying hooks. A timed-out hook is logged and skipped,
|
||||||
|
* but the plugin's underlying work is not cancelled.
|
||||||
|
*/
|
||||||
|
modifyingHookTimeoutMsByHook?: Partial<Record<PluginHookName, number>>;
|
||||||
};
|
};
|
||||||
|
|
||||||
const DEFAULT_VOID_HOOK_TIMEOUT_MS_BY_HOOK: Partial<Record<PluginHookName, number>> = {
|
const DEFAULT_VOID_HOOK_TIMEOUT_MS_BY_HOOK: Partial<Record<PluginHookName, number>> = {
|
||||||
agent_end: 30_000,
|
agent_end: 30_000,
|
||||||
};
|
};
|
||||||
|
const DEFAULT_MODIFYING_HOOK_TIMEOUT_MS_BY_HOOK: Partial<Record<PluginHookName, number>> = {
|
||||||
|
before_prompt_build: 15_000,
|
||||||
|
};
|
||||||
|
|
||||||
type ModifyingHookPolicy<K extends PluginHookName, TResult> = {
|
type ModifyingHookPolicy<K extends PluginHookName, TResult> = {
|
||||||
mergeResults?: (
|
mergeResults?: (
|
||||||
@@ -236,6 +244,10 @@ export function createHookRunner(
|
|||||||
...DEFAULT_VOID_HOOK_TIMEOUT_MS_BY_HOOK,
|
...DEFAULT_VOID_HOOK_TIMEOUT_MS_BY_HOOK,
|
||||||
...options.voidHookTimeoutMsByHook,
|
...options.voidHookTimeoutMsByHook,
|
||||||
};
|
};
|
||||||
|
const modifyingHookTimeoutMsByHook = {
|
||||||
|
...DEFAULT_MODIFYING_HOOK_TIMEOUT_MS_BY_HOOK,
|
||||||
|
...options.modifyingHookTimeoutMsByHook,
|
||||||
|
};
|
||||||
|
|
||||||
const shouldCatchHookErrors = (hookName: PluginHookName): boolean =>
|
const shouldCatchHookErrors = (hookName: PluginHookName): boolean =>
|
||||||
catchErrors && (failurePolicyByHook[hookName] ?? "fail-open") === "fail-open";
|
catchErrors && (failurePolicyByHook[hookName] ?? "fail-open") === "fail-open";
|
||||||
@@ -385,13 +397,27 @@ export function createHookRunner(
|
|||||||
return Math.floor(timeoutMs);
|
return Math.floor(timeoutMs);
|
||||||
};
|
};
|
||||||
|
|
||||||
const withVoidHookTimeout = async <T>(promise: Promise<T>, timeoutMs: number): Promise<T> => {
|
const getModifyingHookTimeoutMs = (hookName: PluginHookName): number | undefined => {
|
||||||
|
const timeoutMs = modifyingHookTimeoutMsByHook[hookName];
|
||||||
|
if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return Math.floor(timeoutMs);
|
||||||
|
};
|
||||||
|
|
||||||
|
const withHookTimeout = async <T>(
|
||||||
|
promise: Promise<T>,
|
||||||
|
timeoutMs: number,
|
||||||
|
options: { unref?: boolean } = {},
|
||||||
|
): Promise<T> => {
|
||||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||||
const timeout = new Promise<never>((_, reject) => {
|
const timeout = new Promise<never>((_, reject) => {
|
||||||
timer = setTimeout(() => {
|
timer = setTimeout(() => {
|
||||||
reject(new Error(`timed out after ${timeoutMs}ms`));
|
reject(new Error(`timed out after ${timeoutMs}ms`));
|
||||||
}, timeoutMs);
|
}, timeoutMs);
|
||||||
|
if (options.unref) {
|
||||||
timer.unref?.();
|
timer.unref?.();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -435,7 +461,7 @@ export function createHookRunner(
|
|||||||
);
|
);
|
||||||
const timeoutMs = getVoidHookTimeoutMs(hookName);
|
const timeoutMs = getVoidHookTimeoutMs(hookName);
|
||||||
if (timeoutMs) {
|
if (timeoutMs) {
|
||||||
await withVoidHookTimeout(promise, timeoutMs);
|
await withHookTimeout(promise, timeoutMs, { unref: true });
|
||||||
} else {
|
} else {
|
||||||
await promise;
|
await promise;
|
||||||
}
|
}
|
||||||
@@ -468,9 +494,10 @@ export function createHookRunner(
|
|||||||
|
|
||||||
for (const hook of hooks) {
|
for (const hook of hooks) {
|
||||||
try {
|
try {
|
||||||
const handlerResult = await (
|
const handler = hook.handler as (event: unknown, ctx: unknown) => Promise<TResult>;
|
||||||
hook.handler as (event: unknown, ctx: unknown) => Promise<TResult>
|
const promise = Promise.resolve(handler(event, ctx));
|
||||||
)(event, ctx);
|
const timeoutMs = getModifyingHookTimeoutMs(hookName);
|
||||||
|
const handlerResult = timeoutMs ? await withHookTimeout(promise, timeoutMs) : await promise;
|
||||||
|
|
||||||
if (handlerResult !== undefined && handlerResult !== null) {
|
if (handlerResult !== undefined && handlerResult !== null) {
|
||||||
if (policy.mergeResults) {
|
if (policy.mergeResults) {
|
||||||
|
|||||||
Reference in New Issue
Block a user