From 85aa463013b3005313595c2c6b2dc82a26aed3e7 Mon Sep 17 00:00:00 2001
From: Lobster <10343873+omarshahine@users.noreply.github.com>
Date: Wed, 15 Apr 2026 19:19:57 -0700
Subject: [PATCH] fix(dedupe): serialize concurrent file writes to prevent
 lost-update race on inbound GUID commit

The re-entrant file lock allowed two concurrent checkAndRecordInner calls
(e.g., inbound user message + outbound agent reply) to read the same stale
file contents, then the last writer silently overwrote the first. The
in-memory cache masked this within a process lifetime, but after restart
the lost GUID caused catchup to re-dispatch already-handled messages.

Add an in-process write queue per file path so read-modify-write cycles
targeting the same dedupe file are serialized. Also filter associated-
message events (balloons, tapbacks) in catchup since they bypass the live
path's debouncer and have distinct GUIDs from their parent message.

The queue's cleanup handler is attached with then(release, release)
instead of finally(): the promise returned by finally() re-rejects when
the queued operation fails, and because nothing awaits it, a lock timeout
or disk error would otherwise surface as an unhandled rejection even
though checkAndRecordInner already catches the failure.
---
 extensions/bluebubbles/src/catchup.ts | 16 +++++++
 src/plugin-sdk/persistent-dedupe.ts   | 60 +++++++++++++++++++++------
 2 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/extensions/bluebubbles/src/catchup.ts b/extensions/bluebubbles/src/catchup.ts
index 35e0d90a238..aa6ee26e385 100644
--- a/extensions/bluebubbles/src/catchup.ts
+++ b/extensions/bluebubbles/src/catchup.ts
@@ -498,6 +498,22 @@ async function runBlueBubblesCatchupInner(
       continue;
     }
 
+    // Skip balloon/tapback/reaction events: they carry an
+    // `associatedMessageGuid` pointing at the parent text message and
+    // have a different `guid` of their own. The live webhook path handles
+    // them via the debouncer, which coalesces balloons with their parent.
+    // Without debouncing here, replaying a balloon would dispatch it as a
+    // standalone message — producing a duplicate reply to the parent.
+    const assocGuid =
+      typeof rec.associatedMessageGuid === "string"
+        ? rec.associatedMessageGuid.trim()
+        : typeof rec.associated_message_guid === "string"
+          ? rec.associated_message_guid.trim()
+          : "";
+    if (assocGuid) {
+      continue;
+    }
+
     const normalized = normalizeWebhookMessage({ type: "new-message", data: rec });
     if (!normalized) {
       summary.failed++;
diff --git a/src/plugin-sdk/persistent-dedupe.ts b/src/plugin-sdk/persistent-dedupe.ts
index 79dd33e2779..f109a27d647 100644
--- a/src/plugin-sdk/persistent-dedupe.ts
+++ b/src/plugin-sdk/persistent-dedupe.ts
@@ -154,6 +154,38 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
   const lockOptions = mergeLockOptions(options.lockOptions);
   const memory = createDedupeCache({ ttlMs, maxSize: memoryMaxSize });
   const inflight = new Map<string, Promise<boolean>>();
+  // In-process write queue per file path. `withFileLock` is re-entrant
+  // within the same process (a second caller for the same path gets
+  // immediate access instead of waiting), so two concurrent
+  // checkAndRecordInner calls for different keys but the same file can
+  // race: both read the same stale data, and the last writer's
+  // writeJsonFileAtomically silently overwrites the first writer's
+  // additions. This queue serializes all read-modify-write cycles
+  // targeting the same file within this process, preventing the lost
+  // update while still allowing cross-process file-lock contention to
+  // be handled by the file lock itself.
+  const fileWriteQueues = new Map<string, Promise<unknown>>();
+
+  /**
+   * Run `fn` once the previously queued operation for `filePath` has
+   * settled (fulfilled or rejected) and return the promise for `fn`'s result.
+   */
+  function enqueueFileWrite<T>(filePath: string, fn: () => Promise<T>): Promise<T> {
+    const prev = fileWriteQueues.get(filePath) ?? Promise.resolve();
+    const next = prev.then(fn, fn);
+    fileWriteQueues.set(filePath, next);
+    const release = (): void => {
+      if (fileWriteQueues.get(filePath) === next) {
+        fileWriteQueues.delete(filePath);
+      }
+    };
+    // Register both handlers via `then` rather than `finally`: the promise
+    // returned by `finally` re-rejects when `next` rejects, and with nobody
+    // awaiting it a disk or lock failure would surface as an unhandled
+    // rejection even though the caller handles `next` itself.
+    void next.then(release, release);
+    return next;
+  }
 
   async function checkAndRecordInner(
     key: string,
@@ -168,19 +200,21 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
 
     const path = options.resolveFilePath(namespace);
     try {
-      const duplicate = await withFileLock(path, lockOptions, async () => {
-        const { value } = await readJsonFileWithFallback(path, {});
-        const data = sanitizeData(value);
-        const seenAt = data[key];
-        const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs);
-        if (isRecent) {
-          return true;
-        }
-        data[key] = now;
-        pruneData(data, now, ttlMs, fileMaxEntries);
-        await writeJsonFileAtomically(path, data);
-        return false;
-      });
+      const duplicate = await enqueueFileWrite(path, () =>
+        withFileLock(path, lockOptions, async () => {
+          const { value } = await readJsonFileWithFallback(path, {});
+          const data = sanitizeData(value);
+          const seenAt = data[key];
+          const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs);
+          if (isRecent) {
+            return true;
+          }
+          data[key] = now;
+          pruneData(data, now, ttlMs, fileMaxEntries);
+          await writeJsonFileAtomically(path, data);
+          return false;
+        }),
+      );
       return !duplicate;
     } catch (error) {
       onDiskError?.(error);