mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:40:44 +00:00
BlueBubbles MessagePoller replays its ~1-week lookback window as new-message webhooks after BB Server restart or reconnect. Add a persistent file-backed GUID dedupe (TTL=7d) at the top of processMessage using createClaimableDedupe from the Plugin SDK. Claim/finalize/release semantics ensure transient delivery failures release the GUID so a later replay can retry. Fixes #19176, #12053. Co-authored-by: Omar Shahine <omar@shahine.com>
This commit is contained in:
@@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Reply/secrets: resolve active reply channel/account SecretRefs before reply-run message-action discovery so channel token SecretRefs (for example Discord) do not degrade into discovery-time unresolved-secret failures. (#66796) Thanks @joshavant.
|
||||
- Agents/Anthropic: ignore non-positive Anthropic Messages token overrides and fail locally when no positive token budget remains, so invalid `max_tokens` values no longer reach the provider API. (#66664) thanks @jalehman
|
||||
- Agents/context engines: preserve prompt-only token counts, not full request totals, when deferred maintenance reuses after-turn runtime context so background compaction bookkeeping matches the active prompt window. (#66820) thanks @jalehman.
|
||||
- BlueBubbles/inbound: add a persistent file-backed GUID dedupe so MessagePoller webhook replays after BB Server restart or reconnect no longer cause the agent to re-reply to already-handled messages. (#19176, #12053, #66816) Thanks @omarshahine.
|
||||
|
||||
## 2026.4.14
|
||||
|
||||
|
||||
58
extensions/bluebubbles/src/inbound-dedupe.test.ts
Normal file
58
extensions/bluebubbles/src/inbound-dedupe.test.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
_resetBlueBubblesInboundDedupForTest,
|
||||
claimBlueBubblesInboundMessage,
|
||||
} from "./inbound-dedupe.js";
|
||||
|
||||
/**
 * Test helper: try to claim `guid` for `accountId` and, if the claim is
 * granted, commit it immediately.
 *
 * @returns the claim's `kind` so a test can compare it against
 *   "claimed" / "duplicate" / "inflight" / "skip" directly.
 */
async function claimAndFinalize(guid: string | undefined, accountId: string): Promise<string> {
  const outcome = await claimBlueBubblesInboundMessage({ guid, accountId });
  if (outcome.kind !== "claimed") {
    // Nothing was claimed, so there is nothing to commit.
    return outcome.kind;
  }
  await outcome.finalize();
  return outcome.kind;
}
|
||||
|
||||
// Behavioral spec for the inbound GUID dedupe wrapper. Every case starts from
// a freshly reset dedupe implementation so state never leaks between tests.
describe("claimBlueBubblesInboundMessage", () => {
  beforeEach(() => {
    _resetBlueBubblesInboundDedupForTest();
  });

  it("claims a new guid and rejects committed duplicates", async () => {
    const firstAttempt = await claimAndFinalize("g1", "acc");
    expect(firstAttempt).toBe("claimed");

    const secondAttempt = await claimAndFinalize("g1", "acc");
    expect(secondAttempt).toBe("duplicate");
  });

  it("scopes dedupe per account", async () => {
    // The same GUID under two different accounts must not collide.
    for (const accountId of ["a", "b"]) {
      expect(await claimAndFinalize("g1", accountId)).toBe("claimed");
    }
  });

  it("reports skip when guid is missing or blank", async () => {
    for (const guid of [undefined, "", " "]) {
      const claim = await claimBlueBubblesInboundMessage({ guid, accountId: "acc" });
      expect(claim.kind).toBe("skip");
    }
  });

  it("rejects overlong guids to cap on-disk size", async () => {
    const huge = "x".repeat(10_000);
    const claim = await claimBlueBubblesInboundMessage({ guid: huge, accountId: "acc" });
    expect(claim.kind).toBe("skip");
  });

  it("releases the claim so a later replay can retry after a transient failure", async () => {
    const first = await claimBlueBubblesInboundMessage({ guid: "g1", accountId: "acc" });
    expect(first.kind).toBe("claimed");
    if (first.kind === "claimed") {
      first.release();
    }
    // A released (never committed) claim must be claimable again on the next delivery.
    expect(await claimAndFinalize("g1", "acc")).toBe("claimed");
  });
});
|
||||
172
extensions/bluebubbles/src/inbound-dedupe.ts
Normal file
172
extensions/bluebubbles/src/inbound-dedupe.ts
Normal file
@@ -0,0 +1,172 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import type { NormalizedWebhookMessage } from "./monitor-normalize.js";
|
||||
|
||||
// Why persistent dedupe exists: the BlueBubbles webhook protocol carries no
// sequence number or ack, and after a BB Server restart or reconnect its
// MessagePoller re-sends roughly the last week of messages as `new-message`
// events. Without a record that survives gateway restarts, already-handled
// messages would be answered a second time (issues #19176, #12053).
//
// The TTL is set to BB's lookback window (7 days) so a replayed GUID is still
// remembered when it arrives.
const MS_PER_DAY = 24 * 60 * 60 * 1_000;
const DEDUP_TTL_MS = 7 * MS_PER_DAY;
// Upper bound on entries kept in the in-memory layer.
const MEMORY_MAX_SIZE = 5_000;
// Upper bound on entries kept in each on-disk dedupe file.
const FILE_MAX_ENTRIES = 50_000;
// Longest GUID accepted as a dedupe key. Real BB GUIDs are short (<64 chars);
// the generous cap only stops a malformed or hostile payload from bloating
// the on-disk dedupe file.
const MAX_GUID_CHARS = 512;
|
||||
|
||||
/**
 * Choose the root directory for this plugin's persistent state.
 *
 * - Under a test run (`VITEST` set, or `NODE_ENV === "test"`): a directory
 *   named after the current pid inside the preferred OpenClaw tmp dir. This
 *   keeps tests away from real user state while staying stable for the
 *   lifetime of the process.
 * - Otherwise: whatever the shared `resolveStateDir` helper returns for
 *   `env`, so this plugin's files live with the rest of OpenClaw state.
 *
 * @param env environment to inspect; defaults to `process.env`.
 */
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
  const underTest = Boolean(env.VITEST) || env.NODE_ENV === "test";
  if (!underTest) {
    return resolveStateDir(env);
  }
  const perProcessDir = `openclaw-vitest-${process.pid}`;
  return path.join(resolvePreferredOpenClawTmpDir(), perProcessDir);
}
|
||||
|
||||
/**
 * Map a dedupe namespace (the account id) to its JSON file path:
 * `<stateDir>/bluebubbles/inbound-dedupe/<readable>__<hash>.json`.
 *
 * `<readable>` is the namespace with every character outside
 * `[a-zA-Z0-9_-]` replaced by `_` (or "ns" when nothing is left), kept so an
 * operator can recognise the file. `<hash>` is the first 12 hex characters
 * of the SHA-256 of the raw namespace, so namespaces that differ only in
 * replaced characters (e.g. "acct/a" vs "acct:a") still get distinct files.
 */
function resolveNamespaceFilePath(namespace: string): string {
  const readable = namespace.replace(/[^a-zA-Z0-9_-]/g, "_");
  const digest = createHash("sha256").update(namespace, "utf8").digest("hex");
  const fileName = `${readable === "" ? "ns" : readable}__${digest.slice(0, 12)}.json`;
  return path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe", fileName);
}
|
||||
|
||||
/**
 * Create the production dedupe backend: in-memory layer plus a file per
 * namespace, located by `resolveNamespaceFilePath`.
 */
function buildPersistentImpl(): ClaimableDedupe {
  const options: Parameters<typeof createClaimableDedupe>[0] = {
    ttlMs: DEDUP_TTL_MS,
    memoryMaxSize: MEMORY_MAX_SIZE,
    fileMaxEntries: FILE_MAX_ENTRIES,
    resolveFilePath: resolveNamespaceFilePath,
  };
  return createClaimableDedupe(options);
}
|
||||
|
||||
/**
 * Create a dedupe backend configured with only the TTL and in-memory size
 * limit; no file path resolver or file entry cap is supplied.
 */
function buildMemoryOnlyImpl(): ClaimableDedupe {
  const options: Parameters<typeof createClaimableDedupe>[0] = {
    ttlMs: DEDUP_TTL_MS,
    memoryMaxSize: MEMORY_MAX_SIZE,
  };
  return createClaimableDedupe(options);
}
|
||||
|
||||
// Active dedupe backend used by `claimBlueBubblesInboundMessage`. Starts out
// file-backed; `_resetBlueBubblesInboundDedupForTest` reassigns it to a
// memory-only instance, which is why this is `let` rather than `const`.
let impl: ClaimableDedupe = buildPersistentImpl();
|
||||
|
||||
/**
 * Normalize a raw GUID into a usable dedupe key.
 *
 * @returns the trimmed GUID, or `null` when it is missing, blank after
 *   trimming, or longer than `MAX_GUID_CHARS`.
 */
function sanitizeGuid(guid: string | undefined | null): string | null {
  const candidate = (guid ?? "").trim();
  const usable = candidate.length > 0 && candidate.length <= MAX_GUID_CHARS;
  return usable ? candidate : null;
}
|
||||
|
||||
/**
 * Compute the canonical dedupe key for an inbound BlueBubbles message.
 *
 * Rule implemented here:
 * - when BOTH `balloonBundleId` and `associatedMessageGuid` are non-blank,
 *   the key is the (trimmed) `associatedMessageGuid`;
 * - otherwise the key is the (trimmed) `messageId`, or `undefined` when that
 *   is missing or blank.
 *
 * Rationale, per the original author's notes: URL-preview / sticker
 * "balloon" events arrive with a different `messageId` than the text message
 * they belong to, and `monitor-debounce.ts`'s `buildKey` coalesces the two
 * only when both fields are present. Gating on the same pair keeps ordinary
 * replies — which also carry `associatedMessageGuid` (pointing at the parent)
 * but no `balloonBundleId` — from collapsing onto their parent's key.
 *
 * Known tradeoff (also from the original notes): `combineDebounceEntries`
 * clears `balloonBundleId` on merged entries but keeps
 * `associatedMessageGuid`, so a merged balloon+text message falls back to its
 * `messageId`. A later replay arriving in the opposite text/balloon order
 * could merge to a different `messageId` and slip past dedupe for that one
 * message. That is considered narrower than the alternative of deduping every
 * distinct reply against its parent GUID and silently dropping real messages.
 */
export function resolveBlueBubblesInboundDedupeKey(
  message: Pick<
    NormalizedWebhookMessage,
    "messageId" | "balloonBundleId" | "associatedMessageGuid"
  >,
): string | undefined {
  const bundleId = message.balloonBundleId?.trim();
  const parentGuid = message.associatedMessageGuid?.trim();
  const isBalloonEvent = Boolean(bundleId) && Boolean(parentGuid);
  if (isBalloonEvent) {
    return parentGuid;
  }
  const ownId = message.messageId?.trim();
  return ownId ? ownId : undefined;
}
|
||||
|
||||
/** Result of a dedupe claim attempt, discriminated by `kind`. */
export type InboundDedupeClaim =
  // No usable GUID: process without dedupe, nothing to finalize or release.
  | { kind: "skip" }
  // GUID was already committed: drop the message.
  | { kind: "duplicate" }
  // Another claim for this GUID is in progress: drop rather than race.
  | { kind: "inflight" }
  // Caller owns the claim and must call exactly one of the two callbacks.
  | {
      kind: "claimed";
      finalize: () => Promise<void>;
      release: () => void;
    };
|
||||
|
||||
/**
 * Try to claim an inbound BlueBubbles message GUID for processing.
 *
 * Possible results:
 * - `skip`: the GUID was missing, blank, or too long. The caller should
 *   process the message without dedupe; there is nothing to finalize/release.
 * - `duplicate`: the GUID has already been committed; the caller should drop.
 * - `inflight`: another claim for the GUID is in progress; the caller should
 *   drop rather than race.
 * - `claimed`: the caller should process the message, then call `finalize()`
 *   on success (commits the GUID) or `release()` on failure (so a later
 *   replay can try again).
 *
 * @param params.guid raw dedupe key; sanitized before use.
 * @param params.accountId used as the dedupe namespace, so the same GUID on
 *   two accounts does not collide.
 * @param params.onDiskError forwarded to the dedupe backend on claim/commit.
 */
export async function claimBlueBubblesInboundMessage(params: {
  guid: string | undefined | null;
  accountId: string;
  onDiskError?: (error: unknown) => void;
}): Promise<InboundDedupeClaim> {
  const key = sanitizeGuid(params.guid);
  if (!key) {
    return { kind: "skip" };
  }

  const finalize = async (): Promise<void> => {
    await impl.commit(key, {
      namespace: params.accountId,
      onDiskError: params.onDiskError,
    });
  };
  const release = (): void => {
    impl.release(key, { namespace: params.accountId });
  };

  const result = await impl.claim(key, {
    namespace: params.accountId,
    onDiskError: params.onDiskError,
  });
  switch (result.kind) {
    case "duplicate":
      return { kind: "duplicate" };
    case "inflight":
      return { kind: "inflight" };
    default:
      // Any other backend result is treated as a granted claim.
      return { kind: "claimed", finalize, release };
  }
}
|
||||
|
||||
/**
 * Test-only reset: discard the current dedupe backend and install a fresh
 * one from `buildMemoryOnlyImpl()`, which is configured without a file path
 * resolver. The original author's note gives the motivation: keep tests off
 * disk and avoid file-lock timing issues in the webhook flush path.
 */
export function _resetBlueBubblesInboundDedupForTest(): void {
  const freshBackend = buildMemoryOnlyImpl();
  impl = freshBackend;
}
|
||||
@@ -13,6 +13,10 @@ import { downloadBlueBubblesAttachment } from "./attachments.js";
|
||||
import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js";
|
||||
import { resolveBlueBubblesConversationRoute } from "./conversation-route.js";
|
||||
import { fetchBlueBubblesHistory } from "./history.js";
|
||||
import {
|
||||
claimBlueBubblesInboundMessage,
|
||||
resolveBlueBubblesInboundDedupeKey,
|
||||
} from "./inbound-dedupe.js";
|
||||
import { sendBlueBubblesMedia } from "./media-send.js";
|
||||
import {
|
||||
buildMessagePlaceholder,
|
||||
@@ -581,11 +585,102 @@ function buildInboundHistorySnapshot(params: {
|
||||
return selected;
|
||||
}
|
||||
|
||||
/**
 * Render an arbitrary value as a single-line, bounded-length string that is
 * safe to embed in a log message.
 *
 * - The value is stringified with `String(value)`. If that throws (for
 *   example a null-prototype object, or a `toString` that throws), the
 *   `Object.prototype.toString` tag is used instead, so this helper never
 *   throws from inside an error-logging path.
 * - CR, LF, TAB and every Unicode "Other" (`\p{C}`) code point — control,
 *   format, unassigned, private-use, lone surrogates — become a space, so a
 *   hostile value cannot inject extra log lines.
 * - Output longer than `maxLen` UTF-16 code units is cut and suffixed with
 *   "...". The cut never lands between the two halves of a surrogate pair,
 *   so the result stays well-formed.
 *
 * @param value anything; typically an id or a caught error.
 * @param maxLen maximum number of UTF-16 code units kept before the "..."
 *   suffix. Defaults to 200.
 */
function sanitizeForLog(value: unknown, maxLen = 200): string {
  let text: string;
  try {
    text = String(value);
  } catch {
    // String() can throw on exotic objects; fall back to the type tag.
    text = Object.prototype.toString.call(value);
  }
  const cleaned = text.replace(/[\r\n\t\p{C}]/gu, " ");
  if (cleaned.length <= maxLen) {
    return cleaned;
  }
  let end = maxLen;
  // Lone surrogates were replaced above, so a high surrogate in the last kept
  // position always has its low half right after the cut; drop it as well.
  const lastKept = cleaned.charCodeAt(end - 1);
  if (lastKept >= 0xd800 && lastKept <= 0xdbff) {
    end -= 1;
  }
  return cleaned.slice(0, end) + "...";
}
|
||||
|
||||
/**
 * Mutable flag passed from `processMessage` into `processMessageAfterDedupe`
 * so the dedupe wrapper can tell "the reply failed to deliver" apart from
 * "processing returned normally after an intentional drop" (fromMe cache,
 * pairing flow, allowlist block, empty text, etc.).
 *
 * It is needed because reply delivery errors in the BlueBubbles path are
 * reported through the dispatcher's `onError` callback instead of being
 * thrown, so a plain try/catch around processing cannot observe them (see
 * review thread `rwF8` on #66230).
 */
type InboundDedupeDeliverySignal = {
  /** Starts `false`; flipped to `true` when a reply delivery failure is reported. */
  deliveryFailed: boolean;
};
|
||||
|
||||
/**
 * Dedupe-aware entry point for one inbound BlueBubbles webhook message:
 * claim → process → finalize or release.
 *
 * Flow:
 * 1. Resolve the dedupe key with `resolveBlueBubblesInboundDedupeKey` and
 *    claim it for this account before doing any other work.
 * 2. `duplicate` / `inflight` claims are logged and dropped, which covers
 *    both MessagePoller replays after a BB Server restart (#19176, #12053)
 *    and concurrent redeliveries.
 * 3. Otherwise run `processMessageAfterDedupe`. A `skip` claim (no usable
 *    key) is processed as well, just without any dedupe bookkeeping.
 * 4. For a held claim:
 *    - processing threw → release the claim and rethrow;
 *    - the delivery signal reports a failed reply → release, so a later
 *      replay can retry;
 *    - otherwise → finalize (commit the key). A finalize error is only
 *      logged and never fails the message.
 *
 * Key canonicalization (balloon events keyed by `associatedMessageGuid`,
 * everything else by `messageId`) and its known ordering edge case are
 * documented on `resolveBlueBubblesInboundDedupeKey`.
 */
export async function processMessage(
  message: NormalizedWebhookMessage,
  target: WebhookTarget,
): Promise<void> {
  const { account, core, runtime } = target;
  const dedupeKey = resolveBlueBubblesInboundDedupeKey(message);
  const keyForLog = (): string => sanitizeForLog(dedupeKey ?? "");

  const claim = await claimBlueBubblesInboundMessage({
    guid: dedupeKey,
    accountId: account.accountId,
    onDiskError: (error) =>
      logVerbose(core, runtime, `inbound-dedupe disk error: ${sanitizeForLog(error)}`),
  });

  switch (claim.kind) {
    case "duplicate":
    case "inflight":
      logVerbose(
        core,
        runtime,
        `drop: ${claim.kind} inbound key=${keyForLog()} sender=${sanitizeForLog(message.senderId)}`,
      );
      return;
    default:
      break;
  }

  // `undefined` means the claim was skipped: process, but nothing to settle.
  const held = claim.kind === "claimed" ? claim : undefined;
  const signal: InboundDedupeDeliverySignal = { deliveryFailed: false };

  try {
    await processMessageAfterDedupe(message, target, signal);
  } catch (error) {
    held?.release();
    throw error;
  }

  if (!held) {
    return;
  }

  if (signal.deliveryFailed) {
    logVerbose(
      core,
      runtime,
      `inbound-dedupe: releasing claim for key=${keyForLog()} after reply delivery failure (will retry on replay)`,
    );
    held.release();
    return;
  }

  try {
    await held.finalize();
  } catch (finalizeError) {
    // Per the original note, commit() clears its own in-flight state in a
    // finally block, so no explicit release() is needed; just log the error.
    logVerbose(
      core,
      runtime,
      `inbound-dedupe: finalize failed for key=${keyForLog()}: ${sanitizeForLog(finalizeError)}`,
    );
  }
}
|
||||
|
||||
async function processMessageAfterDedupe(
|
||||
message: NormalizedWebhookMessage,
|
||||
target: WebhookTarget,
|
||||
dedupeSignal: InboundDedupeDeliverySignal,
|
||||
): Promise<void> {
|
||||
const { account, config, runtime, core, statusSink } = target;
|
||||
|
||||
const pairing = createChannelPairingController({
|
||||
core,
|
||||
channel: "bluebubbles",
|
||||
@@ -1597,6 +1692,19 @@ export async function processMessage(
|
||||
onReplyStart: typingCallbacks?.onReplyStart,
|
||||
onIdle: typingCallbacks?.onIdle,
|
||||
onError: (err, info) => {
|
||||
// Flag the outer dedupe wrapper so it releases the claim instead
|
||||
// of committing. Without this, a transient BlueBubbles send failure
|
||||
// would permanently block replay-retry for 7 days and the user
|
||||
// would never receive a reply to that message.
|
||||
//
|
||||
// Only the terminal `final` delivery represents the user-visible
|
||||
// answer. The dispatcher continues past `tool` / `block` failures
|
||||
// and may still deliver `final` successfully — releasing the
|
||||
// dedupe claim for those would invite a replay that re-runs tool
|
||||
// side effects and resends partially-delivered content.
|
||||
if (info.kind === "final") {
|
||||
dedupeSignal.deliveryFailed = true;
|
||||
}
|
||||
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${String(err)}`);
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { HistoryEntry, PluginRuntime } from "openclaw/plugin-sdk/bluebubbles";
|
||||
import { vi } from "vitest";
|
||||
import { createPluginRuntimeMock } from "../../../../test/helpers/plugins/plugin-runtime-mock.js";
|
||||
import { _resetBlueBubblesInboundDedupForTest } from "../inbound-dedupe.js";
|
||||
import {
|
||||
_resetBlueBubblesShortIdState,
|
||||
clearBlueBubblesWebhookSecurityStateForTest,
|
||||
@@ -118,6 +119,7 @@ export function resetBlueBubblesMonitorTestState(params: {
|
||||
}) {
|
||||
vi.clearAllMocks();
|
||||
_resetBlueBubblesShortIdState();
|
||||
_resetBlueBubblesInboundDedupForTest();
|
||||
clearBlueBubblesWebhookSecurityStateForTest();
|
||||
params.extraReset?.();
|
||||
params.fetchHistoryMock.mockResolvedValue({ entries: [], resolved: true });
|
||||
|
||||
Reference in New Issue
Block a user