From e321f21daaadb3e932bd613576fafcaacb87b974 Mon Sep 17 00:00:00 2001 From: ahdernasr Date: Fri, 20 Feb 2026 01:23:23 +0000 Subject: [PATCH] fix: serialize tool result delivery to preserve message ordering (#21231) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 68adbf58c8ec92d32e6183b0c6432963b2b4f9d8 Co-authored-by: ahdernasr <44983175+ahdernasr@users.noreply.github.com> Co-authored-by: joshavant <830519+joshavant@users.noreply.github.com> Reviewed-by: @joshavant --- CHANGELOG.md | 1 + .../reply/agent-runner-execution.ts | 48 +++++++++------- .../reply/agent-runner.runreplyagent.test.ts | 55 +++++++++++++++++++ 3 files changed, 83 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e7cbaee962..168d522d8fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Discord/Gateway: handle close code 4014 (missing privileged gateway intents) without crashing the gateway. Thanks @thewilloftheshadow. - Security/Net: strip sensitive headers (`Authorization`, `Proxy-Authorization`, `Cookie`, `Cookie2`) on cross-origin redirects in `fetchWithSsrFGuard` to prevent credential forwarding across origin boundaries. (#20313) Thanks @afurm. - Auto-reply/Runner: emit `onAgentRunStart` only after agent lifecycle or tool activity begins (and only once per run), so fallback preflight errors no longer mark runs as started. (#21165) Thanks @shakkernerd. +- Auto-reply/Tool results: serialize tool-result delivery and keep the delivery chain progressing after individual failures so concurrent tool outputs preserve user-visible ordering. (#21231) thanks @ahdernasr. - Auto-reply/Prompt caching: restore prefix-cache stability by keeping inbound system metadata session-stable and moving per-message IDs (`message_id`, `message_id_full`, `reply_to_id`, `sender_id`) into untrusted conversation context. (#20597) Thanks @anisoptera. - CLI/Onboarding: fix Anthropic-compatible custom provider verification by normalizing base URLs to avoid duplicate `/v1` paths during setup checks. (#21336) Thanks @17jmumford. - Security/Dependencies: bump transitive `hono` usage to `4.11.10` to incorporate timing-safe authentication comparison hardening for `basicAuth`/`bearerAuth` (`GHSA-gq3j-xvxp-8hrf`). Thanks @vincentkoc. diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 1bc0d9ed0f8..4becf72c780 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -379,29 +379,35 @@ export async function runAgentTurnWithFallback(params: { shouldEmitToolResult: params.shouldEmitToolResult, shouldEmitToolOutput: params.shouldEmitToolOutput, onToolResult: onToolResult - ? (payload) => { - // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. - // If a tool callback starts typing after the run finalized, we can end up with - // a typing loop that never sees a matching markRunComplete(). Track and drain. - const task = (async () => { - const { text, skip } = normalizeStreamingText(payload); - if (skip) { - return; - } - await params.typingSignals.signalTextDelta(text); - await onToolResult({ - text, - mediaUrls: payload.mediaUrls, - }); - })() - .catch((err) => { - logVerbose(`tool result delivery failed: ${String(err)}`); - }) - .finally(() => { + ? (() => { + // Serialize tool result delivery to preserve message ordering. + // Without this, concurrent tool callbacks race through typing signals + // and message sends, causing out-of-order delivery to the user. + // See: https://github.com/openclaw/openclaw/issues/11044 + let toolResultChain: Promise = Promise.resolve(); + return (payload: ReplyPayload) => { + toolResultChain = toolResultChain + .then(async () => { + const { text, skip } = normalizeStreamingText(payload); + if (skip) { + return; + } + await params.typingSignals.signalTextDelta(text); + await onToolResult({ + text, + mediaUrls: payload.mediaUrls, + }); + }) + .catch((err) => { + // Keep chain healthy after an error so later tool results still deliver. + logVerbose(`tool result delivery failed: ${String(err)}`); + }); + const task = toolResultChain.finally(() => { params.pendingToolTasks.delete(task); }); - params.pendingToolTasks.add(task); - } + params.pendingToolTasks.add(task); + }; + })() : undefined, }); }, diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts index f87f8279b9f..b009a8b633b 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts @@ -533,6 +533,61 @@ describe("runReplyAgent typing (heartbeat)", () => { vi.useRealTimers(); }); + it("delivers tool results in order even when dispatched concurrently", async () => { + const deliveryOrder: string[] = []; + const onToolResult = vi.fn(async (payload: { text?: string }) => { + // Simulate variable network latency: first result is slower than second + const delay = payload.text === "first" ? 50 : 10; + await new Promise((r) => setTimeout(r, delay)); + deliveryOrder.push(payload.text ?? ""); + }); + + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + // Fire two tool results without awaiting — simulates concurrent tool completion + void params.onToolResult?.({ text: "first", mediaUrls: [] }); + void params.onToolResult?.({ text: "second", mediaUrls: [] }); + // Small delay to let the chain settle before returning + await new Promise((r) => setTimeout(r, 150)); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run } = createMinimalRun({ + typingMode: "message", + opts: { onToolResult }, + }); + await run(); + + expect(onToolResult).toHaveBeenCalledTimes(2); + // Despite "first" having higher latency, it must be delivered before "second" + expect(deliveryOrder).toEqual(["first", "second"]); + }); + + it("continues delivering later tool results after an earlier tool result fails", async () => { + const delivered: string[] = []; + const onToolResult = vi.fn(async (payload: { text?: string }) => { + if (payload.text === "first") { + throw new Error("simulated delivery failure"); + } + delivered.push(payload.text ?? ""); + }); + + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + void params.onToolResult?.({ text: "first", mediaUrls: [] }); + void params.onToolResult?.({ text: "second", mediaUrls: [] }); + await new Promise((r) => setTimeout(r, 50)); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const { run } = createMinimalRun({ + typingMode: "message", + opts: { onToolResult }, + }); + await run(); + + expect(onToolResult).toHaveBeenCalledTimes(2); + expect(delivered).toEqual(["second"]); + }); + it("announces auto-compaction in verbose mode and tracks count", async () => { await withTempStateDir(async (stateDir) => { const storePath = path.join(stateDir, "sessions", "sessions.json");