mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 21:00:44 +00:00
fix(dedupe): serialize concurrent file writes to prevent lost-update race on inbound GUID commit
The re-entrant file lock allowed two concurrent checkAndRecordInner calls (e.g., inbound user message + outbound agent reply) to read the same stale file contents, then the last writer silently overwrote the first. The in-memory cache masked this within a process lifetime, but after restart the lost GUID caused catchup to re-dispatch already-handled messages. Add an in-process write queue per file path so read-modify-write cycles targeting the same dedupe file are serialized. Also filter associated-message events (balloons, tapbacks) in catchup since they bypass the live path's debouncer and have distinct GUIDs from their parent message.
This commit is contained in:
@@ -498,6 +498,22 @@ async function runBlueBubblesCatchupInner(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip balloon/tapback/reaction events: they carry an
|
||||
// `associatedMessageGuid` pointing at the parent text message and
|
||||
// have a different `guid` of their own. The live webhook path handles
|
||||
// them via the debouncer, which coalesces balloons with their parent.
|
||||
// Without debouncing here, replaying a balloon would dispatch it as a
|
||||
// standalone message — producing a duplicate reply to the parent.
|
||||
const assocGuid =
|
||||
typeof rec.associatedMessageGuid === "string"
|
||||
? rec.associatedMessageGuid.trim()
|
||||
: typeof rec.associated_message_guid === "string"
|
||||
? rec.associated_message_guid.trim()
|
||||
: "";
|
||||
if (assocGuid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const normalized = normalizeWebhookMessage({ type: "new-message", data: rec });
|
||||
if (!normalized) {
|
||||
summary.failed++;
|
||||
|
||||
@@ -154,6 +154,29 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
|
||||
const lockOptions = mergeLockOptions(options.lockOptions);
|
||||
const memory = createDedupeCache({ ttlMs, maxSize: memoryMaxSize });
|
||||
const inflight = new Map<string, Promise<boolean>>();
|
||||
// In-process write queue per file path. `withFileLock` is re-entrant
|
||||
// within the same process (a second caller for the same path gets
|
||||
// immediate access instead of waiting), so two concurrent
|
||||
// checkAndRecordInner calls for different keys but the same file can
|
||||
// race: both read the same stale data, and the last writer's
|
||||
// writeJsonFileAtomically silently overwrites the first writer's
|
||||
// additions. This queue serializes all read-modify-write cycles
|
||||
// targeting the same file within this process, preventing the lost
|
||||
// update while still allowing cross-process file-lock contention to
|
||||
// be handled by the file lock itself.
|
||||
const fileWriteQueues = new Map<string, Promise<unknown>>();
|
||||
|
||||
function enqueueFileWrite<T>(filePath: string, fn: () => Promise<T>): Promise<T> {
|
||||
const prev = fileWriteQueues.get(filePath) ?? Promise.resolve();
|
||||
const next = prev.then(fn, fn);
|
||||
fileWriteQueues.set(filePath, next);
|
||||
void next.finally(() => {
|
||||
if (fileWriteQueues.get(filePath) === next) {
|
||||
fileWriteQueues.delete(filePath);
|
||||
}
|
||||
});
|
||||
return next;
|
||||
}
|
||||
|
||||
async function checkAndRecordInner(
|
||||
key: string,
|
||||
@@ -168,19 +191,21 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
|
||||
|
||||
const path = options.resolveFilePath(namespace);
|
||||
try {
|
||||
const duplicate = await withFileLock(path, lockOptions, async () => {
|
||||
const { value } = await readJsonFileWithFallback<PersistentDedupeData>(path, {});
|
||||
const data = sanitizeData(value);
|
||||
const seenAt = data[key];
|
||||
const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs);
|
||||
if (isRecent) {
|
||||
return true;
|
||||
}
|
||||
data[key] = now;
|
||||
pruneData(data, now, ttlMs, fileMaxEntries);
|
||||
await writeJsonFileAtomically(path, data);
|
||||
return false;
|
||||
});
|
||||
const duplicate = await enqueueFileWrite(path, () =>
|
||||
withFileLock(path, lockOptions, async () => {
|
||||
const { value } = await readJsonFileWithFallback<PersistentDedupeData>(path, {});
|
||||
const data = sanitizeData(value);
|
||||
const seenAt = data[key];
|
||||
const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs);
|
||||
if (isRecent) {
|
||||
return true;
|
||||
}
|
||||
data[key] = now;
|
||||
pruneData(data, now, ttlMs, fileMaxEntries);
|
||||
await writeJsonFileAtomically(path, data);
|
||||
return false;
|
||||
}),
|
||||
);
|
||||
return !duplicate;
|
||||
} catch (error) {
|
||||
onDiskError?.(error);
|
||||
|
||||
Reference in New Issue
Block a user