From 80148cd2a2eff251783ede3c2efa96691479d89c Mon Sep 17 00:00:00 2001 From: Berg Date: Thu, 19 Feb 2026 20:15:43 +0000 Subject: [PATCH] fix: serialize tool result delivery to preserve message ordering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tool result callbacks were dispatched as fire-and-forget tasks via pendingToolTasks, with no ordering guarantee. When multiple tool calls completed near-simultaneously, their typing signals and message sends raced through independent async paths, causing out-of-order delivery to the user — particularly visible on Telegram. Replace the fire-and-forget Set pattern with a serialized promise chain (toolResultChain). Each tool result now awaits the previous one before starting its typing signal and delivery, ensuring messages arrive in the order they were produced by the agent. The existing pendingToolTasks tracking is preserved so the post-run Promise.allSettled() drain still works correctly. Fixes #11044 --- .../reply/agent-runner-execution.ts | 50 ++++++++++--------- .../reply/agent-runner.runreplyagent.test.ts | 29 +++++++++++ 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 1bc0d9ed0f8..5599e96c161 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -379,29 +379,33 @@ export async function runAgentTurnWithFallback(params: { shouldEmitToolResult: params.shouldEmitToolResult, shouldEmitToolOutput: params.shouldEmitToolOutput, onToolResult: onToolResult - ? (payload) => { - // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. - // If a tool callback starts typing after the run finalized, we can end up with - // a typing loop that never sees a matching markRunComplete(). Track and drain. - const task = (async () => { - const { text, skip } = normalizeStreamingText(payload); - if (skip) { - return; - } - await params.typingSignals.signalTextDelta(text); - await onToolResult({ - text, - mediaUrls: payload.mediaUrls, - }); - })() - .catch((err) => { - logVerbose(`tool result delivery failed: ${String(err)}`); - }) - .finally(() => { - params.pendingToolTasks.delete(task); - }); - params.pendingToolTasks.add(task); - } + ? (() => { + // Serialize tool result delivery to preserve message ordering. + // Without this, concurrent tool callbacks race through typing signals + // and message sends, causing out-of-order delivery to the user. + // See: https://github.com/openclaw/openclaw/issues/11044 + let toolResultChain: Promise = Promise.resolve(); + return (payload: ReplyPayload) => { + const task = (toolResultChain = toolResultChain.then(async () => { + const { text, skip } = normalizeStreamingText(payload); + if (skip) { + return; + } + await params.typingSignals.signalTextDelta(text); + await onToolResult({ + text, + mediaUrls: payload.mediaUrls, + }); + })) + .catch((err) => { + logVerbose(`tool result delivery failed: ${String(err)}`); + }) + .finally(() => { + params.pendingToolTasks.delete(task); + }); + params.pendingToolTasks.add(task); + }; + })() : undefined, }); }, diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts index f87f8279b9f..6492cd757cc 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts @@ -533,6 +533,35 @@ describe("runReplyAgent typing (heartbeat)", () => { vi.useRealTimers(); }); + it("delivers tool results in order even when dispatched concurrently", async () => { + const deliveryOrder: string[] = []; + const onToolResult = vi.fn(async (payload: { text?: string }) => { + // Simulate variable network latency: first result is slower than second + const delay = payload.text === "first" ? 50 : 10; + await new Promise((r) => setTimeout(r, delay)); + deliveryOrder.push(payload.text ?? ""); + }); + + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + // Fire two tool results without awaiting — simulates concurrent tool completion + void params.onToolResult?.({ text: "first", mediaUrls: [] }); + void params.onToolResult?.({ text: "second", mediaUrls: [] }); + // Small delay to let the chain settle before returning + await new Promise((r) => setTimeout(r, 150)); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run } = createMinimalRun({ + typingMode: "message", + opts: { onToolResult }, + }); + await run(); + + expect(onToolResult).toHaveBeenCalledTimes(2); + // Despite "first" having higher latency, it must be delivered before "second" + expect(deliveryOrder).toEqual(["first", "second"]); + }); + it("announces auto-compaction in verbose mode and tracks count", async () => { await withTempStateDir(async (stateDir) => { const storePath = path.join(stateDir, "sessions", "sessions.json");