mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 12:00:44 +00:00
fix(outbound): hold active-delivery claim so reconnect drain skips live sends
Reconnect drain (drainPendingDeliveries) matches fresh pending entries by design to preserve crash-replay, but the live delivery path in deliverOutboundPayloads held no in-memory claim while the send was running. A reconnect firing mid-send therefore re-drove the same queue entry and produced duplicate outbound messages (e.g. WhatsApp cron sends going out 7-12x when the 30-minute inbound-silence watchdog fired during delivery). Claim the queueId against the existing entriesInProgress set right after enqueueDelivery and release it in the finally branch around ack/fail. Drain already skips claimed ids via claimRecoveryEntry, so no drain-side change is needed. The claim is process-local on purpose: a crashed owner leaves no claim behind, so startup recovery still reclaims orphaned entries. Fixes #70386. Made-with: Cursor
This commit is contained in:
committed by
Peter Steinberger
parent
adda0dcf20
commit
c94a8702c7
@@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Codex harness: route Codex-tagged MCP tool approval elicitations through OpenClaw plugin approvals, including current empty-schema app-server requests, while leaving generic user-input prompts fail-closed. (#68807) Thanks @kesslerio.
|
||||
- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386.
|
||||
- Providers/OpenAI: harden Voice Call realtime transcription against OpenAI Realtime session-update drift, forward language and prompt hints, and add live coverage for realtime STT.
|
||||
- Providers/Moonshot: stop strict-sanitizing Kimi's native tool_call IDs (shaped like `functions.<name>:<index>`) on the OpenAI-compatible transport, so multi-turn agentic flows through Kimi K2.6 no longer break after 2-3 tool-calling rounds when the serving layer fails to match mangled IDs against the original tool definitions. Adds a `sanitizeToolCallIds` opt-out to the shared `openai-compatible` replay family helper and wires Moonshot to it. Fixes #62319. (#70030) Thanks @LeoDu0314.
|
||||
- Dependencies/security: override transitive `uuid` to `14.0.0`, clearing the runtime advisory across dependencies.
|
||||
|
||||
@@ -36,6 +36,8 @@ const queueMocks = vi.hoisted(() => ({
|
||||
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
|
||||
ackDelivery: vi.fn(async () => {}),
|
||||
failDelivery: vi.fn(async () => {}),
|
||||
tryClaimActiveDelivery: vi.fn<(entryId: string) => boolean>(() => true),
|
||||
releaseActiveDelivery: vi.fn<(entryId: string) => void>(() => {}),
|
||||
}));
|
||||
const logMocks = vi.hoisted(() => ({
|
||||
warn: vi.fn(),
|
||||
@@ -70,6 +72,8 @@ vi.mock("./delivery-queue.js", () => ({
|
||||
enqueueDelivery: queueMocks.enqueueDelivery,
|
||||
ackDelivery: queueMocks.ackDelivery,
|
||||
failDelivery: queueMocks.failDelivery,
|
||||
tryClaimActiveDelivery: queueMocks.tryClaimActiveDelivery,
|
||||
releaseActiveDelivery: queueMocks.releaseActiveDelivery,
|
||||
}));
|
||||
vi.mock("../../logging/subsystem.js", () => ({
|
||||
createSubsystemLogger: () => {
|
||||
|
||||
@@ -36,7 +36,13 @@ import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { formatErrorMessage } from "../errors.js";
|
||||
import { throwIfAborted } from "./abort.js";
|
||||
import type { OutboundDeliveryResult } from "./deliver-types.js";
|
||||
import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js";
|
||||
import {
|
||||
ackDelivery,
|
||||
enqueueDelivery,
|
||||
failDelivery,
|
||||
releaseActiveDelivery,
|
||||
tryClaimActiveDelivery,
|
||||
} from "./delivery-queue.js";
|
||||
import type { OutboundIdentity } from "./identity.js";
|
||||
import type { DeliveryMirror } from "./mirror.js";
|
||||
import {
|
||||
@@ -660,6 +666,13 @@ export async function deliverOutboundPayloads(
|
||||
gatewayClientScopes: params.gatewayClientScopes,
|
||||
}).catch(() => null); // Best-effort — don't block delivery if queue write fails.
|
||||
|
||||
// Claim the queue entry against the shared in-memory recovery set so a
|
||||
// concurrent reconnect/startup drain skips this id while the live send is
|
||||
// still running. Without this, a reconnect during an in-flight delivery can
|
||||
// re-drive the same entry (retryCount=0, lastAttemptAt=undefined is drain-
|
||||
// eligible by design to preserve crash replay) and produce duplicates.
|
||||
const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false;
|
||||
|
||||
// Wrap onError to detect partial failures under bestEffort mode.
|
||||
// When bestEffort is true, per-payload errors are caught and passed to onError
|
||||
// without throwing — so the outer try/catch never fires. We track whether any
|
||||
@@ -694,6 +707,10 @@ export async function deliverOutboundPayloads(
|
||||
}
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
if (queueId && heldActiveClaim) {
|
||||
releaseActiveDelivery(queueId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -90,6 +90,22 @@ function releaseRecoveryEntry(entryId: string): void {
|
||||
entriesInProgress.delete(entryId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Claim an entry id against the shared in-memory recovery set so a concurrent
|
||||
* reconnect/startup drain will skip it while the owning caller is mid-flight.
|
||||
* Returns `false` if the id is already claimed. Callers must pair a successful
|
||||
* claim with {@link releaseActiveDelivery} in a `finally`. The claim is
|
||||
* process-local and intentionally does not survive a crash, so crash-replay
|
||||
* paths still recover fresh entries whose owning process died.
|
||||
*/
|
||||
export function tryClaimActiveDelivery(entryId: string): boolean {
|
||||
return claimRecoveryEntry(entryId);
|
||||
}
|
||||
|
||||
export function releaseActiveDelivery(entryId: string): void {
|
||||
releaseRecoveryEntry(entryId);
|
||||
}
|
||||
|
||||
function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
|
||||
return {
|
||||
cfg,
|
||||
|
||||
@@ -10,6 +10,8 @@ import {
|
||||
MAX_RETRIES,
|
||||
type RecoveryLogger,
|
||||
recoverPendingDeliveries,
|
||||
releaseActiveDelivery,
|
||||
tryClaimActiveDelivery,
|
||||
} from "./delivery-queue.js";
|
||||
import {
|
||||
createRecoveryLog,
|
||||
@@ -413,4 +415,35 @@ describe("drainPendingDeliveries for reconnect", () => {
|
||||
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("skips entries that an in-flight live delivery has actively claimed", async () => {
|
||||
// Regression for openclaw/openclaw#70386: a reconnect drain that runs
|
||||
// while the live send is still writing to the adapter must not re-drive
|
||||
// the same entry. The live delivery path holds an in-memory active claim
|
||||
// for `queueId` across its send; drain honors that claim via the same
|
||||
// `entriesInProgress` set used for startup recovery.
|
||||
const log = createRecoveryLog();
|
||||
const deliver = vi.fn<DeliverFn>(async () => {});
|
||||
|
||||
const id = await enqueueDelivery(
|
||||
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
|
||||
tmpDir,
|
||||
);
|
||||
|
||||
expect(tryClaimActiveDelivery(id)).toBe(true);
|
||||
try {
|
||||
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
|
||||
expect(deliver).not.toHaveBeenCalled();
|
||||
expect(log.info).toHaveBeenCalledWith(
|
||||
expect.stringContaining(`entry ${id} is already being recovered`),
|
||||
);
|
||||
} finally {
|
||||
releaseActiveDelivery(id);
|
||||
}
|
||||
|
||||
// Once the live delivery path releases its claim (success or failure), a
|
||||
// later reconnect drain is free to pick the entry up again.
|
||||
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
|
||||
expect(deliver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -15,6 +15,8 @@ export {
|
||||
isPermanentDeliveryError,
|
||||
MAX_RETRIES,
|
||||
recoverPendingDeliveries,
|
||||
releaseActiveDelivery,
|
||||
tryClaimActiveDelivery,
|
||||
} from "./delivery-queue-recovery.js";
|
||||
export type {
|
||||
DeliverFn,
|
||||
|
||||
Reference in New Issue
Block a user