diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9315366ee78..d5d9d4aed76 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,7 +32,7 @@ Docs: https://docs.openclaw.ai
 ### Fixes

 - Codex harness: route Codex-tagged MCP tool approval elicitations through OpenClaw plugin approvals, including current empty-schema app-server requests, while leaving generic user-input prompts fail-closed. (#68807) Thanks @kesslerio.
-- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386.
+- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386. (#70428) Thanks @neeravmakwana.
 - Providers/OpenAI: harden Voice Call realtime transcription against OpenAI Realtime session-update drift, forward language and prompt hints, and add live coverage for realtime STT.
 - Providers/Moonshot: stop strict-sanitizing Kimi's native tool_call IDs (shaped like `functions.:`) on the OpenAI-compatible transport, so multi-turn agentic flows through Kimi K2.6 no longer break after 2-3 tool-calling rounds when the serving layer fails to match mangled IDs against the original tool definitions. Adds a `sanitizeToolCallIds` opt-out to the shared `openai-compatible` replay family helper and wires Moonshot to it. Fixes #62319. (#70030) Thanks @LeoDu0314.
 - Dependencies/security: override transitive `uuid` to `14.0.0`, clearing the runtime advisory across dependencies.
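The claim mechanism the WhatsApp/outbound entry describes is easier to picture with a small sketch before the full patch below. This is a hedged, minimal illustration only; the names `inFlight` and `withClaim` are simplified stand-ins for the actual `entriesInProgress` set and `withActiveDeliveryClaim` helper changed further down.

```ts
// Minimal sketch of a process-local "active delivery" claim. Because the set
// lives only in this process's memory, a crashed process leaves no stale
// claims behind, so crash-replay of its fresh queue entries still works.
const inFlight = new Set<string>();

async function withClaim<T>(
  id: string,
  fn: () => Promise<T>,
): Promise<{ status: "claimed"; value: T } | { status: "claimed-by-other-owner" }> {
  if (inFlight.has(id)) {
    // Another owner (e.g. a reconnect drain) is already driving this entry.
    return { status: "claimed-by-other-owner" };
  }
  inFlight.add(id);
  try {
    return { status: "claimed", value: await fn() };
  } finally {
    // Release on success or failure so a later drain can pick the entry up.
    inFlight.delete(id);
  }
}
```

Keeping the claim in process memory rather than in the on-disk queue is the design choice that preserves crash replay: only a live concurrent owner can block a drain, never a dead one.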
diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts
index 6e6b62e2c24..21b7c990ee0 100644
--- a/src/infra/outbound/deliver.test.ts
+++ b/src/infra/outbound/deliver.test.ts
@@ -36,8 +36,12 @@ const queueMocks = vi.hoisted(() => ({
   enqueueDelivery: vi.fn(async () => "mock-queue-id"),
   ackDelivery: vi.fn(async () => {}),
   failDelivery: vi.fn(async () => {}),
-  tryClaimActiveDelivery: vi.fn<(entryId: string) => boolean>(() => true),
-  releaseActiveDelivery: vi.fn<(entryId: string) => void>(() => {}),
+  withActiveDeliveryClaim: vi.fn<
+    (
+      entryId: string,
+      fn: () => Promise<unknown>,
+    ) => Promise<{ status: "claimed"; value: unknown } | { status: "claimed-by-other-owner" }>
+  >(async (_entryId, fn) => ({ status: "claimed", value: await fn() })),
 }));
 const logMocks = vi.hoisted(() => ({
   warn: vi.fn(),
@@ -72,8 +76,7 @@ vi.mock("./delivery-queue.js", () => ({
   enqueueDelivery: queueMocks.enqueueDelivery,
   ackDelivery: queueMocks.ackDelivery,
   failDelivery: queueMocks.failDelivery,
-  tryClaimActiveDelivery: queueMocks.tryClaimActiveDelivery,
-  releaseActiveDelivery: queueMocks.releaseActiveDelivery,
+  withActiveDeliveryClaim: queueMocks.withActiveDeliveryClaim,
 }));
 vi.mock("../../logging/subsystem.js", () => ({
   createSubsystemLogger: () => {
@@ -266,9 +269,11 @@ describe("deliverOutboundPayloads", () => {
     queueMocks.ackDelivery.mockResolvedValue(undefined);
     queueMocks.failDelivery.mockClear();
     queueMocks.failDelivery.mockResolvedValue(undefined);
-    queueMocks.tryClaimActiveDelivery.mockClear();
-    queueMocks.tryClaimActiveDelivery.mockReturnValue(true);
-    queueMocks.releaseActiveDelivery.mockClear();
+    queueMocks.withActiveDeliveryClaim.mockClear();
+    queueMocks.withActiveDeliveryClaim.mockImplementation(async (_entryId, fn) => ({
+      status: "claimed",
+      value: await fn(),
+    }));
     logMocks.warn.mockClear();
   });

@@ -967,7 +972,9 @@ describe("deliverOutboundPayloads", () => {
     // path claims it, the live path must not send. The drain already owns
     // ack/fail for that id; sending here would duplicate the outbound and
     // race queue cleanup.
-    queueMocks.tryClaimActiveDelivery.mockReturnValueOnce(false);
+    queueMocks.withActiveDeliveryClaim.mockResolvedValueOnce({
+      status: "claimed-by-other-owner",
+    });

     const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" });
     const results = await deliverOutboundPayloads({
@@ -982,7 +989,6 @@
     expect(sendMatrix).not.toHaveBeenCalled();
     expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
     expect(queueMocks.failDelivery).not.toHaveBeenCalled();
-    expect(queueMocks.releaseActiveDelivery).not.toHaveBeenCalled();
   });

   it("acks the queue entry when delivery is aborted", async () => {
diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts
index a4c65980a67..26688b30938 100644
--- a/src/infra/outbound/deliver.ts
+++ b/src/infra/outbound/deliver.ts
@@ -40,8 +40,7 @@ import {
   ackDelivery,
   enqueueDelivery,
   failDelivery,
-  releaseActiveDelivery,
-  tryClaimActiveDelivery,
+  withActiveDeliveryClaim,
 } from "./delivery-queue.js";
 import type { OutboundIdentity } from "./identity.js";
 import type { DeliveryMirror } from "./mirror.js";
@@ -666,22 +665,25 @@ export async function deliverOutboundPayloads(
     gatewayClientScopes: params.gatewayClientScopes,
   }).catch(() => null); // Best-effort — don't block delivery if queue write fails.

-  // Claim the queue entry against the shared in-memory recovery set so a
-  // concurrent reconnect/startup drain skips this id while the live send is
-  // still running. Without this, a reconnect during an in-flight delivery can
-  // re-drive the same entry (retryCount=0, lastAttemptAt=undefined is drain-
-  // eligible by design to preserve crash replay) and produce duplicates.
-  const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false;
-
-  // If a concurrent reconnect/startup drain already claimed this queue entry
-  // in the window between enqueueDelivery resolving and this synchronous
-  // claim attempt, bail out of the live send and leave the queue entry in
-  // place. The drain already owns ack/fail for this id; sending here would
-  // duplicate the outbound message and race cleanup.
-  if (queueId && !heldActiveClaim) {
-    return [];
+  if (!queueId) {
+    return await deliverOutboundPayloadsWithQueueCleanup(params, null);
   }

+  // Hold the same in-process claim used by recovery/drain while the live send
+  // owns this queue entry.
+  const claimResult = await withActiveDeliveryClaim(queueId, () =>
+    deliverOutboundPayloadsWithQueueCleanup(params, queueId),
+  );
+  if (claimResult.status === "claimed-by-other-owner") {
+    return [];
+  }
+  return claimResult.value;
+}
+
+async function deliverOutboundPayloadsWithQueueCleanup(
+  params: DeliverOutboundPayloadsParams,
+  queueId: string | null,
+): Promise {
   // Wrap onError to detect partial failures under bestEffort mode.
   // When bestEffort is true, per-payload errors are caught and passed to onError
   // without throwing — so the outer try/catch never fires. We track whether any
@@ -716,10 +718,6 @@ export async function deliverOutboundPayloads(
       }
     }
     throw err;
-  } finally {
-    if (queueId && heldActiveClaim) {
-      releaseActiveDelivery(queueId);
-    }
   }
 }

diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts
index c8874213ece..4bfd20cd4fc 100644
--- a/src/infra/outbound/delivery-queue-recovery.ts
+++ b/src/infra/outbound/delivery-queue-recovery.ts
@@ -36,6 +36,10 @@ export interface PendingDeliveryDrainDecision {
   bypassBackoff?: boolean;
 }

+export type ActiveDeliveryClaimResult<T> =
+  | { status: "claimed"; value: T }
+  | { status: "claimed-by-other-owner" };
+
 const MAX_RETRIES = 5;

 /** Backoff delays in milliseconds indexed by retry count (1-based). */
@@ -90,20 +94,19 @@ function releaseRecoveryEntry(entryId: string): void {
   entriesInProgress.delete(entryId);
 }

-/**
- * Claim an entry id against the shared in-memory recovery set so a concurrent
- * reconnect/startup drain will skip it while the owning caller is mid-flight.
- * Returns `false` if the id is already claimed. Callers must pair a successful
- * claim with {@link releaseActiveDelivery} in a `finally`. The claim is
- * process-local and intentionally does not survive a crash, so crash-replay
- * paths still recover fresh entries whose owning process died.
- */
-export function tryClaimActiveDelivery(entryId: string): boolean {
-  return claimRecoveryEntry(entryId);
-}
+export async function withActiveDeliveryClaim<T>(
+  entryId: string,
+  fn: () => Promise<T>,
+): Promise<ActiveDeliveryClaimResult<T>> {
+  if (!claimRecoveryEntry(entryId)) {
+    return { status: "claimed-by-other-owner" };
+  }

-export function releaseActiveDelivery(entryId: string): void {
-  releaseRecoveryEntry(entryId);
+  try {
+    return { status: "claimed", value: await fn() };
+  } finally {
+    releaseRecoveryEntry(entryId);
+  }
 }

 function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
@@ -246,15 +249,8 @@ export async function drainPendingDeliveries(opts: {
   const now = Date.now();
   const deliver = opts.deliver;
   const matchingEntries = (await loadPendingDeliveries(opts.stateDir))
-    .map((entry) => ({
-      entry,
-      decision: opts.selectEntry(entry, now),
-    }))
-    .filter(
-      (item): item is { entry: QueuedDelivery; decision: PendingDeliveryDrainDecision } =>
-        item.decision.match,
-    )
-    .toSorted((a, b) => a.entry.enqueuedAt - b.entry.enqueuedAt);
+    .filter((entry) => opts.selectEntry(entry, now).match)
+    .toSorted((a, b) => a.enqueuedAt - b.enqueuedAt);

   if (matchingEntries.length === 0) {
     return;
@@ -264,7 +260,7 @@
     `${opts.logLabel}: ${matchingEntries.length} pending message(s) matched ${opts.drainKey}`,
   );

-  for (const { entry, decision } of matchingEntries) {
+  for (const entry of matchingEntries) {
     if (!claimRecoveryEntry(entry.id)) {
       opts.log.info(`${opts.logLabel}: entry ${entry.id} is already being recovered`);
       continue;
@@ -280,6 +276,12 @@
       continue;
     }

+    const currentDecision = opts.selectEntry(currentEntry, Date.now());
+    if (!currentDecision.match) {
+      opts.log.info(`${opts.logLabel}: entry ${currentEntry.id} no longer matches, skipping`);
+      continue;
+    }
+
     if (currentEntry.retryCount >= MAX_RETRIES) {
       try {
         await moveToFailed(currentEntry.id, opts.stateDir);
@@ -296,7 +298,7 @@
       continue;
     }

-    if (!decision.bypassBackoff) {
+    if (!currentDecision.bypassBackoff) {
       const retryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now());
       if (!retryEligibility.eligible) {
         opts.log.info(
diff --git a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts
index f4234b5599c..76512fc8b85 100644
--- a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts
+++ b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts
@@ -10,8 +10,7 @@ import {
   MAX_RETRIES,
   type RecoveryLogger,
   recoverPendingDeliveries,
-  releaseActiveDelivery,
-  tryClaimActiveDelivery,
+  withActiveDeliveryClaim,
 } from "./delivery-queue.js";
 import {
   createRecoveryLog,
@@ -416,6 +415,43 @@ describe("drainPendingDeliveries for reconnect", () => {
     expect(deliver).not.toHaveBeenCalled();
   });

+  it("recomputes backoff bypass after rereading the claimed entry", async () => {
+    const log = createRecoveryLog();
+    const deliver = vi.fn(async () => {});
+    const id = await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir });
+    const entryPath = path.join(tmpDir, "delivery-queue", `${id}.json`);
+    let mutated = false;
+
+    await drainPendingDeliveries({
+      drainKey: "directchat:acct1",
+      logLabel: "DirectChat reconnect drain",
+      cfg: stubCfg,
+      log,
+      stateDir: tmpDir,
+      deliver,
+      selectEntry: (entry) => {
+        if (entry.id === id && !mutated) {
+          mutated = true;
+          const nextEntry = JSON.parse(fs.readFileSync(entryPath, "utf-8")) as {
+            lastError?: string;
+          };
+          nextEntry.lastError = "network down";
+          fs.writeFileSync(entryPath, JSON.stringify(nextEntry, null, 2));
+        }
+        return {
+          match:
+            entry.channel === "directchat" &&
+            normalizeReconnectAccountIdForTest(entry.accountId) === "acct1",
+          bypassBackoff:
+            typeof entry.lastError === "string" && entry.lastError.includes(NO_LISTENER_ERROR),
+        };
+      },
+    });
+
+    expect(deliver).not.toHaveBeenCalled();
+    expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
+  });
+
   it("skips entries that an in-flight live delivery has actively claimed", async () => {
     // Regression for openclaw/openclaw#70386: a reconnect drain that runs
     // while the live send is still writing to the adapter must not re-drive
@@ -430,16 +466,14 @@
       tmpDir,
     );

-    expect(tryClaimActiveDelivery(id)).toBe(true);
-    try {
+    const claimResult = await withActiveDeliveryClaim(id, async () => {
       await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
       expect(deliver).not.toHaveBeenCalled();
       expect(log.info).toHaveBeenCalledWith(
         expect.stringContaining(`entry ${id} is already being recovered`),
       );
-    } finally {
-      releaseActiveDelivery(id);
-    }
+    });
+    expect(claimResult.status).toBe("claimed");

     // Once the live delivery path releases its claim (success or failure), a
     // later reconnect drain is free to pick the entry up again.
diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts
index 78a167292ea..631fba50f86 100644
--- a/src/infra/outbound/delivery-queue.ts
+++ b/src/infra/outbound/delivery-queue.ts
@@ -15,10 +15,10 @@ export {
   isPermanentDeliveryError,
   MAX_RETRIES,
   recoverPendingDeliveries,
-  releaseActiveDelivery,
-  tryClaimActiveDelivery,
+  withActiveDeliveryClaim,
 } from "./delivery-queue-recovery.js";
 export type {
+  ActiveDeliveryClaimResult,
   DeliverFn,
   PendingDeliveryDrainDecision,
   RecoveryLogger,
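For reference, here is a hedged usage sketch of the `withActiveDeliveryClaim` helper the patch introduces and re-exports from `delivery-queue.js`. The claim-result handling mirrors the new `deliverOutboundPayloads` flow; `deliverWithClaim` and `sendToAdapter` are hypothetical stand-ins, not identifiers from this patch.

```ts
import { withActiveDeliveryClaim } from "./delivery-queue.js";

// Hypothetical caller: `queueId` is the id returned by enqueueDelivery and
// `sendToAdapter` stands in for the real outbound send work.
async function deliverWithClaim(
  queueId: string,
  sendToAdapter: () => Promise<string[]>,
): Promise<string[]> {
  const claimResult = await withActiveDeliveryClaim(queueId, sendToAdapter);
  if (claimResult.status === "claimed-by-other-owner") {
    // A concurrent reconnect/startup drain owns this entry and will ack/fail it;
    // sending here would duplicate the outbound message.
    return [];
  }
  // The claim was held for the whole send and has been released by this point.
  return claimResult.value;
}
```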