mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix: serialize tool result delivery to preserve message ordering (#21231)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 68adbf58c8
Co-authored-by: ahdernasr <44983175+ahdernasr@users.noreply.github.com>
Co-authored-by: joshavant <830519+joshavant@users.noreply.github.com>
Reviewed-by: @joshavant
This commit is contained in:
@@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Discord/Gateway: handle close code 4014 (missing privileged gateway intents) without crashing the gateway. Thanks @thewilloftheshadow.
|
- Discord/Gateway: handle close code 4014 (missing privileged gateway intents) without crashing the gateway. Thanks @thewilloftheshadow.
|
||||||
- Security/Net: strip sensitive headers (`Authorization`, `Proxy-Authorization`, `Cookie`, `Cookie2`) on cross-origin redirects in `fetchWithSsrFGuard` to prevent credential forwarding across origin boundaries. (#20313) Thanks @afurm.
|
- Security/Net: strip sensitive headers (`Authorization`, `Proxy-Authorization`, `Cookie`, `Cookie2`) on cross-origin redirects in `fetchWithSsrFGuard` to prevent credential forwarding across origin boundaries. (#20313) Thanks @afurm.
|
||||||
- Auto-reply/Runner: emit `onAgentRunStart` only after agent lifecycle or tool activity begins (and only once per run), so fallback preflight errors no longer mark runs as started. (#21165) Thanks @shakkernerd.
|
- Auto-reply/Runner: emit `onAgentRunStart` only after agent lifecycle or tool activity begins (and only once per run), so fallback preflight errors no longer mark runs as started. (#21165) Thanks @shakkernerd.
|
||||||
|
- Auto-reply/Tool results: serialize tool-result delivery and keep the delivery chain progressing after individual failures so concurrent tool outputs preserve user-visible ordering. (#21231) Thanks @ahdernasr.
|
||||||
- Auto-reply/Prompt caching: restore prefix-cache stability by keeping inbound system metadata session-stable and moving per-message IDs (`message_id`, `message_id_full`, `reply_to_id`, `sender_id`) into untrusted conversation context. (#20597) Thanks @anisoptera.
|
- Auto-reply/Prompt caching: restore prefix-cache stability by keeping inbound system metadata session-stable and moving per-message IDs (`message_id`, `message_id_full`, `reply_to_id`, `sender_id`) into untrusted conversation context. (#20597) Thanks @anisoptera.
|
||||||
- CLI/Onboarding: fix Anthropic-compatible custom provider verification by normalizing base URLs to avoid duplicate `/v1` paths during setup checks. (#21336) Thanks @17jmumford.
|
- CLI/Onboarding: fix Anthropic-compatible custom provider verification by normalizing base URLs to avoid duplicate `/v1` paths during setup checks. (#21336) Thanks @17jmumford.
|
||||||
- Security/Dependencies: bump transitive `hono` usage to `4.11.10` to incorporate timing-safe authentication comparison hardening for `basicAuth`/`bearerAuth` (`GHSA-gq3j-xvxp-8hrf`). Thanks @vincentkoc.
|
- Security/Dependencies: bump transitive `hono` usage to `4.11.10` to incorporate timing-safe authentication comparison hardening for `basicAuth`/`bearerAuth` (`GHSA-gq3j-xvxp-8hrf`). Thanks @vincentkoc.
|
||||||
|
|||||||
@@ -379,29 +379,35 @@ export async function runAgentTurnWithFallback(params: {
|
|||||||
shouldEmitToolResult: params.shouldEmitToolResult,
|
shouldEmitToolResult: params.shouldEmitToolResult,
|
||||||
shouldEmitToolOutput: params.shouldEmitToolOutput,
|
shouldEmitToolOutput: params.shouldEmitToolOutput,
|
||||||
onToolResult: onToolResult
|
onToolResult: onToolResult
|
||||||
? (payload) => {
|
? (() => {
|
||||||
// `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them.
|
// Serialize tool result delivery to preserve message ordering.
|
||||||
// If a tool callback starts typing after the run finalized, we can end up with
|
// Without this, concurrent tool callbacks race through typing signals
|
||||||
// a typing loop that never sees a matching markRunComplete(). Track and drain.
|
// and message sends, causing out-of-order delivery to the user.
|
||||||
const task = (async () => {
|
// See: https://github.com/openclaw/openclaw/issues/11044
|
||||||
const { text, skip } = normalizeStreamingText(payload);
|
let toolResultChain: Promise<void> = Promise.resolve();
|
||||||
if (skip) {
|
return (payload: ReplyPayload) => {
|
||||||
return;
|
toolResultChain = toolResultChain
|
||||||
}
|
.then(async () => {
|
||||||
await params.typingSignals.signalTextDelta(text);
|
const { text, skip } = normalizeStreamingText(payload);
|
||||||
await onToolResult({
|
if (skip) {
|
||||||
text,
|
return;
|
||||||
mediaUrls: payload.mediaUrls,
|
}
|
||||||
});
|
await params.typingSignals.signalTextDelta(text);
|
||||||
})()
|
await onToolResult({
|
||||||
.catch((err) => {
|
text,
|
||||||
logVerbose(`tool result delivery failed: ${String(err)}`);
|
mediaUrls: payload.mediaUrls,
|
||||||
})
|
});
|
||||||
.finally(() => {
|
})
|
||||||
|
.catch((err) => {
|
||||||
|
// Keep chain healthy after an error so later tool results still deliver.
|
||||||
|
logVerbose(`tool result delivery failed: ${String(err)}`);
|
||||||
|
});
|
||||||
|
const task = toolResultChain.finally(() => {
|
||||||
params.pendingToolTasks.delete(task);
|
params.pendingToolTasks.delete(task);
|
||||||
});
|
});
|
||||||
params.pendingToolTasks.add(task);
|
params.pendingToolTasks.add(task);
|
||||||
}
|
};
|
||||||
|
})()
|
||||||
: undefined,
|
: undefined,
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -533,6 +533,61 @@ describe("runReplyAgent typing (heartbeat)", () => {
|
|||||||
vi.useRealTimers();
|
vi.useRealTimers();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("delivers tool results in order even when dispatched concurrently", async () => {
|
||||||
|
const deliveryOrder: string[] = [];
|
||||||
|
const onToolResult = vi.fn(async (payload: { text?: string }) => {
|
||||||
|
// Simulate variable network latency: first result is slower than second
|
||||||
|
const delay = payload.text === "first" ? 50 : 10;
|
||||||
|
await new Promise((r) => setTimeout(r, delay));
|
||||||
|
deliveryOrder.push(payload.text ?? "");
|
||||||
|
});
|
||||||
|
|
||||||
|
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
|
||||||
|
// Fire two tool results without awaiting — simulates concurrent tool completion
|
||||||
|
void params.onToolResult?.({ text: "first", mediaUrls: [] });
|
||||||
|
void params.onToolResult?.({ text: "second", mediaUrls: [] });
|
||||||
|
// Small delay to let the chain settle before returning
|
||||||
|
await new Promise((r) => setTimeout(r, 150));
|
||||||
|
return { payloads: [{ text: "final" }], meta: {} };
|
||||||
|
});
|
||||||
|
|
||||||
|
const { run } = createMinimalRun({
|
||||||
|
typingMode: "message",
|
||||||
|
opts: { onToolResult },
|
||||||
|
});
|
||||||
|
await run();
|
||||||
|
|
||||||
|
expect(onToolResult).toHaveBeenCalledTimes(2);
|
||||||
|
// Despite "first" having higher latency, it must be delivered before "second"
|
||||||
|
expect(deliveryOrder).toEqual(["first", "second"]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("continues delivering later tool results after an earlier tool result fails", async () => {
|
||||||
|
const delivered: string[] = [];
|
||||||
|
const onToolResult = vi.fn(async (payload: { text?: string }) => {
|
||||||
|
if (payload.text === "first") {
|
||||||
|
throw new Error("simulated delivery failure");
|
||||||
|
}
|
||||||
|
delivered.push(payload.text ?? "");
|
||||||
|
});
|
||||||
|
|
||||||
|
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
|
||||||
|
void params.onToolResult?.({ text: "first", mediaUrls: [] });
|
||||||
|
void params.onToolResult?.({ text: "second", mediaUrls: [] });
|
||||||
|
await new Promise((r) => setTimeout(r, 50));
|
||||||
|
return { payloads: [{ text: "final" }], meta: {} };
|
||||||
|
});
|
||||||
|
|
||||||
|
const { run } = createMinimalRun({
|
||||||
|
typingMode: "message",
|
||||||
|
opts: { onToolResult },
|
||||||
|
});
|
||||||
|
await run();
|
||||||
|
|
||||||
|
expect(onToolResult).toHaveBeenCalledTimes(2);
|
||||||
|
expect(delivered).toEqual(["second"]);
|
||||||
|
});
|
||||||
|
|
||||||
it("announces auto-compaction in verbose mode and tracks count", async () => {
|
it("announces auto-compaction in verbose mode and tracks count", async () => {
|
||||||
await withTempStateDir(async (stateDir) => {
|
await withTempStateDir(async (stateDir) => {
|
||||||
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
const storePath = path.join(stateDir, "sessions", "sessions.json");
|
||||||
|
|||||||
Reference in New Issue
Block a user