diff --git a/CHANGELOG.md b/CHANGELOG.md index dae426d06f2..101c5f74b84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -598,6 +598,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Config: serialize and retry semantic config mutations centrally, so concurrent commands can rebase safe changes instead of clobbering or hand-rolling command-local retry loops. - Models/auth: keep `agents.defaults.model` when `openclaw models auth login` runs without `--set-default`, so provider onboarding patches add models without silently switching the primary. Fixes #78162. (#78241) Thanks @neeravmakwana. - Control UI/chat: localize the remaining chat welcome, composer, run-control, session/model/thinking selector, and zh-CN Skills labels through the Control UI i18n pipeline so non-English browser locales no longer see those chat controls in English. Fixes #79937. Thanks @BunsDev. - Control UI: surface browser-blocked WebSocket security failures with wss:// and loopback dashboard guidance instead of leaving the connection on a dead security error. Thanks @BunsDev. diff --git a/src/cli/plugins-install-record-commit.ts b/src/cli/plugins-install-record-commit.ts index 4cda3f3c36c..468017eb63d 100644 --- a/src/cli/plugins-install-record-commit.ts +++ b/src/cli/plugins-install-record-commit.ts @@ -1,5 +1,14 @@ import { isDeepStrictEqual } from "node:util"; -import { replaceConfigFile } from "../config/config.js"; +import { + replaceConfigFile, + resolveConfigWriteAfterWrite, + transformConfigFileWithRetry, + type ConfigMutationCommit, + type ConfigMutationResult, + type ConfigMutationContext, + type ConfigTransformResult, + type TransformConfigFileWithRetryParams, +} from "../config/config.js"; import type { ConfigWriteOptions } from "../config/io.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import type { PluginInstallRecord } from "../config/types.plugins.js"; @@ -21,6 +30,19 @@ function mergeUnsetPaths( type ConfigCommit = (config: OpenClawConfig, writeOptions?: ConfigWriteOptions) => Promise; const PLUGIN_SOURCE_CHANGED_RESTART_REASON = "plugin source changed"; +function mergeAfterWrite( + writeOptions: ConfigWriteOptions | undefined, + afterWrite: ConfigWriteOptions["afterWrite"], +): ConfigWriteOptions | undefined { + if (afterWrite === undefined) { + return writeOptions; + } + return { + ...writeOptions, + afterWrite, + }; +} + async function commitPluginInstallRecordsWithWriter(params: { previousInstallRecords?: Record; nextInstallRecords: Record; @@ -141,3 +163,53 @@ export async function commitConfigWithPendingPluginInstalls(params: { }, }); } + +export async function transformConfigWithPendingPluginInstalls( + params: Omit, "commit">, +): Promise> { + const commit: ConfigMutationCommit = async ({ nextConfig, snapshot, baseHash, writeOptions }) => { + const requestedAfterWrite = params.afterWrite ?? params.writeOptions?.afterWrite; + const committed = await commitConfigWriteWithPendingPluginInstalls({ + nextConfig, + ...(writeOptions ? { writeOptions: mergeAfterWrite(writeOptions, params.afterWrite) } : {}), + commit: async (config, commitWriteOptions) => { + await replaceConfigFile({ + nextConfig: config, + snapshot, + writeOptions: commitWriteOptions ?? {}, + ...(baseHash !== undefined ? { baseHash } : {}), + }); + }, + }); + const afterWrite = resolveConfigWriteAfterWrite( + requestedAfterWrite ?? + (committed.movedInstallRecords + ? { mode: "restart", reason: PLUGIN_SOURCE_CHANGED_RESTART_REASON } + : undefined), + ); + return { + config: committed.config, + afterWrite, + }; + }; + + return await transformConfigFileWithRetry({ + ...params, + commit, + }); +} + +export async function mutateConfigWithPendingPluginInstalls( + params: Omit, "commit" | "transform"> & { + mutate: (draft: OpenClawConfig, context: ConfigMutationContext) => Promise | T | void; + }, +): Promise> { + return await transformConfigWithPendingPluginInstalls({ + ...params, + transform: async (currentConfig, context): Promise> => { + const draft = structuredClone(currentConfig) as OpenClawConfig; + const result = (await params.mutate(draft, context)) as T | undefined; + return { nextConfig: draft, result }; + }, + }); +} diff --git a/src/commands/agents.add.test.ts b/src/commands/agents.add.test.ts index 76d9b1513a8..fbdda748921 100644 --- a/src/commands/agents.add.test.ts +++ b/src/commands/agents.add.test.ts @@ -13,6 +13,45 @@ const writeConfigFileMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined const replaceConfigFileMock = vi.hoisted(() => vi.fn(async (params: { nextConfig: unknown }) => await writeConfigFileMock(params.nextConfig)), ); +const transformConfigWithPendingPluginInstallsMock = vi.hoisted(() => + vi.fn( + async (params: { + transform: ( + config: Record, + context: { + snapshot: Record; + previousHash: string | null; + attempt: number; + }, + ) => + | Promise<{ nextConfig: unknown; result?: unknown }> + | { nextConfig: unknown; result?: unknown }; + }) => { + const snapshot = (await readConfigFileSnapshotMock()) as { + path?: string; + hash?: string; + config?: Record; + sourceConfig?: Record; + }; + const transformed = await params.transform(snapshot.sourceConfig ?? snapshot.config ?? {}, { + snapshot, + previousHash: snapshot.hash ?? null, + attempt: 0, + }); + await writeConfigFileMock(transformed.nextConfig); + return { + path: snapshot.path ?? "/tmp/openclaw.json", + previousHash: snapshot.hash ?? null, + snapshot, + nextConfig: transformed.nextConfig, + result: transformed.result, + attempts: 1, + afterWrite: { mode: "auto" }, + followUp: { mode: "auto", requiresRestart: false }, + }; + }, + ), +); const wizardMocks = vi.hoisted(() => ({ createClackPrompter: vi.fn(), @@ -25,6 +64,13 @@ vi.mock("../config/config.js", async () => ({ replaceConfigFile: replaceConfigFileMock, })); +vi.mock("../cli/plugins-install-record-commit.js", async () => ({ + ...(await vi.importActual( + "../cli/plugins-install-record-commit.js", + )), + transformConfigWithPendingPluginInstalls: transformConfigWithPendingPluginInstallsMock, +})); + vi.mock("../wizard/clack-prompter.js", () => ({ createClackPrompter: wizardMocks.createClackPrompter, })); @@ -44,6 +90,7 @@ describe("agents add command", () => { readConfigFileSnapshotMock.mockClear(); writeConfigFileMock.mockClear(); replaceConfigFileMock.mockClear(); + transformConfigWithPendingPluginInstallsMock.mockClear(); wizardMocks.createClackPrompter.mockClear(); runtime.log.mockClear(); runtime.error.mockClear(); @@ -227,4 +274,98 @@ describe("agents add command", () => { }), ).toBe('OAuth profiles stay shared from "main" unless this agent signs in separately.'); }); + + describe("non-interactive config mutation", () => { + it("rebases agent creation on the latest config snapshot", async () => { + readConfigFileSnapshotMock + .mockResolvedValueOnce({ + ...baseConfigSnapshot, + hash: "hash-1", + config: { agents: { list: [] } }, + sourceConfig: { agents: { list: [] } }, + }) + .mockResolvedValueOnce({ + ...baseConfigSnapshot, + hash: "hash-2", + config: { agents: { list: [{ id: "other-agent" }] } }, + sourceConfig: { agents: { list: [{ id: "other-agent" }] } }, + }); + + await agentsAddCommand({ name: "Work", workspace: "/tmp/work" }, runtime, { + hasFlags: true, + }); + + expect(transformConfigWithPendingPluginInstallsMock).toHaveBeenCalledOnce(); + expect(writeConfigFileMock).toHaveBeenCalledWith( + expect.objectContaining({ + agents: { + list: [ + { id: "other-agent" }, + expect.objectContaining({ id: "work", workspace: "/tmp/work" }), + ], + }, + }), + ); + expect(runtime.exit).not.toHaveBeenCalled(); + expect(runtime.error).not.toHaveBeenCalled(); + }); + + it("fails instead of overwriting when the same agent appears before commit", async () => { + readConfigFileSnapshotMock + .mockResolvedValueOnce({ + ...baseConfigSnapshot, + hash: "hash-1", + config: { agents: { list: [] } }, + sourceConfig: { agents: { list: [] } }, + }) + .mockResolvedValueOnce({ + ...baseConfigSnapshot, + hash: "hash-2", + config: { agents: { list: [{ id: "work", workspace: "/tmp/other" }] } }, + sourceConfig: { agents: { list: [{ id: "work", workspace: "/tmp/other" }] } }, + }); + + await agentsAddCommand({ name: "Work", workspace: "/tmp/work" }, runtime, { + hasFlags: true, + }); + + expect(writeConfigFileMock).not.toHaveBeenCalled(); + expect(runtime.error).toHaveBeenCalledWith('Agent "work" already exists.'); + expect(runtime.exit).toHaveBeenCalledWith(1); + }); + + it("reports binding conflicts from the committed mutation", async () => { + readConfigFileSnapshotMock + .mockResolvedValueOnce({ + ...baseConfigSnapshot, + hash: "hash-1", + config: { agents: { list: [] } }, + sourceConfig: { agents: { list: [] } }, + }) + .mockResolvedValueOnce({ + ...baseConfigSnapshot, + hash: "hash-2", + config: { + agents: { list: [{ id: "other-agent" }] }, + bindings: [{ type: "route", agentId: "other-agent", match: { channel: "telegram" } }], + }, + sourceConfig: { + agents: { list: [{ id: "other-agent" }] }, + bindings: [{ type: "route", agentId: "other-agent", match: { channel: "telegram" } }], + }, + }); + + await agentsAddCommand( + { name: "Work", workspace: "/tmp/work", bind: ["telegram"], json: true }, + runtime, + { hasFlags: true }, + ); + + const payload = JSON.parse(String(runtime.log.mock.calls.at(-1)?.[0])) as { + bindings: { added: string[]; conflicts: string[] }; + }; + expect(payload.bindings.added).toEqual([]); + expect(payload.bindings.conflicts).toEqual(["telegram (agent=other-agent)"]); + }); + }); }); diff --git a/src/commands/agents.commands.add.ts b/src/commands/agents.commands.add.ts index e3f562d1e6f..9010301bc43 100644 --- a/src/commands/agents.commands.add.ts +++ b/src/commands/agents.commands.add.ts @@ -15,7 +15,10 @@ import { loadPersistedAuthProfileStore, } from "../agents/auth-profiles/persisted.js"; import { formatCliCommand } from "../cli/command-format.js"; -import { commitConfigWithPendingPluginInstalls } from "../cli/plugins-install-record-commit.js"; +import { + commitConfigWithPendingPluginInstalls, + transformConfigWithPendingPluginInstalls, +} from "../cli/plugins-install-record-commit.js"; import { logConfigUpdated } from "../config/logging.js"; import { pathExists } from "../infra/fs-safe.js"; import { saveJsonFile } from "../infra/json-file.js"; @@ -53,6 +56,24 @@ type AgentsAddOptions = { json?: boolean; }; +type AgentBindingResult = ReturnType; + +type AgentsAddMutationResult = { + agentDir: string; + bindingResult: AgentBindingResult; +}; + +class AgentsAddMutationError extends Error { + constructor(message: string) { + super(message); + this.name = "AgentsAddMutationError"; + } +} + +function emptyBindingResult(config: Parameters[0]): AgentBindingResult { + return { config, added: [], updated: [], skipped: [], conflicts: [] }; +} + async function copyPortableAuthProfiles(params: { destAuthPath: string; sourceAgentDir: string; @@ -147,44 +168,64 @@ export async function agentsAddCommand( } const workspaceDir = resolveUserPath(workspaceFlag); - const agentDir = opts.agentDir?.trim() + const explicitAgentDir = opts.agentDir?.trim() ? resolveUserPath(opts.agentDir.trim()) - : resolveAgentDir(cfg, agentId); + : undefined; const model = opts.model?.trim(); - const nextConfig = applyAgentConfig(cfg, { - agentId, - name: nameInput, - workspace: workspaceDir, - agentDir, - ...(model ? { model } : {}), - }); - const bindingParse = parseBindingSpecs({ - agentId, - specs: opts.bind, - config: nextConfig, - }); - if (bindingParse.errors.length > 0) { - runtime.error(bindingParse.errors.join("\n")); - runtime.exit(1); - return; + let committed; + try { + committed = await transformConfigWithPendingPluginInstalls({ + transform: (latestConfig) => { + if (findAgentEntryIndex(listAgentEntries(latestConfig), agentId) >= 0) { + throw new AgentsAddMutationError(`Agent "${agentId}" already exists.`); + } + const agentDir = explicitAgentDir ?? resolveAgentDir(latestConfig, agentId); + const nextConfig = applyAgentConfig(latestConfig, { + agentId, + name: nameInput, + workspace: workspaceDir, + agentDir, + ...(model ? { model } : {}), + }); + const bindingParse = parseBindingSpecs({ + agentId, + specs: opts.bind, + config: nextConfig, + }); + if (bindingParse.errors.length > 0) { + throw new AgentsAddMutationError(bindingParse.errors.join("\n")); + } + const bindingResult = + bindingParse.bindings.length > 0 + ? applyAgentBindings(nextConfig, bindingParse.bindings) + : emptyBindingResult(nextConfig); + return { + nextConfig: bindingResult.config, + result: { agentDir, bindingResult }, + }; + }, + }); + } catch (err) { + if (err instanceof AgentsAddMutationError) { + runtime.error(err.message); + runtime.exit(1); + return; + } + throw err; } - const bindingResult = - bindingParse.bindings.length > 0 - ? applyAgentBindings(nextConfig, bindingParse.bindings) - : { config: nextConfig, added: [], updated: [], skipped: [], conflicts: [] }; - - await commitConfigWithPendingPluginInstalls({ - nextConfig: bindingResult.config, - ...(baseHash !== undefined ? { baseHash } : {}), - }); + const mutationResult = committed.result; + if (!mutationResult) { + throw new Error("Agent config mutation did not return a result."); + } + const { agentDir, bindingResult } = mutationResult; if (!opts.json) { logConfigUpdated(runtime); } const quietRuntime = opts.json ? createQuietRuntime(runtime) : runtime; await ensureWorkspaceAndSessions(workspaceDir, quietRuntime, { - skipBootstrap: Boolean(bindingResult.config.agents?.defaults?.skipBootstrap), - skipOptionalBootstrapFiles: bindingResult.config.agents?.defaults?.skipOptionalBootstrapFiles, + skipBootstrap: Boolean(committed.nextConfig.agents?.defaults?.skipBootstrap), + skipOptionalBootstrapFiles: committed.nextConfig.agents?.defaults?.skipOptionalBootstrapFiles, agentId, }); diff --git a/src/config/config.ts b/src/config/config.ts index 9ce646c9b02..c67e3021688 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -43,7 +43,25 @@ export type { ConfigWriteNotification, ReadConfigFileSnapshotWithPluginMetadataResult, } from "./io.js"; -export { ConfigMutationConflictError, mutateConfigFile, replaceConfigFile } from "./mutate.js"; +export { + ConfigMutationConflictError, + mutateConfigFile, + mutateConfigFileWithRetry, + replaceConfigFile, + transformConfigFile, + transformConfigFileWithRetry, +} from "./mutate.js"; +export type { + ConfigMutationCommit, + ConfigMutationCommitParams, + ConfigMutationCommitResult, + ConfigMutationContext, + ConfigMutationIO, + ConfigMutationResult, + ConfigTransformResult, + TransformConfigFileParams, + TransformConfigFileWithRetryParams, +} from "./mutate.js"; export { assertConfigWriteAllowedInCurrentMode, NixModeConfigMutationError, diff --git a/src/config/io.ts b/src/config/io.ts index 25761a0b7ad..9722ee47631 100644 --- a/src/config/io.ts +++ b/src/config/io.ts @@ -2428,6 +2428,7 @@ export async function writeConfigFile( nextCfg = coerceConfig(applyMergePatch(runtimeConfigSourceSnapshot!, runtimePatch)); } const writeResult = await io.writeConfigFile(nextCfg, { + baseSnapshot: options.baseSnapshot, envSnapshotForRestore: resolveWriteEnvSnapshotForPath({ actualConfigPath: io.configPath, expectedConfigPath: options.expectedConfigPath, @@ -2438,6 +2439,7 @@ export async function writeConfigFile( explicitSetValueSource: options.explicitSetPaths ? (options.explicitSetValueSource ?? cfg) : undefined, + afterWrite: options.afterWrite, allowDestructiveWrite: options.allowDestructiveWrite, allowConfigSizeDrop: options.allowConfigSizeDrop, skipRuntimeSnapshotRefresh: options.skipRuntimeSnapshotRefresh, diff --git a/src/config/mutate.test.ts b/src/config/mutate.test.ts index 33ae28e235f..64d392a0180 100644 --- a/src/config/mutate.test.ts +++ b/src/config/mutate.test.ts @@ -2,7 +2,12 @@ import fs from "node:fs/promises"; import path from "node:path"; import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { createSuiteTempRootTracker } from "../test-helpers/temp-dir.js"; -import { ConfigMutationConflictError, mutateConfigFile, replaceConfigFile } from "./mutate.js"; +import { + ConfigMutationConflictError, + mutateConfigFile, + replaceConfigFile, + transformConfigFileWithRetry, +} from "./mutate.js"; import { registerRuntimeConfigWriteListener, resetConfigRuntimeState } from "./runtime-snapshot.js"; import type { ConfigFileSnapshot, OpenClawConfig } from "./types.js"; @@ -11,8 +16,16 @@ const ioMocks = vi.hoisted(() => ({ resolveConfigSnapshotHash: vi.fn(), writeConfigFile: vi.fn(), })); +const validationMocks = vi.hoisted(() => ({ + validateConfigObjectWithPlugins: vi.fn((config: OpenClawConfig) => ({ + ok: true, + config, + warnings: [], + })), +})); vi.mock("./io.js", () => ioMocks); +vi.mock("./validation.js", () => validationMocks); function createSnapshot(params: { hash: string; @@ -61,6 +74,13 @@ describe("config mutate helpers", () => { beforeEach(() => { vi.clearAllMocks(); resetConfigRuntimeState(); + validationMocks.validateConfigObjectWithPlugins.mockImplementation( + (config: OpenClawConfig) => ({ + ok: true, + config, + warnings: [], + }), + ); ioMocks.resolveConfigSnapshotHash.mockImplementation( (snapshot: { hash?: string }) => snapshot.hash ?? null, ); @@ -103,7 +123,125 @@ describe("config mutate helpers", () => { auth: { mode: "token" }, }, }, - { expectedConfigPath: snapshot.path, afterWrite: { mode: "auto" } }, + { baseSnapshot: snapshot, expectedConfigPath: snapshot.path, afterWrite: { mode: "auto" } }, + ); + }); + + it("retries transform mutations on stale config conflicts", async () => { + const initial = createSnapshot({ + hash: "hash-1", + sourceConfig: { agents: { list: [] } }, + }); + const fresh = createSnapshot({ + hash: "hash-2", + sourceConfig: { agents: { list: [{ id: "other-agent" }] } }, + }); + ioMocks.readConfigFileSnapshotForWrite + .mockResolvedValueOnce({ + snapshot: initial, + writeOptions: { expectedConfigPath: initial.path }, + }) + .mockResolvedValueOnce({ + snapshot: fresh, + writeOptions: { expectedConfigPath: fresh.path }, + }); + ioMocks.writeConfigFile + .mockRejectedValueOnce(new ConfigMutationConflictError("stale", { currentHash: "hash-2" })) + .mockResolvedValueOnce(undefined); + + const result = await transformConfigFileWithRetry({ + io: ioMocks, + transform(config, context) { + return { + nextConfig: { + ...config, + agents: { + list: [...(config.agents?.list ?? []), { id: "work" }], + }, + }, + result: context.attempt, + }; + }, + }); + + expect(result.attempts).toBe(2); + expect(result.result).toBe(1); + expect(ioMocks.writeConfigFile).toHaveBeenCalledTimes(2); + expect(ioMocks.writeConfigFile).toHaveBeenNthCalledWith( + 2, + { + agents: { + list: [{ id: "other-agent" }, { id: "work" }], + }, + }, + { baseSnapshot: fresh, expectedConfigPath: fresh.path, afterWrite: { mode: "auto" } }, + ); + }); + + it("serializes same-process transform mutations before reading snapshots", async () => { + const initial = createSnapshot({ + hash: "hash-1", + sourceConfig: { agents: { list: [] } }, + }); + const fresh = createSnapshot({ + hash: "hash-2", + sourceConfig: { agents: { list: [{ id: "first" }] } }, + }); + ioMocks.readConfigFileSnapshotForWrite + .mockResolvedValueOnce({ + snapshot: initial, + writeOptions: { expectedConfigPath: initial.path }, + }) + .mockResolvedValueOnce({ + snapshot: fresh, + writeOptions: { expectedConfigPath: fresh.path }, + }); + ioMocks.writeConfigFile.mockResolvedValue(undefined); + + let releaseFirstTransform!: () => void; + let markFirstTransformStarted!: () => void; + const firstTransformStarted = new Promise((resolve) => { + markFirstTransformStarted = resolve; + }); + const first = transformConfigFileWithRetry({ + transform: async (config) => { + markFirstTransformStarted(); + await new Promise((release) => { + releaseFirstTransform = release; + }); + return { + nextConfig: { + ...config, + agents: { list: [{ id: "first" }] }, + }, + }; + }, + }); + await firstTransformStarted; + const second = transformConfigFileWithRetry({ + transform: (config) => ({ + nextConfig: { + ...config, + agents: { + list: [...(config.agents?.list ?? []), { id: "second" }], + }, + }, + }), + }); + await Promise.resolve(); + await Promise.resolve(); + expect(ioMocks.readConfigFileSnapshotForWrite).toHaveBeenCalledTimes(1); + + releaseFirstTransform(); + await Promise.all([first, second]); + expect(ioMocks.writeConfigFile).toHaveBeenNthCalledWith( + 2, + { + agents: { + list: [{ id: "first" }, { id: "second" }], + }, + }, + { baseSnapshot: fresh, expectedConfigPath: fresh.path, afterWrite: { mode: "auto" } }, ); }); diff --git a/src/config/mutate.ts b/src/config/mutate.ts index 3e6110b7775..cba2223ee4b 100644 --- a/src/config/mutate.ts +++ b/src/config/mutate.ts @@ -1,7 +1,9 @@ +import { AsyncLocalStorage } from "node:async_hooks"; import fs from "node:fs/promises"; import path from "node:path"; import { isDeepStrictEqual } from "node:util"; import { formatErrorMessage } from "../infra/errors.js"; +import { withFileLock } from "../infra/file-lock.js"; import { replaceFileAtomic } from "../infra/replace-file.js"; import { isPathInside } from "../security/scan-paths.js"; import { isRecord } from "../utils.js"; @@ -16,6 +18,7 @@ import { } from "./io.js"; import { applyUnsetPathsForWrite, resolveManagedUnsetPathsForWrite } from "./io.write-prepare.js"; import { assertConfigWriteAllowedInCurrentMode } from "./nix-mode-write-guard.js"; +import { resolveConfigPath } from "./paths.js"; import { createRuntimeConfigWriteNotification, finalizeRuntimeSnapshotWrite, @@ -33,6 +36,21 @@ import { validateConfigObjectWithPlugins } from "./validation.js"; export type ConfigMutationBase = "runtime" | "source"; +const CONFIG_MUTATION_LOCK_OPTIONS = { + retries: { + retries: 80, + factor: 1.2, + minTimeout: 25, + maxTimeout: 250, + randomize: true, + }, + stale: 30_000, +} as const; + +const DEFAULT_CONFIG_MUTATION_RETRY_ATTEMPTS = 5; +const activeConfigMutationLocks = new AsyncLocalStorage>(); +const configMutationQueueTails = new Map>(); + export class ConfigMutationConflictError extends Error { readonly currentHash: string | null; @@ -52,11 +70,62 @@ export type ConfigReplaceResult = { followUp: ConfigWriteFollowUp; }; -type ConfigMutationIO = { +export type ConfigMutationIO = { readConfigFileSnapshotForWrite: typeof readConfigFileSnapshotForWrite; writeConfigFile: (cfg: OpenClawConfig, options?: ConfigWriteOptions) => Promise; }; +export type ConfigMutationContext = { + snapshot: ConfigFileSnapshot; + previousHash: string | null; + attempt: number; +}; + +export type ConfigTransformResult = { + nextConfig: OpenClawConfig; + result?: T; +}; + +export type ConfigMutationCommitParams = { + nextConfig: OpenClawConfig; + snapshot: ConfigFileSnapshot; + baseHash?: string; + writeOptions?: ConfigWriteOptions; + afterWrite: ConfigWriteAfterWrite; + io?: ConfigMutationIO; +}; + +export type ConfigMutationCommitResult = { + config: OpenClawConfig; + afterWrite?: ConfigWriteAfterWrite; +}; + +export type ConfigMutationCommit = ( + params: ConfigMutationCommitParams, +) => Promise; + +export type TransformConfigFileParams = { + base?: ConfigMutationBase; + baseHash?: string; + afterWrite?: ConfigWriteOptions["afterWrite"]; + writeOptions?: ConfigWriteOptions; + io?: ConfigMutationIO; + commit?: ConfigMutationCommit; + transform: ( + currentConfig: OpenClawConfig, + context: ConfigMutationContext, + ) => Promise> | ConfigTransformResult; +}; + +export type TransformConfigFileWithRetryParams = TransformConfigFileParams & { + maxAttempts?: number; +}; + +export type ConfigMutationResult = ConfigReplaceResult & { + result: T | undefined; + attempts: number; +}; + function assertBaseHashMatches(snapshot: ConfigFileSnapshot, expectedHash?: string): string | null { const currentHash = resolveConfigSnapshotHash(snapshot) ?? null; if (expectedHash !== undefined && expectedHash !== currentHash) { @@ -67,6 +136,49 @@ function assertBaseHashMatches(snapshot: ConfigFileSnapshot, expectedHash?: stri return currentHash; } +async function withConfigMutationLock( + params: { io?: ConfigMutationIO; lockPath?: string }, + fn: () => Promise, +): Promise { + if (params.io) { + return await fn(); + } + const configPath = path.resolve(params.lockPath ?? resolveConfigPath()); + const activeLocks = activeConfigMutationLocks.getStore(); + if (activeLocks?.has(configPath)) { + return await fn(); + } + assertConfigWriteAllowedInCurrentMode({ configPath }); + await fs.mkdir(path.dirname(configPath), { recursive: true, mode: 0o700 }); + + const previousTail = configMutationQueueTails.get(configPath) ?? Promise.resolve(); + let releaseQueueSlot!: () => void; + const currentRun = new Promise((resolve) => { + releaseQueueSlot = resolve; + }); + const currentTail = previousTail.catch(() => undefined).then(() => currentRun); + configMutationQueueTails.set(configPath, currentTail); + + await previousTail.catch(() => undefined); + try { + const nextActiveLocks = new Set(activeLocks ?? []); + nextActiveLocks.add(configPath); + return await activeConfigMutationLocks.run( + nextActiveLocks, + async () => await withFileLock(configPath, CONFIG_MUTATION_LOCK_OPTIONS, fn), + ); + } finally { + releaseQueueSlot(); + if (configMutationQueueTails.get(configPath) === currentTail) { + configMutationQueueTails.delete(configPath); + } + } +} + +function markActiveConfigMutationPath(configPath: string): void { + activeConfigMutationLocks.getStore()?.add(path.resolve(configPath)); +} + function getChangedTopLevelKeys(base: unknown, next: unknown): string[] { if (!isRecord(base) || !isRecord(next)) { return isDeepStrictEqual(base, next) ? [] : [""]; @@ -223,6 +335,20 @@ export async function replaceConfigFile(params: { afterWrite?: ConfigWriteOptions["afterWrite"]; writeOptions?: ConfigWriteOptions; io?: ConfigMutationIO; +}): Promise { + return await withConfigMutationLock( + { io: params.io, lockPath: params.snapshot?.path }, + async () => await replaceConfigFileUnlocked(params), + ); +} + +async function replaceConfigFileUnlocked(params: { + nextConfig: OpenClawConfig; + baseHash?: string; + snapshot?: ConfigFileSnapshot; + afterWrite?: ConfigWriteOptions["afterWrite"]; + writeOptions?: ConfigWriteOptions; + io?: ConfigMutationIO; }): Promise { const prepared = params.snapshot && params.writeOptions @@ -230,6 +356,7 @@ export async function replaceConfigFile(params: { : await (params.io?.readConfigFileSnapshotForWrite ?? readConfigFileSnapshotForWrite)(); const { snapshot, writeOptions } = prepared; assertConfigWriteAllowedInCurrentMode({ configPath: snapshot.path }); + markActiveConfigMutationPath(snapshot.path); const previousHash = assertBaseHashMatches(snapshot, params.baseHash); const afterWrite = resolveConfigWriteAfterWrite( params.afterWrite ?? params.writeOptions?.afterWrite, @@ -259,52 +386,138 @@ export async function replaceConfigFile(params: { }; } +async function commitPreparedConfigMutation( + params: ConfigMutationCommitParams, +): Promise { + const result = await replaceConfigFileUnlocked({ + nextConfig: params.nextConfig, + snapshot: params.snapshot, + baseHash: params.baseHash, + writeOptions: { + ...params.writeOptions, + afterWrite: params.afterWrite, + }, + io: params.io, + }); + return { + config: result.nextConfig, + afterWrite: result.afterWrite, + }; +} + +async function transformConfigFileAttempt( + params: TransformConfigFileParams, + attempt: number, +): Promise> { + const { snapshot, writeOptions } = await ( + params.io?.readConfigFileSnapshotForWrite ?? readConfigFileSnapshotForWrite + )(); + assertConfigWriteAllowedInCurrentMode({ configPath: snapshot.path }); + markActiveConfigMutationPath(snapshot.path); + const previousHash = assertBaseHashMatches(snapshot, params.baseHash); + const baseConfig = params.base === "runtime" ? snapshot.runtimeConfig : snapshot.sourceConfig; + const afterWrite = resolveConfigWriteAfterWrite( + params.afterWrite ?? params.writeOptions?.afterWrite, + ); + const mergedWriteOptions = { + ...writeOptions, + ...params.writeOptions, + }; + const transformed = await params.transform(baseConfig, { snapshot, previousHash, attempt }); + const committed = await (params.commit ?? commitPreparedConfigMutation)({ + nextConfig: transformed.nextConfig, + snapshot, + ...(previousHash !== null ? { baseHash: previousHash } : {}), + writeOptions: mergedWriteOptions, + afterWrite, + io: params.io, + }); + const committedAfterWrite = committed.afterWrite ?? afterWrite; + return { + path: snapshot.path, + previousHash, + snapshot, + nextConfig: committed.config, + result: transformed.result, + attempts: attempt + 1, + afterWrite: committedAfterWrite, + followUp: resolveConfigWriteFollowUp(committedAfterWrite), + }; +} + +export async function transformConfigFile( + params: TransformConfigFileParams, +): Promise> { + return await withConfigMutationLock( + { io: params.io }, + async () => await transformConfigFileAttempt(params, 0), + ); +} + +export async function transformConfigFileWithRetry( + params: TransformConfigFileWithRetryParams, +): Promise> { + const maxAttempts = params.maxAttempts ?? DEFAULT_CONFIG_MUTATION_RETRY_ATTEMPTS; + if (!Number.isInteger(maxAttempts) || maxAttempts < 1) { + throw new Error("Config mutation maxAttempts must be a positive integer."); + } + return await withConfigMutationLock({ io: params.io }, async () => { + for (let attempt = 0; attempt < maxAttempts; attempt += 1) { + try { + return await transformConfigFileAttempt(params, attempt); + } catch (err) { + if (err instanceof ConfigMutationConflictError && attempt < maxAttempts - 1) { + continue; + } + throw err; + } + } + throw new Error("Config mutation retry loop exhausted unexpectedly."); + }); +} + export async function mutateConfigFile(params: { base?: ConfigMutationBase; baseHash?: string; afterWrite?: ConfigWriteOptions["afterWrite"]; writeOptions?: ConfigWriteOptions; io?: ConfigMutationIO; - mutate: ( - draft: OpenClawConfig, - context: { snapshot: ConfigFileSnapshot; previousHash: string | null }, - ) => Promise | T | void; -}): Promise { - const { snapshot, writeOptions } = await ( - params.io?.readConfigFileSnapshotForWrite ?? readConfigFileSnapshotForWrite - )(); - assertConfigWriteAllowedInCurrentMode({ configPath: snapshot.path }); - const previousHash = assertBaseHashMatches(snapshot, params.baseHash); - const baseConfig = params.base === "runtime" ? snapshot.runtimeConfig : snapshot.sourceConfig; - const draft = structuredClone(baseConfig) as OpenClawConfig; - const result = (await params.mutate(draft, { snapshot, previousHash })) as T | undefined; - const afterWrite = resolveConfigWriteAfterWrite( - params.afterWrite ?? params.writeOptions?.afterWrite, - ); - const wroteInclude = await tryWriteSingleTopLevelIncludeMutation({ - snapshot, - nextConfig: draft, - afterWrite, - writeOptions: { - ...writeOptions, - ...params.writeOptions, - }, + mutate: (draft: OpenClawConfig, context: ConfigMutationContext) => Promise | T | void; +}): Promise> { + return await transformConfigFile({ + base: params.base, + baseHash: params.baseHash, + afterWrite: params.afterWrite, + writeOptions: params.writeOptions, io: params.io, + transform: async (currentConfig, context) => { + const draft = structuredClone(currentConfig) as OpenClawConfig; + const result = (await params.mutate(draft, context)) as T | undefined; + return { nextConfig: draft, result }; + }, + }); +} + +export async function mutateConfigFileWithRetry(params: { + base?: ConfigMutationBase; + baseHash?: string; + maxAttempts?: number; + afterWrite?: ConfigWriteOptions["afterWrite"]; + writeOptions?: ConfigWriteOptions; + io?: ConfigMutationIO; + mutate: (draft: OpenClawConfig, context: ConfigMutationContext) => Promise | T | void; +}): Promise> { + return await transformConfigFileWithRetry({ + base: params.base, + baseHash: params.baseHash, + maxAttempts: params.maxAttempts, + afterWrite: params.afterWrite, + writeOptions: params.writeOptions, + io: params.io, + transform: async (currentConfig, context) => { + const draft = structuredClone(currentConfig) as OpenClawConfig; + const result = (await params.mutate(draft, context)) as T | undefined; + return { nextConfig: draft, result }; + }, }); - if (!wroteInclude) { - await (params.io?.writeConfigFile ?? writeConfigFile)(draft, { - ...writeOptions, - ...params.writeOptions, - afterWrite, - }); - } - return { - path: snapshot.path, - previousHash, - snapshot, - nextConfig: draft, - result, - afterWrite, - followUp: resolveConfigWriteFollowUp(afterWrite), - }; }