mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:20:43 +00:00
fix(agents): clamp process poll waits
This commit is contained in:
@@ -30,13 +30,14 @@ async function pollSession(
|
||||
callId: string,
|
||||
sessionId: string,
|
||||
timeout?: number | string,
|
||||
signal?: AbortSignal,
|
||||
) {
|
||||
const args = {
|
||||
action: "poll",
|
||||
sessionId,
|
||||
...(timeout === undefined ? {} : { timeout }),
|
||||
} as unknown as Parameters<ReturnType<typeof createProcessTool>["execute"]>[1];
|
||||
return processTool.execute(callId, args);
|
||||
return processTool.execute(callId, args, signal);
|
||||
}
|
||||
|
||||
function retryMs(result: Awaited<ReturnType<ReturnType<typeof createProcessTool>["execute"]>>) {
|
||||
@@ -102,6 +103,50 @@ test("process poll accepts string timeout values", async () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("process poll clamps long waits to 30 seconds", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const { processTool } = createProcessSessionHarness("sess-clamp");
|
||||
|
||||
const pollPromise = pollSession(processTool, "toolcall", "sess-clamp", 120_000);
|
||||
let resolved = false;
|
||||
void pollPromise.finally(() => {
|
||||
resolved = true;
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(29_999);
|
||||
expect(resolved).toBe(false);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
const poll = await pollPromise;
|
||||
expect(pollStatus(poll)).toBe("running");
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
test("process poll aborts while waiting for completion", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const { processTool } = createProcessSessionHarness("sess-abort");
|
||||
const controller = new AbortController();
|
||||
|
||||
const pollPromise = pollSession(
|
||||
processTool,
|
||||
"toolcall",
|
||||
"sess-abort",
|
||||
30_000,
|
||||
controller.signal,
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(500);
|
||||
controller.abort();
|
||||
|
||||
await expect(pollPromise).rejects.toMatchObject({ name: "AbortError" });
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
test("process poll exposes adaptive retryInMs for repeated no-output polls", async () => {
|
||||
const sessionId = "sess-retry";
|
||||
const { processTool } = createProcessSessionHarness(sessionId);
|
||||
|
||||
@@ -49,7 +49,7 @@ function defaultTailNote(totalLines: number, usingDefaultTail: boolean) {
|
||||
return `\n\n[showing last ${DEFAULT_LOG_TAIL_LINES} of ${totalLines} lines; pass offset/limit to page]`;
|
||||
}
|
||||
|
||||
const MAX_POLL_WAIT_MS = 120_000;
|
||||
const MAX_POLL_WAIT_MS = 30_000;
|
||||
|
||||
function resolvePollWaitMs(value: unknown) {
|
||||
if (typeof value === "number" && Number.isFinite(value)) {
|
||||
@@ -94,6 +94,44 @@ function resetPollRetrySuggestion(sessionId: string): void {
|
||||
}
|
||||
}
|
||||
|
||||
function createAbortError(reason: unknown): Error {
|
||||
if (reason instanceof Error) {
|
||||
return reason;
|
||||
}
|
||||
const error = new Error(typeof reason === "string" ? reason : "Aborted");
|
||||
error.name = "AbortError";
|
||||
return error;
|
||||
}
|
||||
|
||||
async function sleepPollInterval(ms: number, signal?: AbortSignal): Promise<void> {
|
||||
if (signal?.aborted) {
|
||||
throw createAbortError(signal.reason);
|
||||
}
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
let onAbort: (() => void) | undefined;
|
||||
const cleanup = () => {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
if (onAbort) {
|
||||
signal?.removeEventListener("abort", onAbort);
|
||||
}
|
||||
};
|
||||
const onResolve = () => {
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
onAbort = () => {
|
||||
cleanup();
|
||||
reject(createAbortError(signal?.reason));
|
||||
};
|
||||
timer = setTimeout(onResolve, ms);
|
||||
timer.unref?.();
|
||||
signal?.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
export function createProcessTool(
|
||||
defaults?: ProcessToolDefaults,
|
||||
): AgentToolWithMeta<typeof processSchema, unknown> {
|
||||
@@ -129,7 +167,7 @@ export function createProcessTool(
|
||||
displaySummary: PROCESS_TOOL_DISPLAY_SUMMARY,
|
||||
description: describeProcessTool({ hasCronTool: defaults?.hasCronTool === true }),
|
||||
parameters: processSchema,
|
||||
execute: async (_toolCallId, args, _signal, _onUpdate): Promise<AgentToolResult<unknown>> => {
|
||||
execute: async (_toolCallId, args, signal, _onUpdate): Promise<AgentToolResult<unknown>> => {
|
||||
const params = args as {
|
||||
action:
|
||||
| "list"
|
||||
@@ -307,9 +345,7 @@ export function createProcessTool(
|
||||
if (pollWaitMs > 0 && !scopedSession.exited) {
|
||||
const deadline = Date.now() + pollWaitMs;
|
||||
while (!scopedSession.exited && Date.now() < deadline) {
|
||||
await new Promise((resolve) =>
|
||||
setTimeout(resolve, Math.max(0, Math.min(250, deadline - Date.now()))),
|
||||
);
|
||||
await sleepPollInterval(Math.max(0, Math.min(250, deadline - Date.now())), signal);
|
||||
}
|
||||
}
|
||||
const { stdout, stderr } = drainSession(scopedSession);
|
||||
|
||||
Reference in New Issue
Block a user