From 303cde8f6087f0b4859b26313df5cdc5cd0e4952 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 26 Apr 2026 10:58:19 -0700 Subject: [PATCH] fix(auto-reply): poison inbound dedupe after partial turn failure * fix(auto-reply): poison inbound dedupe after replay-unsafe failures * fix(clownfish): address review for ghcrawl-165980-agentic-merge (1) --- CHANGELOG.md | 1 + .../reply/dispatch-from-config.test.ts | 89 +++++++++++++++++++ src/auto-reply/reply/dispatch-from-config.ts | 32 ++++++- src/auto-reply/reply/inbound-dedupe.test.ts | 29 ++++++ 4 files changed, 150 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52bd60a09f7..75a8c6579ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Auto-reply: poison inbound message dedupe after replay-unsafe provider/runtime failures so retries stay safe before visible progress but cannot duplicate messages after block output, tool side effects, or session progress. Fixes #69303; keeps #58549 and #64606 as duplicate validation. Thanks @martingarramon, @NikolaFC, and @zeroth-blip. - Gateway/Bonjour: keep @homebridge/ciao cancellation handlers registered across advertiser restarts so late probing cancellations cannot crash Linux and other mDNS-churned gateways. Thanks @codex. - Plugins/startup: load the default `memory-core` slot during Gateway startup when permitted so active-memory recall can call `memory_search` and `memory_get` without requiring an explicit `plugins.slots.memory` entry, while preserving `plugins.slots.memory: "none"`. Thanks @codex. - Plugins/CLI: prefer native require for compiled bundled plugin JavaScript before jiti so read-only config, status, device, and node commands avoid unnecessary transform overhead on slow hosts. Fixes #62842. Thanks @Effet. diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 22cc2cc944a..9645d912024 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -3392,6 +3392,95 @@ describe("dispatchReplyFromConfig", () => { ); }); + it("poisons inbound dedupe when dispatch fails after a block reply", async () => { + setNoAbort(); + const ctx = buildTestCtx({ + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "whatsapp:+15555550125", + To: "whatsapp:+15555550125", + AccountId: "default", + MessageSid: "msg-dup-block-error", + SessionKey: "agent:main:whatsapp:direct:+15555550125", + CommandBody: "hello", + RawBody: "hello", + Body: "hello", + }); + const firstDispatcher = createDispatcher(); + const replyResolver = vi.fn( + async (_ctx: MsgContext, opts?: GetReplyOptions): Promise => { + await opts?.onBlockReply?.({ text: "partial answer" }); + throw new Error("provider failed after block"); + }, + ); + + await expect( + dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher: firstDispatcher, + replyResolver, + }), + ).rejects.toThrow("provider failed after block"); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher: createDispatcher(), + replyResolver, + }); + + expect(firstDispatcher.sendBlockReply).toHaveBeenCalledWith({ text: "partial answer" }); + expect(replyResolver).toHaveBeenCalledTimes(1); + }); + + it("poisons inbound dedupe when dispatch fails after a suppressed tool result", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const ctx = buildTestCtx({ + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "whatsapp:+15555550126", + To: "whatsapp:+15555550126", + AccountId: "default", + MessageSid: "msg-dup-tool-error", + SessionKey: "agent:main:whatsapp:direct:+15555550126", + CommandBody: "hello", + RawBody: "hello", + Body: "hello", + }); + const firstDispatcher = createDispatcher(); + const replyResolver = vi.fn( + async (_ctx: MsgContext, opts?: GetReplyOptions): Promise => { + await opts?.onToolResult?.({ text: "tool touched external state" }); + throw new Error("provider failed after tool"); + }, + ); + + await expect( + dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher: firstDispatcher, + replyResolver, + }), + ).rejects.toThrow("provider failed after tool"); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher: createDispatcher(), + replyResolver, + }); + + expect(firstDispatcher.sendToolResult).not.toHaveBeenCalled(); + expect(replyResolver).toHaveBeenCalledTimes(1); + }); + it("passes configOverride to replyResolver when provided", async () => { setNoAbort(); const cfg = emptyConfig; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 9e48a73367d..ccd05fb9c1f 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -343,6 +343,10 @@ export async function dispatchReplyFromConfig( recordProcessed("skipped", { reason: "duplicate" }); return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; } + let inboundDedupeReplayUnsafe = false; + const markInboundDedupeReplayUnsafe = () => { + inboundDedupeReplayUnsafe = true; + }; const initialSessionStoreEntry = resolveSessionStoreLookup(ctx, cfg); const boundAcpDispatchSessionKey = resolveBoundAcpDispatchSessionKey({ ctx, cfg }); @@ -473,6 +477,7 @@ export async function dispatchReplyFromConfig( if (!shouldRouteToOriginating || !routeReplyChannel || !routeReplyTo || !routeReplyRuntime) { return null; } + markInboundDedupeReplayUnsafe(); return await routeReplyRuntime.routeReply({ payload, channel: routeReplyChannel, @@ -538,6 +543,7 @@ export async function dispatchReplyFromConfig( } return result.ok; } + markInboundDedupeReplayUnsafe(); return mode === "additive" ? dispatcher.sendToolResult(payload) : dispatcher.sendFinalReply(payload); @@ -721,6 +727,7 @@ export async function dispatchReplyFromConfig( ); } } else { + markInboundDedupeReplayUnsafe(); queuedFinal = dispatcher.sendFinalReply(payload); } } else { @@ -744,6 +751,9 @@ export async function dispatchReplyFromConfig( const sendFinalPayload = async ( payload: ReplyPayload, ): Promise<{ queuedFinal: boolean; routedFinalCount: number }> => { + if (resolveSendableOutboundReplyParts(payload).hasContent) { + markInboundDedupeReplayUnsafe(); + } const ttsPayload = await maybeApplyTtsToReplyPayload({ payload, cfg, @@ -767,6 +777,7 @@ export async function dispatchReplyFromConfig( routedFinalCount: result.ok ? 1 : 0, }; } + markInboundDedupeReplayUnsafe(); return { queuedFinal: dispatcher.sendFinalReply(normalizedPayload), routedFinalCount: 0, @@ -898,6 +909,7 @@ export async function dispatchReplyFromConfig( await sendPayloadAsync(payload, undefined, false); return; } + markInboundDedupeReplayUnsafe(); dispatcher.sendToolResult(payload); }; const sendPlanUpdate = async (payload: { @@ -914,6 +926,7 @@ export async function dispatchReplyFromConfig( await sendPayloadAsync(replyPayload, undefined, false); return; } + markInboundDedupeReplayUnsafe(); dispatcher.sendToolResult(replyPayload); }; const summarizeApprovalLabel = (payload: { @@ -1019,6 +1032,7 @@ export async function dispatchReplyFromConfig( suppressTyping: typing.suppressTyping, onToolResult: (payload: ReplyPayload) => { const run = async () => { + markInboundDedupeReplayUnsafe(); await onToolResultFromReplyOptions?.(payload); if (suppressDelivery) { return; @@ -1055,12 +1069,14 @@ export async function dispatchReplyFromConfig( if (shouldRouteToOriginating) { await sendPayloadAsync(deliveryPayload, undefined, false); } else { + markInboundDedupeReplayUnsafe(); dispatcher.sendToolResult(deliveryPayload); } }; return run(); }, onPlanUpdate: async (payload) => { + markInboundDedupeReplayUnsafe(); await onPlanUpdateFromReplyOptions?.(payload); if (payload.phase !== "update" || suppressDefaultToolProgressMessages) { return; @@ -1068,6 +1084,7 @@ export async function dispatchReplyFromConfig( await sendPlanUpdate({ explanation: payload.explanation, steps: payload.steps }); }, onApprovalEvent: async (payload) => { + markInboundDedupeReplayUnsafe(); await onApprovalEventFromReplyOptions?.(payload); if (payload.phase !== "requested" || suppressDefaultToolProgressMessages) { return; @@ -1083,6 +1100,7 @@ export async function dispatchReplyFromConfig( await maybeSendWorkingStatus(label); }, onPatchSummary: async (payload) => { + markInboundDedupeReplayUnsafe(); await onPatchSummaryFromReplyOptions?.(payload); if (payload.phase !== "end" || suppressDefaultToolProgressMessages) { return; @@ -1095,6 +1113,12 @@ export async function dispatchReplyFromConfig( }, onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => { const run = async () => { + if ( + payload.isReasoning !== true && + resolveSendableOutboundReplyParts(payload).hasContent + ) { + markInboundDedupeReplayUnsafe(); + } if (suppressDelivery) { return; } @@ -1156,6 +1180,7 @@ export async function dispatchReplyFromConfig( if (shouldRouteToOriginating) { await sendPayloadAsync(normalizedPayload, context?.abortSignal, false); } else { + markInboundDedupeReplayUnsafe(); dispatcher.sendBlockReply(normalizedPayload); } }; @@ -1268,6 +1293,7 @@ export async function dispatchReplyFromConfig( ); } } else { + markInboundDedupeReplayUnsafe(); const didQueue = dispatcher.sendFinalReply(normalizedTtsOnlyPayload); queuedFinal = didQueue || queuedFinal; } @@ -1293,7 +1319,11 @@ export async function dispatchReplyFromConfig( return { queuedFinal, counts }; } catch (err) { if (inboundDedupeClaim.status === "claimed") { - releaseInboundDedupe(inboundDedupeClaim.key); + if (inboundDedupeReplayUnsafe) { + commitInboundDedupe(inboundDedupeClaim.key); + } else { + releaseInboundDedupe(inboundDedupeClaim.key); + } } recordProcessed("error", { error: String(err) }); markIdle("message_error"); diff --git a/src/auto-reply/reply/inbound-dedupe.test.ts b/src/auto-reply/reply/inbound-dedupe.test.ts index f73a8a9edb6..ba6d029a0aa 100644 --- a/src/auto-reply/reply/inbound-dedupe.test.ts +++ b/src/auto-reply/reply/inbound-dedupe.test.ts @@ -72,4 +72,33 @@ describe("inbound dedupe", () => { inboundB.resetInboundDedupe(); } }); + + it("shares claim/commit state across distinct module instances", async () => { + const inboundA = await importFreshModule( + import.meta.url, + "./inbound-dedupe.js?scope=commit-a", + ); + const inboundB = await importFreshModule( + import.meta.url, + "./inbound-dedupe.js?scope=commit-b", + ); + + inboundA.resetInboundDedupe(); + inboundB.resetInboundDedupe(); + + try { + const firstClaim = inboundA.claimInboundDedupe(sharedInboundContext); + expect(firstClaim).toMatchObject({ status: "claimed" }); + if (firstClaim.status !== "claimed") { + throw new Error("expected claimed inbound dedupe result"); + } + inboundA.commitInboundDedupe(firstClaim.key); + expect(inboundB.claimInboundDedupe(sharedInboundContext)).toMatchObject({ + status: "duplicate", + }); + } finally { + inboundA.resetInboundDedupe(); + inboundB.resetInboundDedupe(); + } + }); });