From 6c196c913fc3fde51f3e2e1087ed94a6920fac1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?wangchunyue=28=E7=8E=8B=E6=98=A5=E8=B7=83=29?= <80630709+openperf@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:01:19 +0800 Subject: [PATCH] fix(cron): prevent duplicate proactive delivery on transient retry (#40646) * fix(cron): prevent duplicate proactive delivery on transient retry * refactor: scope skipQueue to retryTransient path only Non-retrying direct delivery (structured content / thread) keeps the write-ahead queue so recoverPendingDeliveries can replay after a crash. Addresses review feedback from codex-connector. * fix: preserve write-ahead queue on initial delivery attempt The first call through retryTransientDirectCronDelivery now keeps the write-ahead queue entry so recoverPendingDeliveries can replay after a crash. Only subsequent retry attempts set skipQueue to prevent duplicate sends. Addresses second codex-connector review on ea5ae5c. * ci: retrigger checks * Cron: bypass write-ahead queue for direct isolated delivery * Tests: assert isolated cron skipQueue invariants * Changelog: add cron duplicate-delivery fix entry --------- Co-authored-by: Vincent Koc --- CHANGELOG.md | 1 + .../delivery-dispatch.double-announce.test.ts | 68 +++++++++++++++++++ src/cron/isolated-agent/delivery-dispatch.ts | 10 ++- 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bd2517e4c8..029b1fa2024 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Cron/proactive delivery: keep isolated direct cron sends out of the write-ahead resend queue so transient-send retries do not replay duplicate proactive messages after restart. (#40646) Thanks @openperf and @vincentkoc. - TUI/chat log: reuse the active assistant message component for the same streaming run so `openclaw tui` no longer renders duplicate assistant replies. (#35364) Thanks @lisitan. - macOS/Reminders: add the missing `NSRemindersUsageDescription` to the bundled app so `apple-reminders` can trigger the system permission prompt from OpenClaw.app. (#8559) Thanks @dinakars777. - iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc. diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index 9da88bbb4a3..2c7eb20a3c6 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -217,6 +217,9 @@ describe("dispatchCronDelivery — double-announce guard", () => { payloads: [{ text: "Detailed child result, everything finished successfully." }], }), ); + expect(deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ skipQueue: true }), + ); }); it("normal text delivery sends exactly once and sets deliveryAttempted=true", async () => { @@ -304,4 +307,69 @@ describe("dispatchCronDelivery — double-announce guard", () => { expect(deliverOutboundPayloads).not.toHaveBeenCalled(); expect(state.deliveryAttempted).toBe(false); }); + + it("text delivery always bypasses the write-ahead queue", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); + + const params = makeBaseParams({ synthesizedText: "Daily digest ready." }); + const state = await dispatchCronDelivery(params); + + expect(state.delivered).toBe(true); + expect(state.deliveryAttempted).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + + expect(deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "123456", + payloads: [{ text: "Daily digest ready." }], + skipQueue: true, + }), + ); + }); + + it("structured/thread delivery also bypasses the write-ahead queue", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); + + const params = makeBaseParams({ synthesizedText: "Report attached." }); + // Simulate structured content so useDirectDelivery path is taken (no retryTransient) + (params as Record).deliveryPayloadHasStructuredContent = true; + await dispatchCronDelivery(params); + + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ skipQueue: true }), + ); + }); + + it("transient retry delivers exactly once with skipQueue on both attempts", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + + // First call throws a transient error, second call succeeds. + vi.mocked(deliverOutboundPayloads) + .mockRejectedValueOnce(new Error("gateway timeout")) + .mockResolvedValueOnce([{ ok: true } as never]); + + vi.stubEnv("OPENCLAW_TEST_FAST", "1"); + try { + const params = makeBaseParams({ synthesizedText: "Retry test." }); + const state = await dispatchCronDelivery(params); + + expect(state.delivered).toBe(true); + expect(state.deliveryAttempted).toBe(true); + // Two calls total: first failed transiently, second succeeded. + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2); + + const calls = vi.mocked(deliverOutboundPayloads).mock.calls; + expect(calls[0][0]).toEqual(expect.objectContaining({ skipQueue: true })); + expect(calls[1][0]).toEqual(expect.objectContaining({ skipQueue: true })); + } finally { + vi.unstubAllEnvs(); + } + }); }); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index fa9a295a777..a5dc0190b72 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -157,7 +157,9 @@ function isTransientDirectCronDeliveryError(error: unknown): boolean { } function resolveDirectCronRetryDelaysMs(): readonly number[] { - return process.env.OPENCLAW_TEST_FAST === "1" ? [8, 16, 32] : [5_000, 10_000, 20_000]; + return process.env.NODE_ENV === "test" && process.env.OPENCLAW_TEST_FAST === "1" + ? [8, 16, 32] + : [5_000, 10_000, 20_000]; } async function retryTransientDirectCronDelivery(params: { @@ -256,6 +258,12 @@ export async function dispatchCronDelivery( bestEffort: params.deliveryBestEffort, deps: createOutboundSendDeps(params.deps), abortSignal: params.abortSignal, + // Isolated cron direct delivery uses its own transient retry loop. + // Keep all attempts out of the write-ahead delivery queue so a + // late-successful first send cannot leave behind a failed queue + // entry that replays on the next restart. + // See: https://github.com/openclaw/openclaw/issues/40545 + skipQueue: true, }); const deliveryResults = options?.retryTransient ? await retryTransientDirectCronDelivery({