mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix(outbound): bail live delivery when a concurrent drain wins the claim
If a reconnect/startup drain observes the newly enqueued queue entry and calls claimRecoveryEntry before the live delivery path reaches tryClaimActiveDelivery, tryClaimActiveDelivery returns false. Previously, the live path still proceeded to deliverOutboundPayloadsCore and then to ack/fail, racing the drain's own delivery and ack/fail for the same entry id and producing duplicate outbound messages.

Treat a failed claim acquisition as "another in-process owner is already handling this queue entry" and bail out with an empty result array, leaving the queue entry in place for the drain to deliver and clean up.

This closes the narrow residual race called out by the Aisle security review on openclaw/openclaw#70428.

Made-with: Cursor
This commit is contained in:
committed by
Peter Steinberger
parent
c94a8702c7
commit
ca83f0fd7a
@@ -266,6 +266,9 @@ describe("deliverOutboundPayloads", () => {
|
||||
queueMocks.ackDelivery.mockResolvedValue(undefined);
|
||||
queueMocks.failDelivery.mockClear();
|
||||
queueMocks.failDelivery.mockResolvedValue(undefined);
|
||||
queueMocks.tryClaimActiveDelivery.mockClear();
|
||||
queueMocks.tryClaimActiveDelivery.mockReturnValue(true);
|
||||
queueMocks.releaseActiveDelivery.mockClear();
|
||||
logMocks.warn.mockClear();
|
||||
});
|
||||
|
||||
@@ -958,6 +961,30 @@ describe("deliverOutboundPayloads", () => {
|
||||
expect(sendMatrix).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("bails out without sending when a concurrent drain already claimed the queue entry", async () => {
|
||||
// Regression for openclaw/openclaw#70386: if a reconnect or startup drain
|
||||
// observes the newly enqueued entry and claims it before the live send
|
||||
// path claims it, the live path must not send. The drain already owns
|
||||
// ack/fail for that id; sending here would duplicate the outbound and
|
||||
// race queue cleanup.
|
||||
queueMocks.tryClaimActiveDelivery.mockReturnValueOnce(false);
|
||||
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" });
|
||||
|
||||
const results = await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "matrix",
|
||||
to: "!room:example",
|
||||
payloads: [{ text: "hi" }],
|
||||
deps: { matrix: sendMatrix },
|
||||
});
|
||||
|
||||
expect(results).toEqual([]);
|
||||
expect(sendMatrix).not.toHaveBeenCalled();
|
||||
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
|
||||
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
|
||||
expect(queueMocks.releaseActiveDelivery).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("acks the queue entry when delivery is aborted", async () => {
|
||||
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" });
|
||||
const abortController = new AbortController();
|
||||
|
||||
@@ -673,6 +673,15 @@ export async function deliverOutboundPayloads(
|
||||
// eligible by design to preserve crash replay) and produce duplicates.
|
||||
const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false;
|
||||
|
||||
// If a concurrent reconnect/startup drain already claimed this queue entry
|
||||
// in the window between enqueueDelivery resolving and this synchronous
|
||||
// claim attempt, bail out of the live send and leave the queue entry in
|
||||
// place. The drain already owns ack/fail for this id; sending here would
|
||||
// duplicate the outbound message and race cleanup.
|
||||
if (queueId && !heldActiveClaim) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Wrap onError to detect partial failures under bestEffort mode.
|
||||
// When bestEffort is true, per-payload errors are caught and passed to onError
|
||||
// without throwing — so the outer try/catch never fires. We track whether any
|
||||
|
||||
Reference in New Issue
Block a user