From ca83f0fd7a3a6ea9a017a135dd4864e294c23b0f Mon Sep 17 00:00:00 2001 From: Neerav Makwana <261249544+neeravmakwana@users.noreply.github.com> Date: Wed, 22 Apr 2026 21:43:57 -0400 Subject: [PATCH] fix(outbound): bail live delivery when a concurrent drain wins the claim If a reconnect/startup drain observes the newly enqueued queue entry and calls claimRecoveryEntry before the live delivery path reaches tryClaimActiveDelivery, tryClaimActiveDelivery returns false. Previously the live path still proceeded to deliverOutboundPayloadsCore and then ack/fail, which would race the drain's own delivery and ack/fail for the same entry id and produce duplicate outbound messages. Treat a failed claim acquisition as "another in-process owner is already handling this queue entry" and bail out with an empty result array, leaving the queue entry in place for the drain to deliver and clean up. This closes the narrow residual race called out by the Aisle security review on openclaw/openclaw#70428. Made-with: Cursor --- src/infra/outbound/deliver.test.ts | 27 +++++++++++++++++++++++++++ src/infra/outbound/deliver.ts | 9 +++++++++ 2 files changed, 36 insertions(+) diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index eb250133be8..6e6b62e2c24 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -266,6 +266,9 @@ describe("deliverOutboundPayloads", () => { queueMocks.ackDelivery.mockResolvedValue(undefined); queueMocks.failDelivery.mockClear(); queueMocks.failDelivery.mockResolvedValue(undefined); + queueMocks.tryClaimActiveDelivery.mockClear(); + queueMocks.tryClaimActiveDelivery.mockReturnValue(true); + queueMocks.releaseActiveDelivery.mockClear(); logMocks.warn.mockClear(); }); @@ -958,6 +961,30 @@ describe("deliverOutboundPayloads", () => { expect(sendMatrix).not.toHaveBeenCalled(); }); + it("bails out without sending when a concurrent drain already claimed the queue entry", async () => { + // Regression for openclaw/openclaw#70386: if a reconnect or startup drain + // observes the newly enqueued entry and claims it before the live send + // path claims it, the live path must not send. The drain already owns + // ack/fail for that id; sending here would duplicate the outbound and + // race queue cleanup. + queueMocks.tryClaimActiveDelivery.mockReturnValueOnce(false); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" }); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hi" }], + deps: { matrix: sendMatrix }, + }); + + expect(results).toEqual([]); + expect(sendMatrix).not.toHaveBeenCalled(); + expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + expect(queueMocks.failDelivery).not.toHaveBeenCalled(); + expect(queueMocks.releaseActiveDelivery).not.toHaveBeenCalled(); + }); + it("acks the queue entry when delivery is aborted", async () => { const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" }); const abortController = new AbortController(); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 15836e4cc40..a4c65980a67 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -673,6 +673,15 @@ export async function deliverOutboundPayloads( // eligible by design to preserve crash replay) and produce duplicates. const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false; + // If a concurrent reconnect/startup drain already claimed this queue entry + // in the window between enqueueDelivery resolving and this synchronous + // claim attempt, bail out of the live send and leave the queue entry in + // place. The drain already owns ack/fail for this id; sending here would + // duplicate the outbound message and race cleanup. + if (queueId && !heldActiveClaim) { + return []; + } + // Wrap onError to detect partial failures under bestEffort mode. // When bestEffort is true, per-payload errors are caught and passed to onError // without throwing — so the outer try/catch never fires. We track whether any