fix(outbound): centralize active delivery claims

This commit is contained in:
Peter Steinberger
2026-04-23 03:08:49 +01:00
parent ca83f0fd7a
commit aa27a9474f
6 changed files with 103 additions and 63 deletions

View File

@@ -32,7 +32,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Codex harness: route Codex-tagged MCP tool approval elicitations through OpenClaw plugin approvals, including current empty-schema app-server requests, while leaving generic user-input prompts fail-closed. (#68807) Thanks @kesslerio.
- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386.
- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386. (#70428) Thanks @neeravmakwana.
- Providers/OpenAI: harden Voice Call realtime transcription against OpenAI Realtime session-update drift, forward language and prompt hints, and add live coverage for realtime STT.
- Providers/Moonshot: stop strict-sanitizing Kimi's native tool_call IDs (shaped like `functions.<name>:<index>`) on the OpenAI-compatible transport, so multi-turn agentic flows through Kimi K2.6 no longer break after 2-3 tool-calling rounds when the serving layer fails to match mangled IDs against the original tool definitions. Adds a `sanitizeToolCallIds` opt-out to the shared `openai-compatible` replay family helper and wires Moonshot to it. Fixes #62319. (#70030) Thanks @LeoDu0314.
- Dependencies/security: override transitive `uuid` to `14.0.0`, clearing the runtime advisory across dependencies.

View File

@@ -36,8 +36,12 @@ const queueMocks = vi.hoisted(() => ({
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
ackDelivery: vi.fn(async () => {}),
failDelivery: vi.fn(async () => {}),
tryClaimActiveDelivery: vi.fn<(entryId: string) => boolean>(() => true),
releaseActiveDelivery: vi.fn<(entryId: string) => void>(() => {}),
withActiveDeliveryClaim: vi.fn<
(
entryId: string,
fn: () => Promise<unknown>,
) => Promise<{ status: "claimed"; value: unknown } | { status: "claimed-by-other-owner" }>
>(async (_entryId, fn) => ({ status: "claimed", value: await fn() })),
}));
const logMocks = vi.hoisted(() => ({
warn: vi.fn(),
@@ -72,8 +76,7 @@ vi.mock("./delivery-queue.js", () => ({
enqueueDelivery: queueMocks.enqueueDelivery,
ackDelivery: queueMocks.ackDelivery,
failDelivery: queueMocks.failDelivery,
tryClaimActiveDelivery: queueMocks.tryClaimActiveDelivery,
releaseActiveDelivery: queueMocks.releaseActiveDelivery,
withActiveDeliveryClaim: queueMocks.withActiveDeliveryClaim,
}));
vi.mock("../../logging/subsystem.js", () => ({
createSubsystemLogger: () => {
@@ -266,9 +269,11 @@ describe("deliverOutboundPayloads", () => {
queueMocks.ackDelivery.mockResolvedValue(undefined);
queueMocks.failDelivery.mockClear();
queueMocks.failDelivery.mockResolvedValue(undefined);
queueMocks.tryClaimActiveDelivery.mockClear();
queueMocks.tryClaimActiveDelivery.mockReturnValue(true);
queueMocks.releaseActiveDelivery.mockClear();
queueMocks.withActiveDeliveryClaim.mockClear();
queueMocks.withActiveDeliveryClaim.mockImplementation(async (_entryId, fn) => ({
status: "claimed",
value: await fn(),
}));
logMocks.warn.mockClear();
});
@@ -967,7 +972,9 @@ describe("deliverOutboundPayloads", () => {
// path claims it, the live path must not send. The drain already owns
// ack/fail for that id; sending here would duplicate the outbound and
// race queue cleanup.
queueMocks.tryClaimActiveDelivery.mockReturnValueOnce(false);
queueMocks.withActiveDeliveryClaim.mockResolvedValueOnce({
status: "claimed-by-other-owner",
});
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" });
const results = await deliverOutboundPayloads({
@@ -982,7 +989,6 @@ describe("deliverOutboundPayloads", () => {
expect(sendMatrix).not.toHaveBeenCalled();
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
expect(queueMocks.releaseActiveDelivery).not.toHaveBeenCalled();
});
it("acks the queue entry when delivery is aborted", async () => {

View File

@@ -40,8 +40,7 @@ import {
ackDelivery,
enqueueDelivery,
failDelivery,
releaseActiveDelivery,
tryClaimActiveDelivery,
withActiveDeliveryClaim,
} from "./delivery-queue.js";
import type { OutboundIdentity } from "./identity.js";
import type { DeliveryMirror } from "./mirror.js";
@@ -666,22 +665,25 @@ export async function deliverOutboundPayloads(
gatewayClientScopes: params.gatewayClientScopes,
}).catch(() => null); // Best-effort — don't block delivery if queue write fails.
// Claim the queue entry against the shared in-memory recovery set so a
// concurrent reconnect/startup drain skips this id while the live send is
// still running. Without this, a reconnect during an in-flight delivery can
// re-drive the same entry (retryCount=0, lastAttemptAt=undefined is drain-
// eligible by design to preserve crash replay) and produce duplicates.
const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false;
// If a concurrent reconnect/startup drain already claimed this queue entry
// in the window between enqueueDelivery resolving and this synchronous
// claim attempt, bail out of the live send and leave the queue entry in
// place. The drain already owns ack/fail for this id; sending here would
// duplicate the outbound message and race cleanup.
if (queueId && !heldActiveClaim) {
return [];
if (!queueId) {
return await deliverOutboundPayloadsWithQueueCleanup(params, null);
}
// Hold the same in-process claim used by recovery/drain while the live send
// owns this queue entry.
const claimResult = await withActiveDeliveryClaim(queueId, () =>
deliverOutboundPayloadsWithQueueCleanup(params, queueId),
);
if (claimResult.status === "claimed-by-other-owner") {
return [];
}
return claimResult.value;
}
async function deliverOutboundPayloadsWithQueueCleanup(
params: DeliverOutboundPayloadsParams,
queueId: string | null,
): Promise<OutboundDeliveryResult[]> {
// Wrap onError to detect partial failures under bestEffort mode.
// When bestEffort is true, per-payload errors are caught and passed to onError
// without throwing — so the outer try/catch never fires. We track whether any
@@ -716,10 +718,6 @@ export async function deliverOutboundPayloads(
}
}
throw err;
} finally {
if (queueId && heldActiveClaim) {
releaseActiveDelivery(queueId);
}
}
}

View File

@@ -36,6 +36,10 @@ export interface PendingDeliveryDrainDecision {
bypassBackoff?: boolean;
}
/**
 * Outcome of running a callback under an active-delivery claim.
 *
 * Discriminated on `status`:
 * - `"claimed"`: the caller obtained the claim and its callback ran to
 *   completion; `value` is whatever the callback resolved with.
 * - `"claimed-by-other-owner"`: the entry id was already claimed, so the
 *   callback was not invoked and there is no value.
 */
export type ActiveDeliveryClaimResult<T> =
| { status: "claimed"; value: T }
| { status: "claimed-by-other-owner" };
const MAX_RETRIES = 5;
/** Backoff delays in milliseconds indexed by retry count (1-based). */
@@ -90,20 +94,19 @@ function releaseRecoveryEntry(entryId: string): void {
entriesInProgress.delete(entryId);
}
/**
* Claim an entry id against the shared in-memory recovery set so a concurrent
* reconnect/startup drain will skip it while the owning caller is mid-flight.
* Returns `false` if the id is already claimed. Callers must pair a successful
* claim with {@link releaseActiveDelivery} in a `finally`. The claim is
* process-local and intentionally does not survive a crash, so crash-replay
* paths still recover fresh entries whose owning process died.
*/
export function tryClaimActiveDelivery(entryId: string): boolean {
return claimRecoveryEntry(entryId);
}
export async function withActiveDeliveryClaim<T>(
entryId: string,
fn: () => Promise<T>,
): Promise<ActiveDeliveryClaimResult<T>> {
if (!claimRecoveryEntry(entryId)) {
return { status: "claimed-by-other-owner" };
}
export function releaseActiveDelivery(entryId: string): void {
releaseRecoveryEntry(entryId);
try {
return { status: "claimed", value: await fn() };
} finally {
releaseRecoveryEntry(entryId);
}
}
function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
@@ -246,15 +249,8 @@ export async function drainPendingDeliveries(opts: {
const now = Date.now();
const deliver = opts.deliver;
const matchingEntries = (await loadPendingDeliveries(opts.stateDir))
.map((entry) => ({
entry,
decision: opts.selectEntry(entry, now),
}))
.filter(
(item): item is { entry: QueuedDelivery; decision: PendingDeliveryDrainDecision } =>
item.decision.match,
)
.toSorted((a, b) => a.entry.enqueuedAt - b.entry.enqueuedAt);
.filter((entry) => opts.selectEntry(entry, now).match)
.toSorted((a, b) => a.enqueuedAt - b.enqueuedAt);
if (matchingEntries.length === 0) {
return;
@@ -264,7 +260,7 @@ export async function drainPendingDeliveries(opts: {
`${opts.logLabel}: ${matchingEntries.length} pending message(s) matched ${opts.drainKey}`,
);
for (const { entry, decision } of matchingEntries) {
for (const entry of matchingEntries) {
if (!claimRecoveryEntry(entry.id)) {
opts.log.info(`${opts.logLabel}: entry ${entry.id} is already being recovered`);
continue;
@@ -280,6 +276,12 @@ export async function drainPendingDeliveries(opts: {
continue;
}
const currentDecision = opts.selectEntry(currentEntry, Date.now());
if (!currentDecision.match) {
opts.log.info(`${opts.logLabel}: entry ${currentEntry.id} no longer matches, skipping`);
continue;
}
if (currentEntry.retryCount >= MAX_RETRIES) {
try {
await moveToFailed(currentEntry.id, opts.stateDir);
@@ -296,7 +298,7 @@ export async function drainPendingDeliveries(opts: {
continue;
}
if (!decision.bypassBackoff) {
if (!currentDecision.bypassBackoff) {
const retryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now());
if (!retryEligibility.eligible) {
opts.log.info(

View File

@@ -10,8 +10,7 @@ import {
MAX_RETRIES,
type RecoveryLogger,
recoverPendingDeliveries,
releaseActiveDelivery,
tryClaimActiveDelivery,
withActiveDeliveryClaim,
} from "./delivery-queue.js";
import {
createRecoveryLog,
@@ -416,6 +415,43 @@ describe("drainPendingDeliveries for reconnect", () => {
expect(deliver).not.toHaveBeenCalled();
});
it("recomputes backoff bypass after rereading the claimed entry", async () => {
  // Arrange: one failed directchat entry for acct1 sitting in the on-disk queue.
  const log = createRecoveryLog();
  const deliver = vi.fn<DeliverFn>(async () => {});
  const id = await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir });
  const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);

  // Overwrites `lastError` in the queued file the first time it is called and
  // is a no-op afterwards. This simulates the entry changing on disk between
  // the initial selection pass and the drain's later evaluation of the entry.
  let rewritePending = true;
  const rewriteLastErrorOnce = (): void => {
    if (!rewritePending) {
      return;
    }
    rewritePending = false;
    const onDisk = JSON.parse(fs.readFileSync(entryPath, "utf-8")) as {
      lastError?: string;
    };
    onDisk.lastError = "network down";
    fs.writeFileSync(entryPath, JSON.stringify(onDisk, null, 2));
  };

  // Act: drain with a selector whose bypass decision depends on `lastError`.
  await drainPendingDeliveries({
    drainKey: "directchat:acct1",
    logLabel: "DirectChat reconnect drain",
    cfg: stubCfg,
    log,
    stateDir: tmpDir,
    deliver,
    selectEntry: (entry) => {
      if (entry.id === id) {
        rewriteLastErrorOnce();
      }
      // The decision is derived only from the entry object handed in, so a
      // second call with a freshly read entry can produce a different answer.
      const isAcct1DirectChat =
        entry.channel === "directchat" &&
        normalizeReconnectAccountIdForTest(entry.accountId) === "acct1";
      const hasNoListenerError =
        typeof entry.lastError === "string" && entry.lastError.includes(NO_LISTENER_ERROR);
      return { match: isAcct1DirectChat, bypassBackoff: hasNoListenerError };
    },
  });

  // Assert: the rewritten error no longer qualifies for the bypass, so the
  // entry is held back by backoff instead of being delivered.
  expect(deliver).not.toHaveBeenCalled();
  expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
});
it("skips entries that an in-flight live delivery has actively claimed", async () => {
// Regression for openclaw/openclaw#70386: a reconnect drain that runs
// while the live send is still writing to the adapter must not re-drive
@@ -430,16 +466,14 @@ describe("drainPendingDeliveries for reconnect", () => {
tmpDir,
);
expect(tryClaimActiveDelivery(id)).toBe(true);
try {
const claimResult = await withActiveDeliveryClaim(id, async () => {
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).not.toHaveBeenCalled();
expect(log.info).toHaveBeenCalledWith(
expect.stringContaining(`entry ${id} is already being recovered`),
);
} finally {
releaseActiveDelivery(id);
}
});
expect(claimResult.status).toBe("claimed");
// Once the live delivery path releases its claim (success or failure), a
// later reconnect drain is free to pick the entry up again.

View File

@@ -15,10 +15,10 @@ export {
isPermanentDeliveryError,
MAX_RETRIES,
recoverPendingDeliveries,
releaseActiveDelivery,
tryClaimActiveDelivery,
withActiveDeliveryClaim,
} from "./delivery-queue-recovery.js";
export type {
ActiveDeliveryClaimResult,
DeliverFn,
PendingDeliveryDrainDecision,
RecoveryLogger,