From d3c293d9c8f1d9d41718a13b720763dc4f3f2ca2 Mon Sep 17 00:00:00 2001 From: "clawsweeper[bot]" <274271284+clawsweeper[bot]@users.noreply.github.com> Date: Mon, 25 May 2026 05:18:19 +0000 Subject: [PATCH] fix(commitments): serialize load-modify-save with in-process queue + cross-process file lock (#86326) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: - The PR adds a commitments-store writer helper, wraps load-modify-save mutators and expiry cleanup with a per-path queue plus `withFileLock`, adds three concurrency regressions, and updates the changelog. - PR surface: Source +153, Tests +61, Docs +1. Total +215 across 4 files. - Reproducibility: yes. Source inspection on current main shows the unqueued load-modify-save mutation path, a ... inked proof log shows the Promise.all repro changing from 20/20 lost writes before the patch to 0/20 after. Automerge notes: - PR branch already contained follow-up commit before automerge: fix(commitments): serialize load-modify-save with in-process queue + … Validation: - ClawSweeper review passed for head a349f41ccfe88105a8b26733081759ac18e5a668. - Required merge gates passed before the squash merge. Prepared head SHA: a349f41ccfe88105a8b26733081759ac18e5a668 Review: https://github.com/openclaw/openclaw/pull/86326#issuecomment-4531553610 Co-authored-by: ai-hpc Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com> --- CHANGELOG.md | 1 + src/commitments/store-writer.ts | 135 +++++++++++++++++++++++++ src/commitments/store.test.ts | 61 ++++++++++++ src/commitments/store.ts | 170 ++++++++++++++++++-------------- 4 files changed, 291 insertions(+), 76 deletions(-) create mode 100644 src/commitments/store-writer.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 87c6be06603..b33bea8fad7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/commitments: serialize commitment store load-modify-save writes so concurrent heartbeat and CLI updates no longer lose dismissal, sent, or attempt state. (#81153) Thanks @ai-hpc. - Gateway/perf: tighten restart and startup benchmark failure handling so long profiling runs, failed probes, and fresh Linux runners no longer produce false passing or `n/a` results. - Checks: keep intentional Knip unused-file findings optional so full CI and sparse proof workspaces stay aligned. - Docker: restore writable `~/.config` in runtime images. Fixes #85968. Thanks @hkoessler and @Bartok9. diff --git a/src/commitments/store-writer.ts b/src/commitments/store-writer.ts new file mode 100644 index 00000000000..55e9d44e471 --- /dev/null +++ b/src/commitments/store-writer.ts @@ -0,0 +1,135 @@ +// Per-store-path mutation gate for the commitments store. Mirrors the +// in-process queue + cross-process file-lock pattern in +// src/plugin-sdk/persistent-dedupe.ts (issue #81145). + +import fs from "node:fs/promises"; +import path from "node:path"; +import { type FileLockOptions, withFileLock } from "../plugin-sdk/file-lock.js"; + +type CommitmentsStoreWriterTask = { + fn: () => Promise; + resolve: (value: unknown) => void; + reject: (reason: unknown) => void; +}; + +type CommitmentsStoreWriterQueue = { + running: boolean; + pending: CommitmentsStoreWriterTask[]; + drainPromise: Promise | null; +}; + +const WRITER_QUEUES = new Map(); + +// Matches src/plugin-sdk/persistent-dedupe.ts so both lock-protected stores share tuning. +const DEFAULT_COMMITMENTS_LOCK_OPTIONS: FileLockOptions = { + retries: { + retries: 6, + factor: 1.35, + minTimeout: 8, + maxTimeout: 180, + randomize: true, + }, + stale: 60_000, +}; + +function getOrCreateWriterQueue(storePath: string): CommitmentsStoreWriterQueue { + const existing = WRITER_QUEUES.get(storePath); + if (existing) { + return existing; + } + const created: CommitmentsStoreWriterQueue = { + running: false, + pending: [], + drainPromise: null, + }; + WRITER_QUEUES.set(storePath, created); + return created; +} + +async function drainCommitmentsStoreWriterQueue(storePath: string): Promise { + const queue = WRITER_QUEUES.get(storePath); + if (!queue) { + return; + } + if (queue.drainPromise) { + await queue.drainPromise; + return; + } + queue.running = true; + queue.drainPromise = (async () => { + try { + while (queue.pending.length > 0) { + const task = queue.pending.shift(); + if (!task) { + continue; + } + let result: unknown; + let failed: unknown; + let hasFailure = false; + try { + result = await task.fn(); + } catch (err) { + hasFailure = true; + failed = err; + } + if (hasFailure) { + task.reject(failed); + continue; + } + task.resolve(result); + } + } finally { + queue.running = false; + queue.drainPromise = null; + if (queue.pending.length === 0) { + WRITER_QUEUES.delete(storePath); + } else { + queueMicrotask(() => { + void drainCommitmentsStoreWriterQueue(storePath); + }); + } + } + })(); + await queue.drainPromise; +} + +// The advisory lockfile lives next to the data file; create the parent dir up +// front so acquireFileLock does not ENOENT before the user fn ever runs. +async function ensureCommitmentsStoreDir(storePath: string): Promise { + await fs.mkdir(path.dirname(storePath), { recursive: true }); +} + +export async function runExclusiveCommitmentsStoreWrite( + storePath: string, + fn: () => Promise, +): Promise { + if (!storePath || typeof storePath !== "string") { + throw new Error( + `runExclusiveCommitmentsStoreWrite: storePath must be a non-empty string, got ${JSON.stringify( + storePath, + )}`, + ); + } + const queue = getOrCreateWriterQueue(storePath); + return await new Promise((resolve, reject) => { + const task: CommitmentsStoreWriterTask = { + fn: async () => { + await ensureCommitmentsStoreDir(storePath); + return await withFileLock(storePath, DEFAULT_COMMITMENTS_LOCK_OPTIONS, fn); + }, + resolve: (value) => resolve(value as T), + reject, + }; + queue.pending.push(task); + void drainCommitmentsStoreWriterQueue(storePath); + }); +} + +export function clearCommitmentsStoreWriterQueuesForTest(): void { + for (const queue of WRITER_QUEUES.values()) { + for (const task of queue.pending) { + task.reject(new Error("commitments store writer queue cleared for test")); + } + } + WRITER_QUEUES.clear(); +} diff --git a/src/commitments/store.test.ts b/src/commitments/store.test.ts index cb0f02eef46..6e70506b64f 100644 --- a/src/commitments/store.test.ts +++ b/src/commitments/store.test.ts @@ -7,6 +7,8 @@ import { listDueCommitmentsForSession, listPendingCommitmentsForScope, loadCommitmentStore, + markCommitmentsAttempted, + markCommitmentsStatus, saveCommitmentStore, } from "./store.js"; import type { CommitmentRecord } from "./types.js"; @@ -232,4 +234,63 @@ describe("commitment store delivery selection", () => { expect(expiredCommitments[0]?.id).toBe("cm_interview"); expect(expiredCommitments[0]?.status).toBe("expired"); }); + + it("serializes concurrent markCommitmentsStatus on disjoint ids without losing a write", async () => { + await useTempStateDir(); + await saveCommitmentStore(undefined, { + version: 1, + commitments: [ + commitment({ id: "cm_raceA", dedupeKey: "race-A" }), + commitment({ id: "cm_raceB", dedupeKey: "race-B" }), + ], + }); + + await Promise.all([ + markCommitmentsStatus({ ids: ["cm_raceA"], status: "dismissed", nowMs }), + markCommitmentsStatus({ ids: ["cm_raceB"], status: "dismissed", nowMs }), + ]); + + const after = await loadCommitmentStore(); + const byId = Object.fromEntries(after.commitments.map((c) => [c.id, c.status])); + expect(byId.cm_raceA).toBe("dismissed"); + expect(byId.cm_raceB).toBe("dismissed"); + }); + + it("serializes concurrent markCommitmentsAttempted bumps without losing the counter", async () => { + await useTempStateDir(); + await saveCommitmentStore(undefined, { + version: 1, + commitments: [commitment({ id: "cm_race_attempts", attempts: 0 })], + }); + + await Promise.all( + Array.from({ length: 5 }, () => + markCommitmentsAttempted({ ids: ["cm_race_attempts"], nowMs }), + ), + ); + + const after = await loadCommitmentStore(); + expect(after.commitments[0]?.attempts).toBe(5); + }); + + it("serializes a markCommitmentsStatus dismiss against a concurrent attempted bump", async () => { + await useTempStateDir(); + await saveCommitmentStore(undefined, { + version: 1, + commitments: [ + commitment({ id: "cm_dismiss_target", dedupeKey: "dismiss-target" }), + commitment({ id: "cm_attempt_target", dedupeKey: "attempt-target", attempts: 2 }), + ], + }); + + await Promise.all([ + markCommitmentsStatus({ ids: ["cm_dismiss_target"], status: "dismissed", nowMs }), + markCommitmentsAttempted({ ids: ["cm_attempt_target"], nowMs }), + ]); + + const after = await loadCommitmentStore(); + const byId = Object.fromEntries(after.commitments.map((c) => [c.id, c])); + expect(byId.cm_dismiss_target?.status).toBe("dismissed"); + expect(byId.cm_attempt_target?.attempts).toBe(3); + }); }); diff --git a/src/commitments/store.ts b/src/commitments/store.ts index 11d22d49eca..c836a704977 100644 --- a/src/commitments/store.ts +++ b/src/commitments/store.ts @@ -9,6 +9,7 @@ import { DEFAULT_COMMITMENT_MAX_PER_HEARTBEAT, resolveCommitmentsConfig, } from "./config.js"; +import { runExclusiveCommitmentsStoreWrite } from "./store-writer.js"; import type { CommitmentCandidate, CommitmentExtractionItem, @@ -331,12 +332,24 @@ function expireStaleCommitmentsInStore(store: CommitmentStoreFile, nowMs: number return changed; } -async function loadCommitmentStoreWithExpiredMarked(nowMs: number): Promise { +// Unchecked variant — runs without queue protection. Callers that already hold +// the commitments-store writer queue must use this to avoid re-entry deadlock. +async function loadAndMarkExpiredUnchecked( + nowMs: number, +): Promise<{ store: CommitmentStoreFile; needsSave: boolean }> { const { store, hadLegacySourceText } = await loadCommitmentStoreInternal(); - if (expireStaleCommitmentsInStore(store, nowMs) || hadLegacySourceText) { - await saveCommitmentStore(undefined, store); - } - return store; + const expireChanged = expireStaleCommitmentsInStore(store, nowMs); + return { store, needsSave: expireChanged || hadLegacySourceText }; +} + +async function loadCommitmentStoreWithExpiredMarked(nowMs: number): Promise { + return await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => { + const { store, needsSave } = await loadAndMarkExpiredUnchecked(nowMs); + if (needsSave) { + await saveCommitmentStore(undefined, store); + } + return store; + }); } export async function listPendingCommitmentsForScope(params: { @@ -377,47 +390,48 @@ export async function upsertInferredCommitments(params: { return []; } const nowMs = params.nowMs ?? Date.now(); - const store = await loadCommitmentStoreWithExpiredMarked(nowMs); - const created: CommitmentRecord[] = []; const scopeKey = buildCommitmentScopeKey(params.item); - - for (const entry of params.candidates) { - const dedupeKey = entry.candidate.dedupeKey.trim(); - const existingIndex = store.commitments.findIndex( - (commitment) => - buildCommitmentScopeKey(commitment) === scopeKey && - commitment.dedupeKey === dedupeKey && - isActiveStatus(commitment.status), - ); - if (existingIndex >= 0) { - const existing = store.commitments[existingIndex]; - store.commitments[existingIndex] = { - ...existing, - reason: entry.candidate.reason.trim() || existing.reason, - suggestedText: entry.candidate.suggestedText.trim() || existing.suggestedText, - confidence: Math.max(existing.confidence, entry.candidate.confidence), - dueWindow: { - earliestMs: Math.min(existing.dueWindow.earliestMs, entry.earliestMs), - latestMs: Math.max(existing.dueWindow.latestMs, entry.latestMs), - timezone: entry.timezone, - }, - updatedAtMs: nowMs, - }; - continue; + return await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => { + const { store } = await loadAndMarkExpiredUnchecked(nowMs); + const created: CommitmentRecord[] = []; + for (const entry of params.candidates) { + const dedupeKey = entry.candidate.dedupeKey.trim(); + const existingIndex = store.commitments.findIndex( + (commitment) => + buildCommitmentScopeKey(commitment) === scopeKey && + commitment.dedupeKey === dedupeKey && + isActiveStatus(commitment.status), + ); + if (existingIndex >= 0) { + const existing = store.commitments[existingIndex]; + store.commitments[existingIndex] = { + ...existing, + reason: entry.candidate.reason.trim() || existing.reason, + suggestedText: entry.candidate.suggestedText.trim() || existing.suggestedText, + confidence: Math.max(existing.confidence, entry.candidate.confidence), + dueWindow: { + earliestMs: Math.min(existing.dueWindow.earliestMs, entry.earliestMs), + latestMs: Math.max(existing.dueWindow.latestMs, entry.latestMs), + timezone: entry.timezone, + }, + updatedAtMs: nowMs, + }; + continue; + } + const record = candidateToRecord({ + item: params.item, + candidate: entry.candidate, + nowMs, + earliestMs: entry.earliestMs, + latestMs: entry.latestMs, + timezone: entry.timezone, + }); + store.commitments.push(record); + created.push(record); } - const record = candidateToRecord({ - item: params.item, - candidate: entry.candidate, - nowMs, - earliestMs: entry.earliestMs, - latestMs: entry.latestMs, - timezone: entry.timezone, - }); - store.commitments.push(record); - created.push(record); - } - await saveCommitmentStore(undefined, store); - return created; + await saveCommitmentStore(undefined, store); + return created; + }); } function countSentCommitmentsForSession(params: { @@ -529,23 +543,25 @@ export async function markCommitmentsAttempted(params: { } const idSet = new Set(params.ids); const nowMs = params.nowMs ?? Date.now(); - const store = await loadCommitmentStore(); - let changed = false; - store.commitments = store.commitments.map((commitment) => { - if (!idSet.has(commitment.id)) { - return commitment; + await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => { + const store = await loadCommitmentStore(); + let changed = false; + store.commitments = store.commitments.map((commitment) => { + if (!idSet.has(commitment.id)) { + return commitment; + } + changed = true; + return { + ...commitment, + attempts: commitment.attempts + 1, + lastAttemptAtMs: nowMs, + updatedAtMs: nowMs, + }; + }); + if (changed) { + await saveCommitmentStore(undefined, store); } - changed = true; - return { - ...commitment, - attempts: commitment.attempts + 1, - lastAttemptAtMs: nowMs, - updatedAtMs: nowMs, - }; }); - if (changed) { - await saveCommitmentStore(undefined, store); - } } export async function markCommitmentsStatus(params: { @@ -559,25 +575,27 @@ export async function markCommitmentsStatus(params: { } const idSet = new Set(params.ids); const nowMs = params.nowMs ?? Date.now(); - const store = await loadCommitmentStore(); - let changed = false; - store.commitments = store.commitments.map((commitment) => { - if (!idSet.has(commitment.id) || !isActiveStatus(commitment.status)) { - return commitment; + await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => { + const store = await loadCommitmentStore(); + let changed = false; + store.commitments = store.commitments.map((commitment) => { + if (!idSet.has(commitment.id) || !isActiveStatus(commitment.status)) { + return commitment; + } + changed = true; + return { + ...commitment, + status: params.status, + updatedAtMs: nowMs, + ...(params.status === "sent" ? { sentAtMs: nowMs } : {}), + ...(params.status === "dismissed" ? { dismissedAtMs: nowMs } : {}), + ...(params.status === "expired" ? { expiredAtMs: nowMs } : {}), + }; + }); + if (changed) { + await saveCommitmentStore(undefined, store); } - changed = true; - return { - ...commitment, - status: params.status, - updatedAtMs: nowMs, - ...(params.status === "sent" ? { sentAtMs: nowMs } : {}), - ...(params.status === "dismissed" ? { dismissedAtMs: nowMs } : {}), - ...(params.status === "expired" ? { expiredAtMs: nowMs } : {}), - }; }); - if (changed) { - await saveCommitmentStore(undefined, store); - } } export async function listCommitments(params?: {