diff --git a/CHANGELOG.md b/CHANGELOG.md index 7de75be6b10..b4486f2e833 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai - Reply/secrets: resolve active reply channel/account SecretRefs before reply-run message-action discovery so channel token SecretRefs (for example Discord) do not degrade into discovery-time unresolved-secret failures. (#66796) Thanks @joshavant. - Agents/Anthropic: ignore non-positive Anthropic Messages token overrides and fail locally when no positive token budget remains, so invalid `max_tokens` values no longer reach the provider API. (#66664) thanks @jalehman - Agents/context engines: preserve prompt-only token counts, not full request totals, when deferred maintenance reuses after-turn runtime context so background compaction bookkeeping matches the active prompt window. (#66820) thanks @jalehman. +- BlueBubbles/inbound: add a persistent file-backed GUID dedupe so MessagePoller webhook replays after BB Server restart or reconnect no longer cause the agent to re-reply to already-handled messages. (#19176, #12053, #66816) Thanks @omarshahine. 
## 2026.4.14
diff --git a/extensions/bluebubbles/src/inbound-dedupe.test.ts b/extensions/bluebubbles/src/inbound-dedupe.test.ts
new file mode 100644
index 00000000000..cd78be834bc
--- /dev/null
+++ b/extensions/bluebubbles/src/inbound-dedupe.test.ts
@@ -0,0 +1,58 @@
+import { beforeEach, describe, expect, it } from "vitest";
+import {
+  _resetBlueBubblesInboundDedupForTest,
+  claimBlueBubblesInboundMessage,
+} from "./inbound-dedupe.js";
+
+async function claimAndFinalize(guid: string | undefined, accountId: string): Promise<string> {
+  const claim = await claimBlueBubblesInboundMessage({ guid, accountId });
+  if (claim.kind === "claimed") {
+    await claim.finalize();
+  }
+  return claim.kind;
+}
+
+describe("claimBlueBubblesInboundMessage", () => {
+  beforeEach(() => {
+    _resetBlueBubblesInboundDedupForTest();
+  });
+
+  it("claims a new guid and rejects committed duplicates", async () => {
+    expect(await claimAndFinalize("g1", "acc")).toBe("claimed");
+    expect(await claimAndFinalize("g1", "acc")).toBe("duplicate");
+  });
+
+  it("scopes dedupe per account", async () => {
+    expect(await claimAndFinalize("g1", "a")).toBe("claimed");
+    expect(await claimAndFinalize("g1", "b")).toBe("claimed");
+  });
+
+  it("reports skip when guid is missing or blank", async () => {
+    expect((await claimBlueBubblesInboundMessage({ guid: undefined, accountId: "acc" })).kind).toBe(
+      "skip",
+    );
+    expect((await claimBlueBubblesInboundMessage({ guid: "", accountId: "acc" })).kind).toBe(
+      "skip",
+    );
+    expect((await claimBlueBubblesInboundMessage({ guid: " ", accountId: "acc" })).kind).toBe(
+      "skip",
+    );
+  });
+
+  it("rejects overlong guids to cap on-disk size", async () => {
+    const huge = "x".repeat(10_000);
+    expect((await claimBlueBubblesInboundMessage({ guid: huge, accountId: "acc" })).kind).toBe(
+      "skip",
+    );
+  });
+
+  it("releases the claim so a later replay can retry after a transient failure", async () => {
+    const first = await claimBlueBubblesInboundMessage({ guid: "g1", accountId:
"acc" }); + expect(first.kind).toBe("claimed"); + if (first.kind === "claimed") { + first.release(); + } + // Released claims should be re-claimable on the next delivery. + expect(await claimAndFinalize("g1", "acc")).toBe("claimed"); + }); +}); diff --git a/extensions/bluebubbles/src/inbound-dedupe.ts b/extensions/bluebubbles/src/inbound-dedupe.ts new file mode 100644 index 00000000000..8327b899710 --- /dev/null +++ b/extensions/bluebubbles/src/inbound-dedupe.ts @@ -0,0 +1,172 @@ +import { createHash } from "node:crypto"; +import path from "node:path"; +import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; +import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; +import type { NormalizedWebhookMessage } from "./monitor-normalize.js"; + +// BlueBubbles has no sequence/ack in its webhook protocol, and its +// MessagePoller replays its ~1-week lookback window as `new-message` events +// after BB Server restarts or reconnects. Without persistent dedup, the +// gateway can reply to messages that were already handled before a restart +// (see issues #19176, #12053). +// +// TTL matches BB's lookback window so any replay is guaranteed to land on +// a remembered GUID, and the file-backed store survives gateway restarts. +const DEDUP_TTL_MS = 7 * 24 * 60 * 60 * 1_000; +const MEMORY_MAX_SIZE = 5_000; +const FILE_MAX_ENTRIES = 50_000; +// Cap GUID length so a malformed or hostile payload can't bloat the on-disk +// dedupe file. Real BB GUIDs are short (<64 chars); 512 is generous. +const MAX_GUID_CHARS = 512; + +function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { + if (env.VITEST || env.NODE_ENV === "test") { + // Isolate tests from real ~/.openclaw state without sharing across tests. + // Stable-per-pid so the scoped dedupe test can observe persistence. 
+ const name = "openclaw-vitest-" + process.pid; + return path.join(resolvePreferredOpenClawTmpDir(), name); + } + // Canonical OpenClaw state dir: honors OPENCLAW_STATE_DIR (with `~` expansion + // via resolveUserPath), plus legacy/new fallback. Using the shared helper + // keeps this plugin's persistence aligned with the rest of OpenClaw state. + return resolveStateDir(env); +} + +function resolveNamespaceFilePath(namespace: string): string { + // Keep a readable prefix for operator debugging, but suffix with a short + // hash of the raw namespace so account IDs that only differ by + // filesystem-unsafe characters (e.g. "acct/a" vs "acct:a") don't collapse + // onto the same file. + const safePrefix = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "ns"; + const hash = createHash("sha256").update(namespace, "utf8").digest("hex").slice(0, 12); + return path.join( + resolveStateDirFromEnv(), + "bluebubbles", + "inbound-dedupe", + `${safePrefix}__${hash}.json`, + ); +} + +function buildPersistentImpl(): ClaimableDedupe { + return createClaimableDedupe({ + ttlMs: DEDUP_TTL_MS, + memoryMaxSize: MEMORY_MAX_SIZE, + fileMaxEntries: FILE_MAX_ENTRIES, + resolveFilePath: resolveNamespaceFilePath, + }); +} + +function buildMemoryOnlyImpl(): ClaimableDedupe { + return createClaimableDedupe({ + ttlMs: DEDUP_TTL_MS, + memoryMaxSize: MEMORY_MAX_SIZE, + }); +} + +let impl: ClaimableDedupe = buildPersistentImpl(); + +function sanitizeGuid(guid: string | undefined | null): string | null { + const trimmed = guid?.trim(); + if (!trimmed) { + return null; + } + if (trimmed.length > MAX_GUID_CHARS) { + return null; + } + return trimmed; +} + +/** + * Resolve the canonical dedupe key for a BlueBubbles inbound message. 
+ *
+ * Mirrors `monitor-debounce.ts`'s `buildKey`: BlueBubbles sends URL-preview
+ * / sticker "balloon" events with a different `messageId` than the text
+ * message they belong to, and the debouncer coalesces the two only when
+ * both `balloonBundleId` AND `associatedMessageGuid` are present. We gate
+ * on the same pair so that regular replies — which also set
+ * `associatedMessageGuid` (pointing at the parent message) but have no
+ * `balloonBundleId` — are NOT collapsed onto their parent's dedupe key.
+ *
+ * Known tradeoff: `combineDebounceEntries` clears `balloonBundleId` on
+ * merged entries while keeping `associatedMessageGuid`, so a post-merge
+ * balloon+text message here will fall back to its `messageId`. A later
+ * MessagePoller replay that arrives in a different text-first/balloon-first
+ * order could therefore produce a different `messageId` at merge time and
+ * bypass this dedupe for that one message. That edge case is strictly
+ * narrower than the alternative — which would dedupe every distinct user
+ * reply against the same parent GUID and silently drop real messages.
+ */
+export function resolveBlueBubblesInboundDedupeKey(
+  message: Pick<
+    NormalizedWebhookMessage,
+    "messageId" | "balloonBundleId" | "associatedMessageGuid"
+  >,
+): string | undefined {
+  const balloonBundleId = message.balloonBundleId?.trim();
+  const associatedMessageGuid = message.associatedMessageGuid?.trim();
+  if (balloonBundleId && associatedMessageGuid) {
+    return associatedMessageGuid;
+  }
+  return message.messageId?.trim() || undefined;
+}
+
+export type InboundDedupeClaim =
+  | { kind: "claimed"; finalize: () => Promise<void>; release: () => void }
+  | { kind: "duplicate" }
+  | { kind: "inflight" }
+  | { kind: "skip" };
+
+/**
+ * Attempt to claim an inbound BlueBubbles message GUID.
+ *
+ * - `claimed`: caller should process the message, then call `finalize()` on
+ *   success (persists the GUID) or `release()` on failure (lets a later
+ *   replay try again).
+ * - `duplicate`: we've already committed this GUID; caller should drop.
+ * - `inflight`: another claim is currently in progress; caller should drop
+ *   rather than race.
+ * - `skip`: GUID was missing or invalid — caller should continue processing
+ *   without dedup (no finalize/release needed).
+ */
+export async function claimBlueBubblesInboundMessage(params: {
+  guid: string | undefined | null;
+  accountId: string;
+  onDiskError?: (error: unknown) => void;
+}): Promise<InboundDedupeClaim> {
+  const normalized = sanitizeGuid(params.guid);
+  if (!normalized) {
+    return { kind: "skip" };
+  }
+  const claim = await impl.claim(normalized, {
+    namespace: params.accountId,
+    onDiskError: params.onDiskError,
+  });
+  if (claim.kind === "duplicate") {
+    return { kind: "duplicate" };
+  }
+  if (claim.kind === "inflight") {
+    return { kind: "inflight" };
+  }
+  return {
+    kind: "claimed",
+    finalize: async () => {
+      await impl.commit(normalized, {
+        namespace: params.accountId,
+        onDiskError: params.onDiskError,
+      });
+    },
+    release: () => {
+      impl.release(normalized, { namespace: params.accountId });
+    },
+  };
+}
+
+/**
+ * Reset inbound dedupe state between tests. Installs an in-memory-only
+ * implementation so tests do not hit disk, avoiding file-lock timing issues
+ * in the webhook flush path.
+ */ +export function _resetBlueBubblesInboundDedupForTest(): void { + impl = buildMemoryOnlyImpl(); +} diff --git a/extensions/bluebubbles/src/monitor-processing.ts b/extensions/bluebubbles/src/monitor-processing.ts index 93f083feb83..e2ef93c070f 100644 --- a/extensions/bluebubbles/src/monitor-processing.ts +++ b/extensions/bluebubbles/src/monitor-processing.ts @@ -13,6 +13,10 @@ import { downloadBlueBubblesAttachment } from "./attachments.js"; import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js"; import { resolveBlueBubblesConversationRoute } from "./conversation-route.js"; import { fetchBlueBubblesHistory } from "./history.js"; +import { + claimBlueBubblesInboundMessage, + resolveBlueBubblesInboundDedupeKey, +} from "./inbound-dedupe.js"; import { sendBlueBubblesMedia } from "./media-send.js"; import { buildMessagePlaceholder, @@ -581,11 +585,102 @@ function buildInboundHistorySnapshot(params: { return selected; } +function sanitizeForLog(value: unknown, maxLen = 200): string { + const cleaned = String(value).replace(/[\r\n\t\p{C}]/gu, " "); + return cleaned.length > maxLen ? cleaned.slice(0, maxLen) + "..." : cleaned; +} + +/** + * Signal object threaded through `processMessageAfterDedupe` so the outer + * wrapper can distinguish "reply delivery failed silently" from "returned + * normally after an intentional drop" (fromMe cache, pairing flow, allowlist + * block, empty text, etc.). + * + * Reply delivery errors in the BlueBubbles path surface through the + * dispatcher's `onError` callback rather than as thrown exceptions, so a + * plain try/catch cannot detect them — see review thread `rwF8` on #66230. + */ +type InboundDedupeDeliverySignal = { deliveryFailed: boolean }; + +/** + * Claim → process → finalize/release wrapper around the real inbound flow. + * + * Claim before doing any work so restart replays and in-flight concurrent + * redeliveries both drop cleanly. 
Finalize (persist the GUID) only when
+ * processing completed cleanly AND any reply dispatch reported success;
+ * release (let a later replay try again) when processing threw OR the reply
+ * pipeline reported a delivery failure via its onError callback.
+ *
+ * The dedupe key follows the same canonicalization rules as the debouncer
+ * (`monitor-debounce.ts`): balloon events (URL previews, stickers) share
+ * a logical identity with their originating text message via
+ * `associatedMessageGuid`, so balloon-first vs text-first ordering usually
+ * yields one key (known exception: see `resolveBlueBubblesInboundDedupeKey`).
+ */
 export async function processMessage(
   message: NormalizedWebhookMessage,
   target: WebhookTarget,
+): Promise<void> {
+  const { account, core, runtime } = target;
+
+  const dedupeKey = resolveBlueBubblesInboundDedupeKey(message);
+
+  // Drop BlueBubbles MessagePoller replays after server restart (#19176, #12053).
+  const claim = await claimBlueBubblesInboundMessage({
+    guid: dedupeKey,
+    accountId: account.accountId,
+    onDiskError: (error) =>
+      logVerbose(core, runtime, `inbound-dedupe disk error: ${sanitizeForLog(error)}`),
+  });
+  if (claim.kind === "duplicate" || claim.kind === "inflight") {
+    logVerbose(
+      core,
+      runtime,
+      `drop: ${claim.kind} inbound key=${sanitizeForLog(dedupeKey ?? "")} sender=${sanitizeForLog(message.senderId)}`,
+    );
+    return;
+  }
+
+  const signal: InboundDedupeDeliverySignal = { deliveryFailed: false };
+  try {
+    await processMessageAfterDedupe(message, target, signal);
+  } catch (error) {
+    if (claim.kind === "claimed") {
+      claim.release();
+    }
+    throw error;
+  }
+  if (claim.kind === "claimed") {
+    if (signal.deliveryFailed) {
+      logVerbose(
+        core,
+        runtime,
+        `inbound-dedupe: releasing claim for key=${sanitizeForLog(dedupeKey ?? "")} after reply delivery failure (will retry on replay)`,
+      );
+      claim.release();
+    } else {
+      try {
+        await claim.finalize();
+      } catch (finalizeError) {
+        // commit() already clears inflight state in its finally block, so
+        // no explicit release() needed here — just log the persistence error.
+        logVerbose(
+          core,
+          runtime,
+          `inbound-dedupe: finalize failed for key=${sanitizeForLog(dedupeKey ?? "")}: ${sanitizeForLog(finalizeError)}`,
+        );
+      }
+    }
+  }
+}
+
+async function processMessageAfterDedupe(
+  message: NormalizedWebhookMessage,
+  target: WebhookTarget,
+  dedupeSignal: InboundDedupeDeliverySignal,
 ): Promise<void> {
   const { account, config, runtime, core, statusSink } = target;
+
   const pairing = createChannelPairingController({
     core,
     channel: "bluebubbles",
@@ -1597,6 +1692,19 @@ export async function processMessage(
           onReplyStart: typingCallbacks?.onReplyStart,
           onIdle: typingCallbacks?.onIdle,
           onError: (err, info) => {
+            // Flag the outer dedupe wrapper so it releases the claim instead
+            // of committing. Without this, a transient BlueBubbles send failure
+            // would permanently block replay-retry for 7 days and the user
+            // would never receive a reply to that message.
+            //
+            // Only the terminal `final` delivery represents the user-visible
+            // answer. The dispatcher continues past `tool` / `block` failures
+            // and may still deliver `final` successfully — releasing the
+            // dedupe claim for those would invite a replay that re-runs tool
+            // side effects and resends partially-delivered content.
+ if (info.kind === "final") { + dedupeSignal.deliveryFailed = true; + } runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${String(err)}`); }, }, diff --git a/extensions/bluebubbles/src/test-support/monitor-test-support.ts b/extensions/bluebubbles/src/test-support/monitor-test-support.ts index 3f2d2a47745..03ee6b2343d 100644 --- a/extensions/bluebubbles/src/test-support/monitor-test-support.ts +++ b/extensions/bluebubbles/src/test-support/monitor-test-support.ts @@ -1,6 +1,7 @@ import type { HistoryEntry, PluginRuntime } from "openclaw/plugin-sdk/bluebubbles"; import { vi } from "vitest"; import { createPluginRuntimeMock } from "../../../../test/helpers/plugins/plugin-runtime-mock.js"; +import { _resetBlueBubblesInboundDedupForTest } from "../inbound-dedupe.js"; import { _resetBlueBubblesShortIdState, clearBlueBubblesWebhookSecurityStateForTest, @@ -118,6 +119,7 @@ export function resetBlueBubblesMonitorTestState(params: { }) { vi.clearAllMocks(); _resetBlueBubblesShortIdState(); + _resetBlueBubblesInboundDedupForTest(); clearBlueBubblesWebhookSecurityStateForTest(); params.extraReset?.(); params.fetchHistoryMock.mockResolvedValue({ entries: [], resolved: true });