mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 16:40:49 +00:00
BlueBubbles/dedupe: migrate legacy unhashed dedupe file on startup so catchup does not re-dispatch already-handled messages after upgrade
This commit is contained in:
@@ -4,6 +4,7 @@ import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plug
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import { resolveBlueBubblesServerAccount } from "./account-resolve.js";
|
||||
import { warmupBlueBubblesInboundDedupe } from "./inbound-dedupe.js";
|
||||
import { asRecord, normalizeWebhookMessage } from "./monitor-normalize.js";
|
||||
import { processMessage } from "./monitor-processing.js";
|
||||
import type { WebhookTarget } from "./monitor-shared.js";
|
||||
@@ -415,6 +416,15 @@ async function runBlueBubblesCatchupInner(
|
||||
return null;
|
||||
}
|
||||
|
||||
// Ensure legacy→hashed dedupe file migration runs and the on-disk store
|
||||
// is warm before we replay. Without this, an upgrade from a version that
|
||||
// used the old `${safe}.json` naming to the current `${safe}__${hash}.json`
|
||||
// would start with an empty dedupe cache and re-dispatch every message in
|
||||
// the catchup window — producing duplicate replies.
|
||||
await warmupBlueBubblesInboundDedupe(accountId).catch((err) => {
|
||||
error?.(`[${accountId}] BlueBubbles catchup: dedupe warmup failed: ${String(err)}`);
|
||||
});
|
||||
|
||||
const { resolved, messages } = await fetchFn(windowStartMs, perRunLimit, {
|
||||
baseUrl,
|
||||
password,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
@@ -33,6 +34,11 @@ function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return resolveStateDir(env);
|
||||
}
|
||||
|
||||
function resolveLegacyNamespaceFilePath(namespace: string): string {
|
||||
const safe = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "global";
|
||||
return path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe", `${safe}.json`);
|
||||
}
|
||||
|
||||
function resolveNamespaceFilePath(namespace: string): string {
|
||||
// Keep a readable prefix for operator debugging, but suffix with a short
|
||||
// hash of the raw namespace so account IDs that only differ by
|
||||
@@ -40,12 +46,42 @@ function resolveNamespaceFilePath(namespace: string): string {
|
||||
// onto the same file.
|
||||
const safePrefix = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "ns";
|
||||
const hash = createHash("sha256").update(namespace, "utf8").digest("hex").slice(0, 12);
|
||||
return path.join(
|
||||
resolveStateDirFromEnv(),
|
||||
"bluebubbles",
|
||||
"inbound-dedupe",
|
||||
`${safePrefix}__${hash}.json`,
|
||||
);
|
||||
const dir = path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe");
|
||||
const newPath = path.join(dir, `${safePrefix}__${hash}.json`);
|
||||
|
||||
// One-time migration: earlier beta shipped `${safe}.json` (no hash).
|
||||
// Rename so the upgrade preserves existing dedupe entries instead of
|
||||
// starting from an empty file and replaying already-handled messages.
|
||||
migrateLegacyDedupeFile(namespace, newPath);
|
||||
|
||||
return newPath;
|
||||
}
|
||||
|
||||
const migratedNamespaces = new Set<string>();
|
||||
|
||||
function migrateLegacyDedupeFile(namespace: string, newPath: string): void {
|
||||
if (migratedNamespaces.has(namespace)) {
|
||||
return;
|
||||
}
|
||||
migratedNamespaces.add(namespace);
|
||||
try {
|
||||
const legacyPath = resolveLegacyNamespaceFilePath(namespace);
|
||||
if (legacyPath === newPath) {
|
||||
return;
|
||||
}
|
||||
if (!fs.existsSync(legacyPath)) {
|
||||
return;
|
||||
}
|
||||
if (!fs.existsSync(newPath)) {
|
||||
fs.renameSync(legacyPath, newPath);
|
||||
} else {
|
||||
// Both exist: new file is authoritative; remove the stale legacy.
|
||||
fs.unlinkSync(legacyPath);
|
||||
}
|
||||
} catch {
|
||||
// Best-effort migration; a missed rename is strictly less harmful
|
||||
// than crashing the module load path.
|
||||
}
|
||||
}
|
||||
|
||||
function buildPersistentImpl(): ClaimableDedupe {
|
||||
@@ -162,6 +198,18 @@ export async function claimBlueBubblesInboundMessage(params: {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure the legacy→hashed dedupe file migration runs and the on-disk
|
||||
* store is warmed into memory for the given account. Call before any
|
||||
* catchup replay so already-handled GUIDs are recognized even when the
|
||||
* file-naming convention changed between versions.
|
||||
*/
|
||||
export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise<void> {
|
||||
// Trigger the migration side-effect inside resolveNamespaceFilePath.
|
||||
resolveNamespaceFilePath(accountId);
|
||||
await impl.warmup(accountId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset inbound dedupe state between tests. Installs an in-memory-only
|
||||
* implementation so tests do not hit disk, avoiding file-lock timing issues
|
||||
|
||||
Reference in New Issue
Block a user