From c5cd7aabcf4e1bfc87d8b2d3468db1638f366643 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 28 Apr 2026 10:45:47 +0100 Subject: [PATCH] fix(auto-reply): bound pending tool result drain --- CHANGELOG.md | 1 + src/auto-reply/reply/agent-runner.ts | 7 +- .../reply/pending-tool-task-drain.test.ts | 92 +++++++++++++++++++ .../reply/pending-tool-task-drain.ts | 70 ++++++++++++++ 4 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 src/auto-reply/reply/pending-tool-task-drain.test.ts create mode 100644 src/auto-reply/reply/pending-tool-task-drain.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index b3230fd5510..36be8a1c929 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai - Plugins/inspector: keep bundled plugin runtime capture quiet and config-tolerant for Codex, memory-lancedb, Feishu, Mattermost, QQBot, and Tlon so plugin-inspector JSON checks can validate the full bundled set. Thanks @vincentkoc. - Slack/auto-reply: keep fully consumed text reset triggers such as `new session` out of `BodyForAgent` after directive cleanup, so configured Slack reset phrases do not leak into the fresh model turn. Fixes #73137. Thanks @neeravmakwana. - Plugins/runtime deps: prune stale retained bundled runtime deps and keep doctor/secret channel contract scans on lightweight artifacts, so disabled bundled channels stop preserving old dependency trees or importing heavy plugin surfaces. Thanks @SymbolStar and @vincentkoc. +- Auto-reply: bound the post-run pending tool-result delivery drain with a progress-aware idle timeout, so a never-settling tool-result task no longer leaves the session active forever while slow healthy deliveries can keep draining. Fixes #53889; supersedes #64733 and #73434. Thanks @zijunl and @wujiaming88. - Gateway/startup: start chat channels without waiting for primary model prewarm, keeping model warmup bounded in the background so Slack and other channels come online promptly when provider discovery is slow. Supersedes #73420. Thanks @dorukardahan. - Gateway/install: carry env-backed config SecretRefs such as `channels.discord.token` into generated service environments when they are present only in the installing shell, while keeping gateway auth SecretRefs non-persisted. Fixes #67817; supersedes #73426. Thanks @wdimaculangan and @ztexydt-cqh. - Auto-reply/commands: stop bare `/reset` and `/new` after reset hooks acknowledge the command, so non-ACP channels no longer fall through into empty provider calls while `/reset ` and `/new ` still seed the next model turn. Fixes #73367 and #73412. Thanks @hoyanhan, @wenxu007, and @amdhelper. diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 47daefa238e..0214795c60e 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -15,6 +15,7 @@ import { } from "../../config/sessions.js"; import type { TypingMode } from "../../config/types.js"; import { resolveSessionTranscriptCandidates } from "../../gateway/session-utils.fs.js"; +import { logVerbose } from "../../globals.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; import { emitTrustedDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; import { @@ -60,6 +61,7 @@ import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-repl import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js"; import { createFollowupRunner } from "./followup-runner.js"; import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js"; +import { drainPendingToolTasks } from "./pending-tool-task-drain.js"; import { readPostCompactionContext } from "./post-compaction-context.js"; import { resolveActiveRunQueueAction } from "./queue-policy.js"; import { @@ -1230,7 +1232,10 @@ export async function runReplyAgent(params: { blockReplyPipeline.stop(); } if (pendingToolTasks.size > 0) { - await Promise.allSettled(pendingToolTasks); + await drainPendingToolTasks({ + tasks: pendingToolTasks, + onTimeout: logVerbose, + }); } const usage = runResult.meta?.agentMeta?.usage; diff --git a/src/auto-reply/reply/pending-tool-task-drain.test.ts b/src/auto-reply/reply/pending-tool-task-drain.test.ts new file mode 100644 index 00000000000..ed37dab6492 --- /dev/null +++ b/src/auto-reply/reply/pending-tool-task-drain.test.ts @@ -0,0 +1,92 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { drainPendingToolTasks } from "./pending-tool-task-drain.js"; + +function deferredTask() { + let resolve!: () => void; + let reject!: (error: Error) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +async function flushPromises() { + await Promise.resolve(); + await Promise.resolve(); +} + +describe("drainPendingToolTasks", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("settles immediately when there are no pending tasks", async () => { + await expect(drainPendingToolTasks({ tasks: new Set() })).resolves.toEqual({ + kind: "settled", + }); + }); + + it("waits for all pending tasks to settle", async () => { + const first = deferredTask(); + const second = deferredTask(); + const tasks = new Set([first.promise, second.promise]); + + const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 1_000 }); + first.resolve(); + await flushPromises(); + expect(tasks.size).toBe(1); + second.resolve(); + + await expect(drain).resolves.toEqual({ kind: "settled" }); + expect(tasks.size).toBe(0); + }); + + it("resets the idle timeout after each completed task", async () => { + vi.useFakeTimers(); + const first = deferredTask(); + const second = deferredTask(); + const onTimeout = vi.fn(); + const tasks = new Set([first.promise, second.promise]); + + const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 100, onTimeout }); + + await vi.advanceTimersByTimeAsync(80); + first.resolve(); + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(80); + expect(onTimeout).not.toHaveBeenCalled(); + + second.resolve(); + await expect(drain).resolves.toEqual({ kind: "settled" }); + expect(onTimeout).not.toHaveBeenCalled(); + }); + + it("returns timeout when no pending task settles before the idle window", async () => { + vi.useFakeTimers(); + const stuck = deferredTask(); + const onTimeout = vi.fn(); + const tasks = new Set([stuck.promise]); + + const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 100, onTimeout }); + await vi.advanceTimersByTimeAsync(100); + + await expect(drain).resolves.toEqual({ kind: "timeout", remaining: 1 }); + expect(onTimeout).toHaveBeenCalledWith(expect.stringContaining("1 task(s) still pending")); + expect(tasks.size).toBe(1); + }); + + it("treats rejected tasks as drained progress", async () => { + const failed = deferredTask(); + const later = deferredTask(); + const tasks = new Set([failed.promise, later.promise]); + + const drain = drainPendingToolTasks({ tasks, idleTimeoutMs: 1_000 }); + failed.reject(new Error("send failed")); + await flushPromises(); + expect(tasks.size).toBe(1); + later.resolve(); + + await expect(drain).resolves.toEqual({ kind: "settled" }); + }); +}); diff --git a/src/auto-reply/reply/pending-tool-task-drain.ts b/src/auto-reply/reply/pending-tool-task-drain.ts new file mode 100644 index 00000000000..defe1b41d3e --- /dev/null +++ b/src/auto-reply/reply/pending-tool-task-drain.ts @@ -0,0 +1,70 @@ +export const DEFAULT_PENDING_TOOL_DRAIN_IDLE_TIMEOUT_MS = 30_000; + +export type PendingToolTaskDrainResult = + | { kind: "settled" } + | { kind: "timeout"; remaining: number }; + +type DrainOptions = { + tasks: Set>; + idleTimeoutMs?: number; + onTimeout?: (message: string) => void; +}; + +function createIdleTimeoutPromise(timeoutMs: number): { + promise: Promise<"timeout">; + clear: () => void; +} { + let timeoutId: ReturnType | undefined; + const promise = new Promise<"timeout">((resolve) => { + timeoutId = setTimeout(() => resolve("timeout"), timeoutMs); + timeoutId.unref?.(); + }); + return { + promise, + clear: () => { + if (timeoutId) { + clearTimeout(timeoutId); + } + }, + }; +} + +export async function drainPendingToolTasks({ + tasks, + idleTimeoutMs = DEFAULT_PENDING_TOOL_DRAIN_IDLE_TIMEOUT_MS, + onTimeout, +}: DrainOptions): Promise { + if (tasks.size === 0) { + return { kind: "settled" }; + } + if (idleTimeoutMs <= 0) { + return { kind: "timeout", remaining: tasks.size }; + } + + while (tasks.size > 0) { + const snapshot = [...tasks]; + const timeout = createIdleTimeoutPromise(idleTimeoutMs); + const outcome = await Promise.race<{ kind: "settled"; task: Promise } | "timeout">([ + timeout.promise, + ...snapshot.map((task) => + task.then( + () => ({ kind: "settled" as const, task }), + () => ({ kind: "settled" as const, task }), + ), + ), + ]); + timeout.clear(); + + if (outcome === "timeout") { + const remaining = tasks.size; + onTimeout?.( + `pending tool tasks made no progress within ${idleTimeoutMs}ms; proceeding with ${remaining} task(s) still pending to avoid session deadlock`, + ); + return { kind: "timeout", remaining }; + } + + tasks.delete(outcome.task); + } + + return { kind: "settled" }; +}