mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:20:43 +00:00
fix(active-memory): fast-fail stalled recall paths (#76183)
Summary: - This PR adds Active Memory transcript polling to fast-fail terminal zero-hit or unavailable recall tool results, filters timeout boilerplate, extends focused regressions, and adds a changelog fix entry. - Reproducibility: yes. The PR includes focused regressions that reproduce terminal zero-hit search, unavailab ... rch, non-empty `details.results` with `debug.hits: 0`, memory_get misses, and timeout boilerplate behavior. ClawSweeper fixups: - Included follow-up commit: fix(active-memory): fast-fail stalled recall paths - Included follow-up commit: fix(clawsweeper): address review for automerge-openclaw-openclaw-7576… - Included follow-up commit: fix(clawsweeper): reconcile automerge-openclaw-openclaw-75761 with ma… - Ran the ClawSweeper repair loop before final review. Validation: - ClawSweeper review passed for heade5ea3f1a7a. - Required merge gates passed before the squash merge. Prepared head SHA:e5ea3f1a7aReview: https://github.com/openclaw/openclaw/pull/76183#issuecomment-4364369591 Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: codexGW <9350182+codexGW@users.noreply.github.com>
This commit is contained in:
@@ -1789,6 +1789,50 @@ describe("active-memory plugin", () => {
|
||||
expect(lines.some((line) => line.includes("timeout_partial"))).toBe(false);
|
||||
});
|
||||
|
||||
it("does not inject embedded timeout boilerplate from partial transcripts", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: 1,
|
||||
logging: true,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
const sessionKey = "agent:main:timeout-boilerplate-transcript";
|
||||
hoisted.sessionStore[sessionKey] = {
|
||||
sessionId: "s-timeout-boilerplate-transcript",
|
||||
updatedAt: 0,
|
||||
};
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async (params: { sessionFile: string }) => {
|
||||
await writeTranscriptJsonl(params.sessionFile, [
|
||||
{
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: "LLM request timed out after 15000 ms.",
|
||||
},
|
||||
},
|
||||
]);
|
||||
await new Promise<never>(() => {});
|
||||
});
|
||||
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what wings should i order? timeout boilerplate", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey,
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
const lines = getActiveMemoryLines(sessionKey);
|
||||
expect(lines).toEqual([expect.stringContaining("🧩 Active Memory: status=timeout")]);
|
||||
expect(lines.some((line) => line.includes("timeout_partial"))).toBe(false);
|
||||
expect(lines.some((line) => line.includes("LLM request timed out"))).toBe(false);
|
||||
});
|
||||
|
||||
it("returns partial transcript text when an aborted subagent rejects before the race timeout wins", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
api.pluginConfig = {
|
||||
@@ -2258,6 +2302,171 @@ describe("active-memory plugin", () => {
|
||||
expect(wallClockMs).toBeLessThan(CONFIGURED_TIMEOUT_MS + HARD_DEADLINE_MARGIN_MS);
|
||||
});
|
||||
|
||||
it("fast-fails terminal zero-hit memory_search results without waiting for recall timeout", async () => {
|
||||
const CONFIGURED_TIMEOUT_MS = 1_000;
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: CONFIGURED_TIMEOUT_MS,
|
||||
logging: true,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
const sessionKey = "agent:main:terminal-zero-hit";
|
||||
hoisted.sessionStore[sessionKey] = { sessionId: "s-terminal-zero-hit", updatedAt: 0 };
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async (params: { sessionFile: string }) => {
|
||||
await writeTranscriptJsonl(params.sessionFile, [
|
||||
{
|
||||
message: {
|
||||
role: "toolResult",
|
||||
toolName: "memory_search",
|
||||
details: { results: [], debug: { backend: "qmd", hits: 0, searchMs: 8 } },
|
||||
},
|
||||
},
|
||||
]);
|
||||
await new Promise<never>(() => {});
|
||||
});
|
||||
|
||||
const startedAt = Date.now();
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what food do i usually order? zero hit", messages: [] },
|
||||
{ agentId: "main", trigger: "user", sessionKey, messageProvider: "webchat" },
|
||||
);
|
||||
const wallClockMs = Date.now() - startedAt;
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
expect(wallClockMs).toBeLessThan(CONFIGURED_TIMEOUT_MS);
|
||||
expect(getActiveMemoryLines(sessionKey)).toEqual([
|
||||
expect.stringContaining("🧩 Active Memory: status=empty"),
|
||||
expect.stringContaining("🔎 Active Memory Debug: backend=qmd searchMs=8 hits=0"),
|
||||
]);
|
||||
});
|
||||
|
||||
it("does not fast-fail memory_search results solely because debug hits is zero", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: 500,
|
||||
logging: true,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
const sessionKey = "agent:main:terminal-zero-hit-with-results";
|
||||
hoisted.sessionStore[sessionKey] = {
|
||||
sessionId: "s-terminal-zero-hit-with-results",
|
||||
updatedAt: 0,
|
||||
};
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async (params: { sessionFile: string }) => {
|
||||
await writeTranscriptJsonl(params.sessionFile, [
|
||||
{
|
||||
message: {
|
||||
role: "toolResult",
|
||||
toolName: "memory_search",
|
||||
details: {
|
||||
results: [{ path: "memory/food.md", text: "User usually orders ramen." }],
|
||||
debug: { backend: "qmd", hits: 0, searchMs: 8 },
|
||||
},
|
||||
},
|
||||
},
|
||||
]);
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
return { payloads: [{ text: "User usually orders ramen." }] };
|
||||
});
|
||||
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what food do i usually order? zero hit with results", messages: [] },
|
||||
{ agentId: "main", trigger: "user", sessionKey, messageProvider: "webchat" },
|
||||
);
|
||||
|
||||
expect(result?.prependContext).toContain("User usually orders ramen.");
|
||||
expect(getActiveMemoryLines(sessionKey)).toEqual([
|
||||
expect.stringContaining("🧩 Active Memory: status=ok"),
|
||||
expect.stringContaining("🔎 Active Memory Debug: backend=qmd searchMs=8 hits=0"),
|
||||
]);
|
||||
});
|
||||
|
||||
it("fast-fails unavailable memory_search results without injecting provider errors", async () => {
|
||||
const CONFIGURED_TIMEOUT_MS = 1_000;
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: CONFIGURED_TIMEOUT_MS,
|
||||
logging: true,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
const sessionKey = "agent:main:terminal-unavailable";
|
||||
hoisted.sessionStore[sessionKey] = { sessionId: "s-terminal-unavailable", updatedAt: 0 };
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async (params: { sessionFile: string }) => {
|
||||
await writeTranscriptJsonl(params.sessionFile, [
|
||||
{
|
||||
message: {
|
||||
role: "toolResult",
|
||||
toolName: "memory_search",
|
||||
details: {
|
||||
disabled: true,
|
||||
warning: "Memory search is unavailable due to an embedding/provider error.",
|
||||
action: "Check the embedding provider configuration, then retry memory_search.",
|
||||
error: "embedding request failed",
|
||||
},
|
||||
},
|
||||
},
|
||||
]);
|
||||
await new Promise<never>(() => {});
|
||||
});
|
||||
|
||||
const startedAt = Date.now();
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what food do i usually order? unavailable", messages: [] },
|
||||
{ agentId: "main", trigger: "user", sessionKey, messageProvider: "webchat" },
|
||||
);
|
||||
const wallClockMs = Date.now() - startedAt;
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
expect(wallClockMs).toBeLessThan(CONFIGURED_TIMEOUT_MS);
|
||||
expect(getActiveMemoryLines(sessionKey)).toEqual([
|
||||
expect.stringContaining("🧩 Active Memory: status=empty"),
|
||||
expect.stringContaining(
|
||||
"🔎 Active Memory Debug: Memory search is unavailable due to an embedding/provider error. Check the embedding provider configuration, then retry memory_search.",
|
||||
),
|
||||
]);
|
||||
});
|
||||
|
||||
it("does not treat memory_get misses as terminal recall results", async () => {
|
||||
__testing.setMinimumTimeoutMsForTests(1);
|
||||
__testing.setSetupGraceTimeoutMsForTests(0);
|
||||
api.pluginConfig = {
|
||||
agents: ["main"],
|
||||
timeoutMs: 500,
|
||||
};
|
||||
plugin.register(api as unknown as OpenClawPluginApi);
|
||||
runEmbeddedPiAgent.mockImplementationOnce(async (params: { sessionFile: string }) => {
|
||||
await writeTranscriptJsonl(params.sessionFile, [
|
||||
{
|
||||
message: {
|
||||
role: "toolResult",
|
||||
toolName: "memory_get",
|
||||
details: { path: "memory/missing.md", text: "", disabled: true, error: "not found" },
|
||||
},
|
||||
},
|
||||
]);
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
return { payloads: [{ text: "User usually orders ramen after late flights." }] };
|
||||
});
|
||||
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what food do i usually order? memory get miss", messages: [] },
|
||||
{
|
||||
agentId: "main",
|
||||
trigger: "user",
|
||||
sessionKey: "agent:main:memory-get-miss",
|
||||
messageProvider: "webchat",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result?.prependContext).toContain("User usually orders ramen after late flights.");
|
||||
});
|
||||
|
||||
it("returns undefined instead of throwing when an unexpected error escapes prompt building", async () => {
|
||||
const result = await hooks.before_prompt_build(
|
||||
{ prompt: "what should i eat? escape test", messages: undefined as never },
|
||||
|
||||
@@ -46,6 +46,7 @@ const DEFAULT_PARTIAL_TRANSCRIPT_MAX_CHARS = 32_000;
|
||||
const DEFAULT_TRANSCRIPT_READ_MAX_LINES = 2_000;
|
||||
const DEFAULT_TRANSCRIPT_READ_MAX_BYTES = 50 * 1024 * 1024;
|
||||
const TIMEOUT_PARTIAL_DATA_GRACE_MS = 50;
|
||||
const TERMINAL_MEMORY_SEARCH_POLL_INTERVAL_MS = 25;
|
||||
|
||||
const NO_RECALL_VALUES = new Set([
|
||||
"",
|
||||
@@ -56,12 +57,21 @@ const NO_RECALL_VALUES = new Set([
|
||||
"no relevant memory",
|
||||
"no relevant memories",
|
||||
"timeout",
|
||||
"timed out",
|
||||
"request timed out",
|
||||
"llm request timed out",
|
||||
"the llm request timed out",
|
||||
"[]",
|
||||
"{}",
|
||||
"null",
|
||||
"n/a",
|
||||
]);
|
||||
|
||||
const TIMEOUT_BOILERPLATE_PATTERNS = [
|
||||
/^(?:error:\s*)?(?:the\s+)?(?:llm|model|request|operation|agent)\s+(?:request\s+)?timed out\b/i,
|
||||
/^(?:error:\s*)?active-memory timeout after \d+ms\b/i,
|
||||
];
|
||||
|
||||
const RECALLED_CONTEXT_LINE_PATTERNS = [
|
||||
/^🧩\s*active memory:/i,
|
||||
/^🔎\s*active memory debug:/i,
|
||||
@@ -209,6 +219,16 @@ type RecallSubagentResult = {
|
||||
searchDebug?: ActiveMemorySearchDebug;
|
||||
};
|
||||
|
||||
type TerminalMemorySearchResult = {
|
||||
status: "empty";
|
||||
searchDebug?: ActiveMemorySearchDebug;
|
||||
};
|
||||
|
||||
type TerminalMemorySearchWatch = {
|
||||
promise: Promise<TerminalMemorySearchResult>;
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
type CachedActiveRecallResult = {
|
||||
expiresAt: number;
|
||||
result: ActiveRecallResult;
|
||||
@@ -1549,6 +1569,41 @@ function extractActiveMemorySearchDebugFromSessionRecord(
|
||||
};
|
||||
}
|
||||
|
||||
function extractTerminalMemorySearchResultFromSessionRecord(
|
||||
value: unknown,
|
||||
): TerminalMemorySearchResult | undefined {
|
||||
const record = asRecord(value);
|
||||
const nestedMessage = asRecord(record?.message);
|
||||
const topLevelMessage =
|
||||
record?.role === "toolResult" ||
|
||||
record?.toolName === "memory_search" ||
|
||||
record?.toolName === "memory_recall"
|
||||
? record
|
||||
: undefined;
|
||||
const message = nestedMessage ?? topLevelMessage;
|
||||
if (!message) {
|
||||
return undefined;
|
||||
}
|
||||
const role = normalizeOptionalString(message.role);
|
||||
const toolName = normalizeOptionalString(message.toolName);
|
||||
if (role !== "toolResult" || (toolName !== "memory_search" && toolName !== "memory_recall")) {
|
||||
return undefined;
|
||||
}
|
||||
const details = asRecord(message.details);
|
||||
const debug = extractActiveMemorySearchDebugFromSessionRecord(value);
|
||||
const results = Array.isArray(details?.results) ? details.results : undefined;
|
||||
const disabled = details?.disabled === true;
|
||||
const unavailable =
|
||||
disabled || Boolean(debug?.warning) || Boolean(debug?.error) || Boolean(details?.error);
|
||||
const debugHits =
|
||||
typeof debug?.hits === "number" && Number.isFinite(debug.hits) ? debug.hits : undefined;
|
||||
const zeroHitSearch = results !== undefined ? results.length === 0 : debugHits === 0;
|
||||
if (unavailable || zeroHitSearch) {
|
||||
return { status: "empty", searchDebug: debug };
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function readActiveMemorySearchDebug(
|
||||
sessionFile: string,
|
||||
limits?: TranscriptReadLimits,
|
||||
@@ -1567,6 +1622,93 @@ async function readActiveMemorySearchDebug(
|
||||
return found;
|
||||
}
|
||||
|
||||
async function readTerminalMemorySearchResult(
|
||||
sessionFile: string,
|
||||
limits?: TranscriptReadLimits,
|
||||
): Promise<TerminalMemorySearchResult | undefined> {
|
||||
let found: TerminalMemorySearchResult | undefined;
|
||||
await streamBoundedTranscriptJsonl({
|
||||
sessionFile,
|
||||
limits,
|
||||
onRecord: (record) => {
|
||||
const result = extractTerminalMemorySearchResultFromSessionRecord(record);
|
||||
if (result) {
|
||||
found = result;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
},
|
||||
});
|
||||
return found;
|
||||
}
|
||||
|
||||
function watchTerminalMemorySearchResult(params: {
|
||||
getSessionFile: () => string | undefined;
|
||||
abortSignal: AbortSignal;
|
||||
}): TerminalMemorySearchWatch {
|
||||
let stopped = false;
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
let inFlight = false;
|
||||
let resolveWatch: (result: TerminalMemorySearchResult) => void = () => {};
|
||||
const stop = () => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
if (timeoutId) {
|
||||
clearTimeout(timeoutId);
|
||||
timeoutId = undefined;
|
||||
}
|
||||
params.abortSignal.removeEventListener("abort", onAbort);
|
||||
};
|
||||
const finish = (result: TerminalMemorySearchResult) => {
|
||||
stop();
|
||||
resolveWatch(result);
|
||||
};
|
||||
const schedule = () => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
timeoutId = setTimeout(tick, TERMINAL_MEMORY_SEARCH_POLL_INTERVAL_MS);
|
||||
timeoutId.unref?.();
|
||||
};
|
||||
const tick = async () => {
|
||||
if (stopped || inFlight) {
|
||||
return;
|
||||
}
|
||||
if (params.abortSignal.aborted) {
|
||||
stop();
|
||||
return;
|
||||
}
|
||||
inFlight = true;
|
||||
try {
|
||||
const sessionFile = params.getSessionFile();
|
||||
const result = sessionFile ? await readTerminalMemorySearchResult(sessionFile) : undefined;
|
||||
if (result) {
|
||||
finish(result);
|
||||
return;
|
||||
}
|
||||
} catch {
|
||||
// Transcript polling is opportunistic; normal timeout handling remains authoritative.
|
||||
} finally {
|
||||
inFlight = false;
|
||||
}
|
||||
schedule();
|
||||
};
|
||||
function onAbort() {
|
||||
stop();
|
||||
}
|
||||
const promise = new Promise<TerminalMemorySearchResult>((resolve) => {
|
||||
resolveWatch = resolve;
|
||||
params.abortSignal.addEventListener("abort", onAbort, { once: true });
|
||||
void tick();
|
||||
});
|
||||
return {
|
||||
promise,
|
||||
stop,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeSearchDebug(value: unknown): ActiveMemorySearchDebug | undefined {
|
||||
const debug = asRecord(value);
|
||||
if (!debug) {
|
||||
@@ -1777,13 +1919,21 @@ function normalizeNoRecallValue(value: string): boolean {
|
||||
return NO_RECALL_VALUES.has(value.trim().toLowerCase());
|
||||
}
|
||||
|
||||
function isTimeoutBoilerplateSummary(value: string): boolean {
|
||||
return TIMEOUT_BOILERPLATE_PATTERNS.some((pattern) => pattern.test(value));
|
||||
}
|
||||
|
||||
function normalizeActiveSummary(rawReply: string): string | null {
|
||||
const trimmed = rawReply.trim();
|
||||
if (normalizeNoRecallValue(trimmed)) {
|
||||
return null;
|
||||
}
|
||||
const singleLine = trimmed.replace(/\s+/g, " ").trim();
|
||||
if (!singleLine || normalizeNoRecallValue(singleLine)) {
|
||||
if (
|
||||
!singleLine ||
|
||||
normalizeNoRecallValue(singleLine) ||
|
||||
isTimeoutBoilerplateSummary(singleLine)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return singleLine;
|
||||
@@ -2299,6 +2449,7 @@ async function maybeResolveActiveRecall(params: {
|
||||
);
|
||||
});
|
||||
|
||||
let terminalMemorySearchWatch: TerminalMemorySearchWatch | undefined;
|
||||
try {
|
||||
const subagentPromise = runRecallSubagent({
|
||||
...params,
|
||||
@@ -2308,11 +2459,20 @@ async function maybeResolveActiveRecall(params: {
|
||||
sessionFile = value;
|
||||
},
|
||||
});
|
||||
terminalMemorySearchWatch = watchTerminalMemorySearchResult({
|
||||
getSessionFile: () => sessionFile,
|
||||
abortSignal: controller.signal,
|
||||
});
|
||||
// Silently catch late rejections after timeout so they don't become
|
||||
// unhandled promise rejections.
|
||||
subagentPromise.catch(() => undefined);
|
||||
|
||||
const raceResult = await Promise.race([subagentPromise, timeoutPromise]);
|
||||
const raceResult = await Promise.race([
|
||||
subagentPromise,
|
||||
timeoutPromise,
|
||||
terminalMemorySearchWatch.promise,
|
||||
]);
|
||||
terminalMemorySearchWatch.stop();
|
||||
|
||||
if (raceResult === TIMEOUT_SENTINEL) {
|
||||
const result = await buildTimeoutRecallResult({
|
||||
@@ -2338,6 +2498,33 @@ async function maybeResolveActiveRecall(params: {
|
||||
return result;
|
||||
}
|
||||
|
||||
if ("status" in raceResult) {
|
||||
controller.abort(new Error("active-memory terminal memory search result"));
|
||||
const result: ActiveRecallResult = {
|
||||
status: raceResult.status,
|
||||
elapsedMs: Date.now() - startedAt,
|
||||
summary: null,
|
||||
searchDebug: raceResult.searchDebug,
|
||||
};
|
||||
if (params.config.logging) {
|
||||
params.api.logger.info?.(
|
||||
`${logPrefix} done status=${result.status} elapsedMs=${String(result.elapsedMs)} summaryChars=0`,
|
||||
);
|
||||
}
|
||||
await persistPluginStatusLines({
|
||||
api: params.api,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
statusLine: buildPluginStatusLine({ result, config: params.config }),
|
||||
searchDebug: result.searchDebug,
|
||||
});
|
||||
if (shouldCacheResult(result)) {
|
||||
setCachedResult(cacheKey, result, params.config.cacheTtlMs);
|
||||
}
|
||||
resetCircuitBreaker(cbKey);
|
||||
return result;
|
||||
}
|
||||
|
||||
const { rawReply, transcriptPath, searchDebug } = raceResult;
|
||||
const summary = truncateSummary(
|
||||
normalizeActiveSummary(rawReply) ?? "",
|
||||
@@ -2423,6 +2610,7 @@ async function maybeResolveActiveRecall(params: {
|
||||
});
|
||||
return result;
|
||||
} finally {
|
||||
terminalMemorySearchWatch?.stop();
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
}
|
||||
@@ -2649,7 +2837,7 @@ export default definePluginEntry({
|
||||
},
|
||||
});
|
||||
|
||||
export const __testing = {
|
||||
const testing = {
|
||||
buildCacheKey,
|
||||
buildCircuitBreakerKey,
|
||||
buildMetadata,
|
||||
@@ -2679,3 +2867,5 @@ export const __testing = {
|
||||
return timeoutCircuitBreaker.get(key);
|
||||
},
|
||||
};
|
||||
|
||||
export { testing as __testing };
|
||||
|
||||
Reference in New Issue
Block a user