fix(commitments): serialize load-modify-save with in-process queue + cross-process file lock (#86326)

Summary:
- The PR adds a commitments-store writer helper, wraps load-modify-save mutators and expiry cleanup with a per-path queue plus `withFileLock`, adds three concurrency regressions, and updates the changelog.
- PR surface: Source +153, Tests +61, Docs +1. Total +215 across 4 files.
- Reproducibility: yes. Source inspection on current main shows the unqueued load-modify-save mutation path, a ... inked proof log shows the Promise.all repro changing from 20/20 lost writes before the patch to 0/20 after.

Automerge notes:
- PR branch already contained follow-up commit before automerge: fix(commitments): serialize load-modify-save with in-process queue + …

Validation:
- ClawSweeper review passed for head a349f41ccf.
- Required merge gates passed before the squash merge.

Prepared head SHA: a349f41ccf
Review: https://github.com/openclaw/openclaw/pull/86326#issuecomment-4531553610

Co-authored-by: ai-hpc <mail.speedy.hpc@hotmail.com>
Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com>
Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com>
Approved-by: takhoffman
Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
This commit is contained in:
clawsweeper[bot]
2026-05-25 05:18:19 +00:00
committed by GitHub
parent dd47e479ae
commit d3c293d9c8
4 changed files with 291 additions and 76 deletions

View File

@@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Agents/commitments: serialize commitment store load-modify-save writes so concurrent heartbeat and CLI updates no longer lose dismissal, sent, or attempt state. (#81153) Thanks @ai-hpc.
- Gateway/perf: tighten restart and startup benchmark failure handling so long profiling runs, failed probes, and fresh Linux runners no longer produce false passing or `n/a` results.
- Checks: keep intentional Knip unused-file findings optional so full CI and sparse proof workspaces stay aligned.
- Docker: restore writable `~/.config` in runtime images. Fixes #85968. Thanks @hkoessler and @Bartok9.

View File

@@ -0,0 +1,135 @@
// Per-store-path mutation gate for the commitments store. Mirrors the
// in-process queue + cross-process file-lock pattern in
// src/plugin-sdk/persistent-dedupe.ts (issue #81145).
import fs from "node:fs/promises";
import path from "node:path";
import { type FileLockOptions, withFileLock } from "../plugin-sdk/file-lock.js";
type CommitmentsStoreWriterTask = {
fn: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
};
type CommitmentsStoreWriterQueue = {
running: boolean;
pending: CommitmentsStoreWriterTask[];
drainPromise: Promise<void> | null;
};
const WRITER_QUEUES = new Map<string, CommitmentsStoreWriterQueue>();
// Matches src/plugin-sdk/persistent-dedupe.ts so both lock-protected stores share tuning.
const DEFAULT_COMMITMENTS_LOCK_OPTIONS: FileLockOptions = {
retries: {
retries: 6,
factor: 1.35,
minTimeout: 8,
maxTimeout: 180,
randomize: true,
},
stale: 60_000,
};
function getOrCreateWriterQueue(storePath: string): CommitmentsStoreWriterQueue {
const existing = WRITER_QUEUES.get(storePath);
if (existing) {
return existing;
}
const created: CommitmentsStoreWriterQueue = {
running: false,
pending: [],
drainPromise: null,
};
WRITER_QUEUES.set(storePath, created);
return created;
}
async function drainCommitmentsStoreWriterQueue(storePath: string): Promise<void> {
const queue = WRITER_QUEUES.get(storePath);
if (!queue) {
return;
}
if (queue.drainPromise) {
await queue.drainPromise;
return;
}
queue.running = true;
queue.drainPromise = (async () => {
try {
while (queue.pending.length > 0) {
const task = queue.pending.shift();
if (!task) {
continue;
}
let result: unknown;
let failed: unknown;
let hasFailure = false;
try {
result = await task.fn();
} catch (err) {
hasFailure = true;
failed = err;
}
if (hasFailure) {
task.reject(failed);
continue;
}
task.resolve(result);
}
} finally {
queue.running = false;
queue.drainPromise = null;
if (queue.pending.length === 0) {
WRITER_QUEUES.delete(storePath);
} else {
queueMicrotask(() => {
void drainCommitmentsStoreWriterQueue(storePath);
});
}
}
})();
await queue.drainPromise;
}
// The advisory lockfile lives next to the data file; create the parent dir up
// front so acquireFileLock does not ENOENT before the user fn ever runs.
async function ensureCommitmentsStoreDir(storePath: string): Promise<void> {
await fs.mkdir(path.dirname(storePath), { recursive: true });
}
export async function runExclusiveCommitmentsStoreWrite<T>(
storePath: string,
fn: () => Promise<T>,
): Promise<T> {
if (!storePath || typeof storePath !== "string") {
throw new Error(
`runExclusiveCommitmentsStoreWrite: storePath must be a non-empty string, got ${JSON.stringify(
storePath,
)}`,
);
}
const queue = getOrCreateWriterQueue(storePath);
return await new Promise<T>((resolve, reject) => {
const task: CommitmentsStoreWriterTask = {
fn: async () => {
await ensureCommitmentsStoreDir(storePath);
return await withFileLock(storePath, DEFAULT_COMMITMENTS_LOCK_OPTIONS, fn);
},
resolve: (value) => resolve(value as T),
reject,
};
queue.pending.push(task);
void drainCommitmentsStoreWriterQueue(storePath);
});
}
export function clearCommitmentsStoreWriterQueuesForTest(): void {
for (const queue of WRITER_QUEUES.values()) {
for (const task of queue.pending) {
task.reject(new Error("commitments store writer queue cleared for test"));
}
}
WRITER_QUEUES.clear();
}

View File

@@ -7,6 +7,8 @@ import {
listDueCommitmentsForSession,
listPendingCommitmentsForScope,
loadCommitmentStore,
markCommitmentsAttempted,
markCommitmentsStatus,
saveCommitmentStore,
} from "./store.js";
import type { CommitmentRecord } from "./types.js";
@@ -232,4 +234,63 @@ describe("commitment store delivery selection", () => {
expect(expiredCommitments[0]?.id).toBe("cm_interview");
expect(expiredCommitments[0]?.status).toBe("expired");
});
it("serializes concurrent markCommitmentsStatus on disjoint ids without losing a write", async () => {
await useTempStateDir();
await saveCommitmentStore(undefined, {
version: 1,
commitments: [
commitment({ id: "cm_raceA", dedupeKey: "race-A" }),
commitment({ id: "cm_raceB", dedupeKey: "race-B" }),
],
});
await Promise.all([
markCommitmentsStatus({ ids: ["cm_raceA"], status: "dismissed", nowMs }),
markCommitmentsStatus({ ids: ["cm_raceB"], status: "dismissed", nowMs }),
]);
const after = await loadCommitmentStore();
const byId = Object.fromEntries(after.commitments.map((c) => [c.id, c.status]));
expect(byId.cm_raceA).toBe("dismissed");
expect(byId.cm_raceB).toBe("dismissed");
});
it("serializes concurrent markCommitmentsAttempted bumps without losing the counter", async () => {
await useTempStateDir();
await saveCommitmentStore(undefined, {
version: 1,
commitments: [commitment({ id: "cm_race_attempts", attempts: 0 })],
});
await Promise.all(
Array.from({ length: 5 }, () =>
markCommitmentsAttempted({ ids: ["cm_race_attempts"], nowMs }),
),
);
const after = await loadCommitmentStore();
expect(after.commitments[0]?.attempts).toBe(5);
});
it("serializes a markCommitmentsStatus dismiss against a concurrent attempted bump", async () => {
await useTempStateDir();
await saveCommitmentStore(undefined, {
version: 1,
commitments: [
commitment({ id: "cm_dismiss_target", dedupeKey: "dismiss-target" }),
commitment({ id: "cm_attempt_target", dedupeKey: "attempt-target", attempts: 2 }),
],
});
await Promise.all([
markCommitmentsStatus({ ids: ["cm_dismiss_target"], status: "dismissed", nowMs }),
markCommitmentsAttempted({ ids: ["cm_attempt_target"], nowMs }),
]);
const after = await loadCommitmentStore();
const byId = Object.fromEntries(after.commitments.map((c) => [c.id, c]));
expect(byId.cm_dismiss_target?.status).toBe("dismissed");
expect(byId.cm_attempt_target?.attempts).toBe(3);
});
});

View File

@@ -9,6 +9,7 @@ import {
DEFAULT_COMMITMENT_MAX_PER_HEARTBEAT,
resolveCommitmentsConfig,
} from "./config.js";
import { runExclusiveCommitmentsStoreWrite } from "./store-writer.js";
import type {
CommitmentCandidate,
CommitmentExtractionItem,
@@ -331,12 +332,24 @@ function expireStaleCommitmentsInStore(store: CommitmentStoreFile, nowMs: number
return changed;
}
async function loadCommitmentStoreWithExpiredMarked(nowMs: number): Promise<CommitmentStoreFile> {
// Unchecked variant — runs without queue protection. Callers that already hold
// the commitments-store writer queue must use this to avoid re-entry deadlock.
async function loadAndMarkExpiredUnchecked(
nowMs: number,
): Promise<{ store: CommitmentStoreFile; needsSave: boolean }> {
const { store, hadLegacySourceText } = await loadCommitmentStoreInternal();
if (expireStaleCommitmentsInStore(store, nowMs) || hadLegacySourceText) {
await saveCommitmentStore(undefined, store);
}
return store;
const expireChanged = expireStaleCommitmentsInStore(store, nowMs);
return { store, needsSave: expireChanged || hadLegacySourceText };
}
async function loadCommitmentStoreWithExpiredMarked(nowMs: number): Promise<CommitmentStoreFile> {
return await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => {
const { store, needsSave } = await loadAndMarkExpiredUnchecked(nowMs);
if (needsSave) {
await saveCommitmentStore(undefined, store);
}
return store;
});
}
export async function listPendingCommitmentsForScope(params: {
@@ -377,47 +390,48 @@ export async function upsertInferredCommitments(params: {
return [];
}
const nowMs = params.nowMs ?? Date.now();
const store = await loadCommitmentStoreWithExpiredMarked(nowMs);
const created: CommitmentRecord[] = [];
const scopeKey = buildCommitmentScopeKey(params.item);
for (const entry of params.candidates) {
const dedupeKey = entry.candidate.dedupeKey.trim();
const existingIndex = store.commitments.findIndex(
(commitment) =>
buildCommitmentScopeKey(commitment) === scopeKey &&
commitment.dedupeKey === dedupeKey &&
isActiveStatus(commitment.status),
);
if (existingIndex >= 0) {
const existing = store.commitments[existingIndex];
store.commitments[existingIndex] = {
...existing,
reason: entry.candidate.reason.trim() || existing.reason,
suggestedText: entry.candidate.suggestedText.trim() || existing.suggestedText,
confidence: Math.max(existing.confidence, entry.candidate.confidence),
dueWindow: {
earliestMs: Math.min(existing.dueWindow.earliestMs, entry.earliestMs),
latestMs: Math.max(existing.dueWindow.latestMs, entry.latestMs),
timezone: entry.timezone,
},
updatedAtMs: nowMs,
};
continue;
return await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => {
const { store } = await loadAndMarkExpiredUnchecked(nowMs);
const created: CommitmentRecord[] = [];
for (const entry of params.candidates) {
const dedupeKey = entry.candidate.dedupeKey.trim();
const existingIndex = store.commitments.findIndex(
(commitment) =>
buildCommitmentScopeKey(commitment) === scopeKey &&
commitment.dedupeKey === dedupeKey &&
isActiveStatus(commitment.status),
);
if (existingIndex >= 0) {
const existing = store.commitments[existingIndex];
store.commitments[existingIndex] = {
...existing,
reason: entry.candidate.reason.trim() || existing.reason,
suggestedText: entry.candidate.suggestedText.trim() || existing.suggestedText,
confidence: Math.max(existing.confidence, entry.candidate.confidence),
dueWindow: {
earliestMs: Math.min(existing.dueWindow.earliestMs, entry.earliestMs),
latestMs: Math.max(existing.dueWindow.latestMs, entry.latestMs),
timezone: entry.timezone,
},
updatedAtMs: nowMs,
};
continue;
}
const record = candidateToRecord({
item: params.item,
candidate: entry.candidate,
nowMs,
earliestMs: entry.earliestMs,
latestMs: entry.latestMs,
timezone: entry.timezone,
});
store.commitments.push(record);
created.push(record);
}
const record = candidateToRecord({
item: params.item,
candidate: entry.candidate,
nowMs,
earliestMs: entry.earliestMs,
latestMs: entry.latestMs,
timezone: entry.timezone,
});
store.commitments.push(record);
created.push(record);
}
await saveCommitmentStore(undefined, store);
return created;
await saveCommitmentStore(undefined, store);
return created;
});
}
function countSentCommitmentsForSession(params: {
@@ -529,23 +543,25 @@ export async function markCommitmentsAttempted(params: {
}
const idSet = new Set(params.ids);
const nowMs = params.nowMs ?? Date.now();
const store = await loadCommitmentStore();
let changed = false;
store.commitments = store.commitments.map((commitment) => {
if (!idSet.has(commitment.id)) {
return commitment;
await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => {
const store = await loadCommitmentStore();
let changed = false;
store.commitments = store.commitments.map((commitment) => {
if (!idSet.has(commitment.id)) {
return commitment;
}
changed = true;
return {
...commitment,
attempts: commitment.attempts + 1,
lastAttemptAtMs: nowMs,
updatedAtMs: nowMs,
};
});
if (changed) {
await saveCommitmentStore(undefined, store);
}
changed = true;
return {
...commitment,
attempts: commitment.attempts + 1,
lastAttemptAtMs: nowMs,
updatedAtMs: nowMs,
};
});
if (changed) {
await saveCommitmentStore(undefined, store);
}
}
export async function markCommitmentsStatus(params: {
@@ -559,25 +575,27 @@ export async function markCommitmentsStatus(params: {
}
const idSet = new Set(params.ids);
const nowMs = params.nowMs ?? Date.now();
const store = await loadCommitmentStore();
let changed = false;
store.commitments = store.commitments.map((commitment) => {
if (!idSet.has(commitment.id) || !isActiveStatus(commitment.status)) {
return commitment;
await runExclusiveCommitmentsStoreWrite(resolveCommitmentStorePath(), async () => {
const store = await loadCommitmentStore();
let changed = false;
store.commitments = store.commitments.map((commitment) => {
if (!idSet.has(commitment.id) || !isActiveStatus(commitment.status)) {
return commitment;
}
changed = true;
return {
...commitment,
status: params.status,
updatedAtMs: nowMs,
...(params.status === "sent" ? { sentAtMs: nowMs } : {}),
...(params.status === "dismissed" ? { dismissedAtMs: nowMs } : {}),
...(params.status === "expired" ? { expiredAtMs: nowMs } : {}),
};
});
if (changed) {
await saveCommitmentStore(undefined, store);
}
changed = true;
return {
...commitment,
status: params.status,
updatedAtMs: nowMs,
...(params.status === "sent" ? { sentAtMs: nowMs } : {}),
...(params.status === "dismissed" ? { dismissedAtMs: nowMs } : {}),
...(params.status === "expired" ? { expiredAtMs: nowMs } : {}),
};
});
if (changed) {
await saveCommitmentStore(undefined, store);
}
}
export async function listCommitments(params?: {