mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-07 15:20:42 +00:00
fix(auto-reply): bound pending tool result drain
This commit is contained in:
@@ -15,6 +15,7 @@ import {
|
||||
} from "../../config/sessions.js";
|
||||
import type { TypingMode } from "../../config/types.js";
|
||||
import { resolveSessionTranscriptCandidates } from "../../gateway/session-utils.fs.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { emitAgentEvent } from "../../infra/agent-events.js";
|
||||
import { emitTrustedDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
|
||||
import {
|
||||
@@ -60,6 +61,7 @@ import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-repl
|
||||
import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
|
||||
import { createFollowupRunner } from "./followup-runner.js";
|
||||
import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js";
|
||||
import { drainPendingToolTasks } from "./pending-tool-task-drain.js";
|
||||
import { readPostCompactionContext } from "./post-compaction-context.js";
|
||||
import { resolveActiveRunQueueAction } from "./queue-policy.js";
|
||||
import {
|
||||
@@ -1230,7 +1232,10 @@ export async function runReplyAgent(params: {
|
||||
blockReplyPipeline.stop();
|
||||
}
|
||||
if (pendingToolTasks.size > 0) {
|
||||
await Promise.allSettled(pendingToolTasks);
|
||||
await drainPendingToolTasks({
|
||||
tasks: pendingToolTasks,
|
||||
onTimeout: logVerbose,
|
||||
});
|
||||
}
|
||||
|
||||
const usage = runResult.meta?.agentMeta?.usage;
|
||||
|
||||
92
src/auto-reply/reply/pending-tool-task-drain.test.ts
Normal file
92
src/auto-reply/reply/pending-tool-task-drain.test.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { drainPendingToolTasks } from "./pending-tool-task-drain.js";
|
||||
|
||||
function deferredTask() {
|
||||
let resolve!: () => void;
|
||||
let reject!: (error: Error) => void;
|
||||
const promise = new Promise<void>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
async function flushPromises() {
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
}
|
||||
|
||||
describe("drainPendingToolTasks", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("settles immediately when there are no pending tasks", async () => {
|
||||
await expect(drainPendingToolTasks({ tasks: new Set() })).resolves.toEqual({
|
||||
kind: "settled",
|
||||
});
|
||||
});
|
||||
|
||||
it("waits for all pending tasks to settle", async () => {
|
||||
const first = deferredTask();
|
||||
const second = deferredTask();
|
||||
const tasks = new Set([first.promise, second.promise]);
|
||||
|
||||
const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 1_000 });
|
||||
first.resolve();
|
||||
await flushPromises();
|
||||
expect(tasks.size).toBe(1);
|
||||
second.resolve();
|
||||
|
||||
await expect(drain).resolves.toEqual({ kind: "settled" });
|
||||
expect(tasks.size).toBe(0);
|
||||
});
|
||||
|
||||
it("resets the idle timeout after each completed task", async () => {
|
||||
vi.useFakeTimers();
|
||||
const first = deferredTask();
|
||||
const second = deferredTask();
|
||||
const onTimeout = vi.fn();
|
||||
const tasks = new Set([first.promise, second.promise]);
|
||||
|
||||
const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 100, onTimeout });
|
||||
|
||||
await vi.advanceTimersByTimeAsync(80);
|
||||
first.resolve();
|
||||
await Promise.resolve();
|
||||
await vi.advanceTimersByTimeAsync(80);
|
||||
expect(onTimeout).not.toHaveBeenCalled();
|
||||
|
||||
second.resolve();
|
||||
await expect(drain).resolves.toEqual({ kind: "settled" });
|
||||
expect(onTimeout).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns timeout when no pending task settles before the idle window", async () => {
|
||||
vi.useFakeTimers();
|
||||
const stuck = deferredTask();
|
||||
const onTimeout = vi.fn();
|
||||
const tasks = new Set([stuck.promise]);
|
||||
|
||||
const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 100, onTimeout });
|
||||
await vi.advanceTimersByTimeAsync(100);
|
||||
|
||||
await expect(drain).resolves.toEqual({ kind: "timeout", remaining: 1 });
|
||||
expect(onTimeout).toHaveBeenCalledWith(expect.stringContaining("1 task(s) still pending"));
|
||||
expect(tasks.size).toBe(1);
|
||||
});
|
||||
|
||||
it("treats rejected tasks as drained progress", async () => {
|
||||
const failed = deferredTask();
|
||||
const later = deferredTask();
|
||||
const tasks = new Set([failed.promise, later.promise]);
|
||||
|
||||
const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 1_000 });
|
||||
failed.reject(new Error("send failed"));
|
||||
await flushPromises();
|
||||
expect(tasks.size).toBe(1);
|
||||
later.resolve();
|
||||
|
||||
await expect(drain).resolves.toEqual({ kind: "settled" });
|
||||
});
|
||||
});
|
||||
70
src/auto-reply/reply/pending-tool-task-drain.ts
Normal file
70
src/auto-reply/reply/pending-tool-task-drain.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
export const DEFAULT_PENDING_TOOL_DRAIN_IDLE_TIMEOUT_MS = 30_000;
|
||||
|
||||
export type PendingToolTaskDrainResult =
|
||||
| { kind: "settled" }
|
||||
| { kind: "timeout"; remaining: number };
|
||||
|
||||
type DrainOptions = {
|
||||
tasks: Set<Promise<void>>;
|
||||
idleTimeoutMs?: number;
|
||||
onTimeout?: (message: string) => void;
|
||||
};
|
||||
|
||||
function createIdleTimeoutPromise(timeoutMs: number): {
|
||||
promise: Promise<"timeout">;
|
||||
clear: () => void;
|
||||
} {
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
const promise = new Promise<"timeout">((resolve) => {
|
||||
timeoutId = setTimeout(() => resolve("timeout"), timeoutMs);
|
||||
timeoutId.unref?.();
|
||||
});
|
||||
return {
|
||||
promise,
|
||||
clear: () => {
|
||||
if (timeoutId) {
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export async function drainPendingToolTasks({
|
||||
tasks,
|
||||
idleTimeoutMs = DEFAULT_PENDING_TOOL_DRAIN_IDLE_TIMEOUT_MS,
|
||||
onTimeout,
|
||||
}: DrainOptions): Promise<PendingToolTaskDrainResult> {
|
||||
if (tasks.size === 0) {
|
||||
return { kind: "settled" };
|
||||
}
|
||||
if (idleTimeoutMs <= 0) {
|
||||
return { kind: "timeout", remaining: tasks.size };
|
||||
}
|
||||
|
||||
while (tasks.size > 0) {
|
||||
const snapshot = [...tasks];
|
||||
const timeout = createIdleTimeoutPromise(idleTimeoutMs);
|
||||
const outcome = await Promise.race<{ kind: "settled"; task: Promise<void> } | "timeout">([
|
||||
timeout.promise,
|
||||
...snapshot.map((task) =>
|
||||
task.then(
|
||||
() => ({ kind: "settled" as const, task }),
|
||||
() => ({ kind: "settled" as const, task }),
|
||||
),
|
||||
),
|
||||
]);
|
||||
timeout.clear();
|
||||
|
||||
if (outcome === "timeout") {
|
||||
const remaining = tasks.size;
|
||||
onTimeout?.(
|
||||
`pending tool tasks made no progress within ${idleTimeoutMs}ms; proceeding with ${remaining} task(s) still pending to avoid session deadlock`,
|
||||
);
|
||||
return { kind: "timeout", remaining };
|
||||
}
|
||||
|
||||
tasks.delete(outcome.task);
|
||||
}
|
||||
|
||||
return { kind: "settled" };
|
||||
}
|
||||
Reference in New Issue
Block a user