From a342b5d5ad5cb01639d44a1cdf0537b3d2de137e Mon Sep 17 00:00:00 2001 From: Lobster <10343873+omarshahine@users.noreply.github.com> Date: Wed, 15 Apr 2026 18:26:07 -0700 Subject: [PATCH] BlueBubbles/dedupe: migrate legacy unhashed dedupe file on startup so catchup does not re-dispatch already-handled messages after upgrade --- extensions/bluebubbles/src/catchup.ts | 10 ++++ extensions/bluebubbles/src/inbound-dedupe.ts | 60 ++++++++++++++++++-- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/extensions/bluebubbles/src/catchup.ts b/extensions/bluebubbles/src/catchup.ts index aafa5e4653b..35e0d90a238 100644 --- a/extensions/bluebubbles/src/catchup.ts +++ b/extensions/bluebubbles/src/catchup.ts @@ -4,6 +4,7 @@ import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plug import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import { resolveBlueBubblesServerAccount } from "./account-resolve.js"; +import { warmupBlueBubblesInboundDedupe } from "./inbound-dedupe.js"; import { asRecord, normalizeWebhookMessage } from "./monitor-normalize.js"; import { processMessage } from "./monitor-processing.js"; import type { WebhookTarget } from "./monitor-shared.js"; @@ -415,6 +416,15 @@ async function runBlueBubblesCatchupInner( return null; } + // Ensure legacy→hashed dedupe file migration runs and the on-disk store + // is warm before we replay. Without this, an upgrade from a version that + // used the old `${safe}.json` naming to the current `${safe}__${hash}.json` + // would start with an empty dedupe cache and re-dispatch every message in + // the catchup window — producing duplicate replies. + await warmupBlueBubblesInboundDedupe(accountId).catch((err) => { + error?.(`[${accountId}] BlueBubbles catchup: dedupe warmup failed: ${String(err)}`); + }); + const { resolved, messages } = await fetchFn(windowStartMs, perRunLimit, { baseUrl, password, diff --git a/extensions/bluebubbles/src/inbound-dedupe.ts b/extensions/bluebubbles/src/inbound-dedupe.ts index 8327b899710..deffa49685f 100644 --- a/extensions/bluebubbles/src/inbound-dedupe.ts +++ b/extensions/bluebubbles/src/inbound-dedupe.ts @@ -1,4 +1,5 @@ import { createHash } from "node:crypto"; +import fs from "node:fs"; import path from "node:path"; import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; @@ -33,6 +34,11 @@ function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { return resolveStateDir(env); } +function resolveLegacyNamespaceFilePath(namespace: string): string { + const safe = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "global"; + return path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe", `${safe}.json`); +} + function resolveNamespaceFilePath(namespace: string): string { // Keep a readable prefix for operator debugging, but suffix with a short // hash of the raw namespace so account IDs that only differ by @@ -40,12 +46,42 @@ function resolveNamespaceFilePath(namespace: string): string { // onto the same file. const safePrefix = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "ns"; const hash = createHash("sha256").update(namespace, "utf8").digest("hex").slice(0, 12); - return path.join( - resolveStateDirFromEnv(), - "bluebubbles", - "inbound-dedupe", - `${safePrefix}__${hash}.json`, - ); + const dir = path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe"); + const newPath = path.join(dir, `${safePrefix}__${hash}.json`); + + // One-time migration: earlier beta shipped `${safe}.json` (no hash). + // Rename so the upgrade preserves existing dedupe entries instead of + // starting from an empty file and replaying already-handled messages. + migrateLegacyDedupeFile(namespace, newPath); + + return newPath; +} + +const migratedNamespaces = new Set(); + +function migrateLegacyDedupeFile(namespace: string, newPath: string): void { + if (migratedNamespaces.has(namespace)) { + return; + } + migratedNamespaces.add(namespace); + try { + const legacyPath = resolveLegacyNamespaceFilePath(namespace); + if (legacyPath === newPath) { + return; + } + if (!fs.existsSync(legacyPath)) { + return; + } + if (!fs.existsSync(newPath)) { + fs.renameSync(legacyPath, newPath); + } else { + // Both exist: new file is authoritative; remove the stale legacy. + fs.unlinkSync(legacyPath); + } + } catch { + // Best-effort migration; a missed rename is strictly less harmful + // than crashing the module load path. + } } function buildPersistentImpl(): ClaimableDedupe { @@ -162,6 +198,18 @@ export async function claimBlueBubblesInboundMessage(params: { }; } +/** + * Ensure the legacy→hashed dedupe file migration runs and the on-disk + * store is warmed into memory for the given account. Call before any + * catchup replay so already-handled GUIDs are recognized even when the + * file-naming convention changed between versions. + */ +export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise { + // Trigger the migration side-effect inside resolveNamespaceFilePath. + resolveNamespaceFilePath(accountId); + await impl.warmup(accountId); +} + /** * Reset inbound dedupe state between tests. Installs an in-memory-only * implementation so tests do not hit disk, avoiding file-lock timing issues