diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index eb250133be8..6e6b62e2c24 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -266,6 +266,9 @@ describe("deliverOutboundPayloads", () => { queueMocks.ackDelivery.mockResolvedValue(undefined); queueMocks.failDelivery.mockClear(); queueMocks.failDelivery.mockResolvedValue(undefined); + queueMocks.tryClaimActiveDelivery.mockClear(); + queueMocks.tryClaimActiveDelivery.mockReturnValue(true); + queueMocks.releaseActiveDelivery.mockClear(); logMocks.warn.mockClear(); }); @@ -958,6 +961,30 @@ describe("deliverOutboundPayloads", () => { expect(sendMatrix).not.toHaveBeenCalled(); }); + it("bails out without sending when a concurrent drain already claimed the queue entry", async () => { + // Regression for openclaw/openclaw#70386: if a reconnect or startup drain + // observes the newly enqueued entry and claims it before the live send + // path claims it, the live path must not send. The drain already owns + // ack/fail for that id; sending here would duplicate the outbound and + // race queue cleanup. + queueMocks.tryClaimActiveDelivery.mockReturnValueOnce(false); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" }); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hi" }], + deps: { matrix: sendMatrix }, + }); + + expect(results).toEqual([]); + expect(sendMatrix).not.toHaveBeenCalled(); + expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + expect(queueMocks.failDelivery).not.toHaveBeenCalled(); + expect(queueMocks.releaseActiveDelivery).not.toHaveBeenCalled(); + }); + it("acks the queue entry when delivery is aborted", async () => { const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" }); const abortController = new AbortController(); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 15836e4cc40..a4c65980a67 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -673,6 +673,15 @@ export async function deliverOutboundPayloads( // eligible by design to preserve crash replay) and produce duplicates. const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false; + // If a concurrent reconnect/startup drain already claimed this queue entry + // in the window between enqueueDelivery resolving and this synchronous + // claim attempt, bail out of the live send and leave the queue entry in + // place. The drain already owns ack/fail for this id; sending here would + // duplicate the outbound message and race cleanup. + if (queueId && !heldActiveClaim) { + return []; + } + // Wrap onError to detect partial failures under bestEffort mode. // When bestEffort is true, per-payload errors are caught and passed to onError // without throwing — so the outer try/catch never fires. We track whether any