diff --git a/CHANGELOG.md b/CHANGELOG.md index 200c190eb7a..9315366ee78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Codex harness: route Codex-tagged MCP tool approval elicitations through OpenClaw plugin approvals, including current empty-schema app-server requests, while leaving generic user-input prompts fail-closed. (#68807) Thanks @kesslerio. +- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386. - Providers/OpenAI: harden Voice Call realtime transcription against OpenAI Realtime session-update drift, forward language and prompt hints, and add live coverage for realtime STT. - Providers/Moonshot: stop strict-sanitizing Kimi's native tool_call IDs (shaped like `functions.<name>:<index>`) on the OpenAI-compatible transport, so multi-turn agentic flows through Kimi K2.6 no longer break after 2-3 tool-calling rounds when the serving layer fails to match mangled IDs against the original tool definitions. Adds a `sanitizeToolCallIds` opt-out to the shared `openai-compatible` replay family helper and wires Moonshot to it. Fixes #62319. (#70030) Thanks @LeoDu0314. - Dependencies/security: override transitive `uuid` to `14.0.0`, clearing the runtime advisory across dependencies. 
diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index dcdcad69365..eb250133be8 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -36,6 +36,8 @@ const queueMocks = vi.hoisted(() => ({ enqueueDelivery: vi.fn(async () => "mock-queue-id"), ackDelivery: vi.fn(async () => {}), failDelivery: vi.fn(async () => {}), + tryClaimActiveDelivery: vi.fn<(entryId: string) => boolean>(() => true), + releaseActiveDelivery: vi.fn<(entryId: string) => void>(() => {}), })); const logMocks = vi.hoisted(() => ({ warn: vi.fn(), @@ -70,6 +72,8 @@ vi.mock("./delivery-queue.js", () => ({ enqueueDelivery: queueMocks.enqueueDelivery, ackDelivery: queueMocks.ackDelivery, failDelivery: queueMocks.failDelivery, + tryClaimActiveDelivery: queueMocks.tryClaimActiveDelivery, + releaseActiveDelivery: queueMocks.releaseActiveDelivery, })); vi.mock("../../logging/subsystem.js", () => ({ createSubsystemLogger: () => { diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index a727590f490..15836e4cc40 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -36,7 +36,13 @@ import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { formatErrorMessage } from "../errors.js"; import { throwIfAborted } from "./abort.js"; import type { OutboundDeliveryResult } from "./deliver-types.js"; -import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js"; +import { + ackDelivery, + enqueueDelivery, + failDelivery, + releaseActiveDelivery, + tryClaimActiveDelivery, +} from "./delivery-queue.js"; import type { OutboundIdentity } from "./identity.js"; import type { DeliveryMirror } from "./mirror.js"; import { @@ -660,6 +666,13 @@ export async function deliverOutboundPayloads( gatewayClientScopes: params.gatewayClientScopes, }).catch(() => null); // Best-effort — don't block delivery if queue write fails. 
+ // Claim the queue entry against the shared in-memory recovery set so a + // concurrent reconnect/startup drain skips this id while the live send is + // still running. Without this, a reconnect during an in-flight delivery can + // re-drive the same entry (retryCount=0, lastAttemptAt=undefined is drain- + // eligible by design to preserve crash replay) and produce duplicates. + const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false; + // Wrap onError to detect partial failures under bestEffort mode. // When bestEffort is true, per-payload errors are caught and passed to onError // without throwing — so the outer try/catch never fires. We track whether any @@ -694,6 +707,10 @@ export async function deliverOutboundPayloads( } } throw err; + } finally { + if (queueId && heldActiveClaim) { + releaseActiveDelivery(queueId); + } } } diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts index 2b49783eb5e..c8874213ece 100644 --- a/src/infra/outbound/delivery-queue-recovery.ts +++ b/src/infra/outbound/delivery-queue-recovery.ts @@ -90,6 +90,22 @@ function releaseRecoveryEntry(entryId: string): void { entriesInProgress.delete(entryId); } +/** + * Claim an entry id against the shared in-memory recovery set so a concurrent + * reconnect/startup drain will skip it while the owning caller is mid-flight. + * Returns `false` if the id is already claimed. Callers must pair a successful + * claim with {@link releaseActiveDelivery} in a `finally`. The claim is + * process-local and intentionally does not survive a crash, so crash-replay + * paths still recover fresh entries whose owning process died. 
+ */ +export function tryClaimActiveDelivery(entryId: string): boolean { + return claimRecoveryEntry(entryId); +} + +export function releaseActiveDelivery(entryId: string): void { + releaseRecoveryEntry(entryId); +} + function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) { return { cfg, diff --git a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts index 03e67083dcb..f4234b5599c 100644 --- a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts +++ b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts @@ -10,6 +10,8 @@ import { MAX_RETRIES, type RecoveryLogger, recoverPendingDeliveries, + releaseActiveDelivery, + tryClaimActiveDelivery, } from "./delivery-queue.js"; import { createRecoveryLog, @@ -413,4 +415,35 @@ describe("drainPendingDeliveries for reconnect", () => { expect(deliver).not.toHaveBeenCalled(); }); + + it("skips entries that an in-flight live delivery has actively claimed", async () => { + // Regression for openclaw/openclaw#70386: a reconnect drain that runs + // while the live send is still writing to the adapter must not re-drive + // the same entry. The live delivery path holds an in-memory active claim + // for `queueId` across its send; drain honors that claim via the same + // `entriesInProgress` set used for startup recovery. 
+ const log = createRecoveryLog(); + const deliver = vi.fn(async () => {}); + + const id = await enqueueDelivery( + { channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, + tmpDir, + ); + + expect(tryClaimActiveDelivery(id)).toBe(true); + try { + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); + expect(deliver).not.toHaveBeenCalled(); + expect(log.info).toHaveBeenCalledWith( + expect.stringContaining(`entry ${id} is already being recovered`), + ); + } finally { + releaseActiveDelivery(id); + } + + // Once the live delivery path releases its claim (success or failure), a + // later reconnect drain is free to pick the entry up again. + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); + expect(deliver).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index 78cfd1bff87..78a167292ea 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -15,6 +15,8 @@ export { isPermanentDeliveryError, MAX_RETRIES, recoverPendingDeliveries, + releaseActiveDelivery, + tryClaimActiveDelivery, } from "./delivery-queue-recovery.js"; export type { DeliverFn,