mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 22:54:46 +00:00
fix: serialize config mutation writes
This commit is contained in:
@@ -598,6 +598,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Config: serialize and retry semantic config mutations centrally, so concurrent commands can rebase safe changes instead of clobbering or hand-rolling command-local retry loops.
|
||||
- Models/auth: keep `agents.defaults.model` when `openclaw models auth login` runs without `--set-default`, so provider onboarding patches add models without silently switching the primary. Fixes #78162. (#78241) Thanks @neeravmakwana.
|
||||
- Control UI/chat: localize the remaining chat welcome, composer, run-control, session/model/thinking selector, and zh-CN Skills labels through the Control UI i18n pipeline so non-English browser locales no longer see those chat controls in English. Fixes #79937. Thanks @BunsDev.
|
||||
- Control UI: surface browser-blocked WebSocket security failures with wss:// and loopback dashboard guidance instead of leaving the connection on a dead security error. Thanks @BunsDev.
|
||||
|
||||
@@ -1,5 +1,14 @@
|
||||
import { isDeepStrictEqual } from "node:util";
|
||||
import { replaceConfigFile } from "../config/config.js";
|
||||
import {
|
||||
replaceConfigFile,
|
||||
resolveConfigWriteAfterWrite,
|
||||
transformConfigFileWithRetry,
|
||||
type ConfigMutationCommit,
|
||||
type ConfigMutationResult,
|
||||
type ConfigMutationContext,
|
||||
type ConfigTransformResult,
|
||||
type TransformConfigFileWithRetryParams,
|
||||
} from "../config/config.js";
|
||||
import type { ConfigWriteOptions } from "../config/io.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import type { PluginInstallRecord } from "../config/types.plugins.js";
|
||||
@@ -21,6 +30,19 @@ function mergeUnsetPaths(
|
||||
type ConfigCommit = (config: OpenClawConfig, writeOptions?: ConfigWriteOptions) => Promise<void>;
|
||||
const PLUGIN_SOURCE_CHANGED_RESTART_REASON = "plugin source changed";
|
||||
|
||||
function mergeAfterWrite(
|
||||
writeOptions: ConfigWriteOptions | undefined,
|
||||
afterWrite: ConfigWriteOptions["afterWrite"],
|
||||
): ConfigWriteOptions | undefined {
|
||||
if (afterWrite === undefined) {
|
||||
return writeOptions;
|
||||
}
|
||||
return {
|
||||
...writeOptions,
|
||||
afterWrite,
|
||||
};
|
||||
}
|
||||
|
||||
async function commitPluginInstallRecordsWithWriter(params: {
|
||||
previousInstallRecords?: Record<string, PluginInstallRecord>;
|
||||
nextInstallRecords: Record<string, PluginInstallRecord>;
|
||||
@@ -141,3 +163,53 @@ export async function commitConfigWithPendingPluginInstalls(params: {
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function transformConfigWithPendingPluginInstalls<T = void>(
|
||||
params: Omit<TransformConfigFileWithRetryParams<T>, "commit">,
|
||||
): Promise<ConfigMutationResult<T>> {
|
||||
const commit: ConfigMutationCommit = async ({ nextConfig, snapshot, baseHash, writeOptions }) => {
|
||||
const requestedAfterWrite = params.afterWrite ?? params.writeOptions?.afterWrite;
|
||||
const committed = await commitConfigWriteWithPendingPluginInstalls({
|
||||
nextConfig,
|
||||
...(writeOptions ? { writeOptions: mergeAfterWrite(writeOptions, params.afterWrite) } : {}),
|
||||
commit: async (config, commitWriteOptions) => {
|
||||
await replaceConfigFile({
|
||||
nextConfig: config,
|
||||
snapshot,
|
||||
writeOptions: commitWriteOptions ?? {},
|
||||
...(baseHash !== undefined ? { baseHash } : {}),
|
||||
});
|
||||
},
|
||||
});
|
||||
const afterWrite = resolveConfigWriteAfterWrite(
|
||||
requestedAfterWrite ??
|
||||
(committed.movedInstallRecords
|
||||
? { mode: "restart", reason: PLUGIN_SOURCE_CHANGED_RESTART_REASON }
|
||||
: undefined),
|
||||
);
|
||||
return {
|
||||
config: committed.config,
|
||||
afterWrite,
|
||||
};
|
||||
};
|
||||
|
||||
return await transformConfigFileWithRetry<T>({
|
||||
...params,
|
||||
commit,
|
||||
});
|
||||
}
|
||||
|
||||
export async function mutateConfigWithPendingPluginInstalls<T = void>(
|
||||
params: Omit<TransformConfigFileWithRetryParams<T>, "commit" | "transform"> & {
|
||||
mutate: (draft: OpenClawConfig, context: ConfigMutationContext) => Promise<T | void> | T | void;
|
||||
},
|
||||
): Promise<ConfigMutationResult<T>> {
|
||||
return await transformConfigWithPendingPluginInstalls<T>({
|
||||
...params,
|
||||
transform: async (currentConfig, context): Promise<ConfigTransformResult<T>> => {
|
||||
const draft = structuredClone(currentConfig) as OpenClawConfig;
|
||||
const result = (await params.mutate(draft, context)) as T | undefined;
|
||||
return { nextConfig: draft, result };
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -13,6 +13,45 @@ const writeConfigFileMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined
|
||||
const replaceConfigFileMock = vi.hoisted(() =>
|
||||
vi.fn(async (params: { nextConfig: unknown }) => await writeConfigFileMock(params.nextConfig)),
|
||||
);
|
||||
const transformConfigWithPendingPluginInstallsMock = vi.hoisted(() =>
|
||||
vi.fn(
|
||||
async (params: {
|
||||
transform: (
|
||||
config: Record<string, unknown>,
|
||||
context: {
|
||||
snapshot: Record<string, unknown>;
|
||||
previousHash: string | null;
|
||||
attempt: number;
|
||||
},
|
||||
) =>
|
||||
| Promise<{ nextConfig: unknown; result?: unknown }>
|
||||
| { nextConfig: unknown; result?: unknown };
|
||||
}) => {
|
||||
const snapshot = (await readConfigFileSnapshotMock()) as {
|
||||
path?: string;
|
||||
hash?: string;
|
||||
config?: Record<string, unknown>;
|
||||
sourceConfig?: Record<string, unknown>;
|
||||
};
|
||||
const transformed = await params.transform(snapshot.sourceConfig ?? snapshot.config ?? {}, {
|
||||
snapshot,
|
||||
previousHash: snapshot.hash ?? null,
|
||||
attempt: 0,
|
||||
});
|
||||
await writeConfigFileMock(transformed.nextConfig);
|
||||
return {
|
||||
path: snapshot.path ?? "/tmp/openclaw.json",
|
||||
previousHash: snapshot.hash ?? null,
|
||||
snapshot,
|
||||
nextConfig: transformed.nextConfig,
|
||||
result: transformed.result,
|
||||
attempts: 1,
|
||||
afterWrite: { mode: "auto" },
|
||||
followUp: { mode: "auto", requiresRestart: false },
|
||||
};
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
const wizardMocks = vi.hoisted(() => ({
|
||||
createClackPrompter: vi.fn(),
|
||||
@@ -25,6 +64,13 @@ vi.mock("../config/config.js", async () => ({
|
||||
replaceConfigFile: replaceConfigFileMock,
|
||||
}));
|
||||
|
||||
vi.mock("../cli/plugins-install-record-commit.js", async () => ({
|
||||
...(await vi.importActual<typeof import("../cli/plugins-install-record-commit.js")>(
|
||||
"../cli/plugins-install-record-commit.js",
|
||||
)),
|
||||
transformConfigWithPendingPluginInstalls: transformConfigWithPendingPluginInstallsMock,
|
||||
}));
|
||||
|
||||
vi.mock("../wizard/clack-prompter.js", () => ({
|
||||
createClackPrompter: wizardMocks.createClackPrompter,
|
||||
}));
|
||||
@@ -44,6 +90,7 @@ describe("agents add command", () => {
|
||||
readConfigFileSnapshotMock.mockClear();
|
||||
writeConfigFileMock.mockClear();
|
||||
replaceConfigFileMock.mockClear();
|
||||
transformConfigWithPendingPluginInstallsMock.mockClear();
|
||||
wizardMocks.createClackPrompter.mockClear();
|
||||
runtime.log.mockClear();
|
||||
runtime.error.mockClear();
|
||||
@@ -227,4 +274,98 @@ describe("agents add command", () => {
|
||||
}),
|
||||
).toBe('OAuth profiles stay shared from "main" unless this agent signs in separately.');
|
||||
});
|
||||
|
||||
describe("non-interactive config mutation", () => {
|
||||
it("rebases agent creation on the latest config snapshot", async () => {
|
||||
readConfigFileSnapshotMock
|
||||
.mockResolvedValueOnce({
|
||||
...baseConfigSnapshot,
|
||||
hash: "hash-1",
|
||||
config: { agents: { list: [] } },
|
||||
sourceConfig: { agents: { list: [] } },
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
...baseConfigSnapshot,
|
||||
hash: "hash-2",
|
||||
config: { agents: { list: [{ id: "other-agent" }] } },
|
||||
sourceConfig: { agents: { list: [{ id: "other-agent" }] } },
|
||||
});
|
||||
|
||||
await agentsAddCommand({ name: "Work", workspace: "/tmp/work" }, runtime, {
|
||||
hasFlags: true,
|
||||
});
|
||||
|
||||
expect(transformConfigWithPendingPluginInstallsMock).toHaveBeenCalledOnce();
|
||||
expect(writeConfigFileMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agents: {
|
||||
list: [
|
||||
{ id: "other-agent" },
|
||||
expect.objectContaining({ id: "work", workspace: "/tmp/work" }),
|
||||
],
|
||||
},
|
||||
}),
|
||||
);
|
||||
expect(runtime.exit).not.toHaveBeenCalled();
|
||||
expect(runtime.error).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("fails instead of overwriting when the same agent appears before commit", async () => {
|
||||
readConfigFileSnapshotMock
|
||||
.mockResolvedValueOnce({
|
||||
...baseConfigSnapshot,
|
||||
hash: "hash-1",
|
||||
config: { agents: { list: [] } },
|
||||
sourceConfig: { agents: { list: [] } },
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
...baseConfigSnapshot,
|
||||
hash: "hash-2",
|
||||
config: { agents: { list: [{ id: "work", workspace: "/tmp/other" }] } },
|
||||
sourceConfig: { agents: { list: [{ id: "work", workspace: "/tmp/other" }] } },
|
||||
});
|
||||
|
||||
await agentsAddCommand({ name: "Work", workspace: "/tmp/work" }, runtime, {
|
||||
hasFlags: true,
|
||||
});
|
||||
|
||||
expect(writeConfigFileMock).not.toHaveBeenCalled();
|
||||
expect(runtime.error).toHaveBeenCalledWith('Agent "work" already exists.');
|
||||
expect(runtime.exit).toHaveBeenCalledWith(1);
|
||||
});
|
||||
|
||||
it("reports binding conflicts from the committed mutation", async () => {
|
||||
readConfigFileSnapshotMock
|
||||
.mockResolvedValueOnce({
|
||||
...baseConfigSnapshot,
|
||||
hash: "hash-1",
|
||||
config: { agents: { list: [] } },
|
||||
sourceConfig: { agents: { list: [] } },
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
...baseConfigSnapshot,
|
||||
hash: "hash-2",
|
||||
config: {
|
||||
agents: { list: [{ id: "other-agent" }] },
|
||||
bindings: [{ type: "route", agentId: "other-agent", match: { channel: "telegram" } }],
|
||||
},
|
||||
sourceConfig: {
|
||||
agents: { list: [{ id: "other-agent" }] },
|
||||
bindings: [{ type: "route", agentId: "other-agent", match: { channel: "telegram" } }],
|
||||
},
|
||||
});
|
||||
|
||||
await agentsAddCommand(
|
||||
{ name: "Work", workspace: "/tmp/work", bind: ["telegram"], json: true },
|
||||
runtime,
|
||||
{ hasFlags: true },
|
||||
);
|
||||
|
||||
const payload = JSON.parse(String(runtime.log.mock.calls.at(-1)?.[0])) as {
|
||||
bindings: { added: string[]; conflicts: string[] };
|
||||
};
|
||||
expect(payload.bindings.added).toEqual([]);
|
||||
expect(payload.bindings.conflicts).toEqual(["telegram (agent=other-agent)"]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -15,7 +15,10 @@ import {
|
||||
loadPersistedAuthProfileStore,
|
||||
} from "../agents/auth-profiles/persisted.js";
|
||||
import { formatCliCommand } from "../cli/command-format.js";
|
||||
import { commitConfigWithPendingPluginInstalls } from "../cli/plugins-install-record-commit.js";
|
||||
import {
|
||||
commitConfigWithPendingPluginInstalls,
|
||||
transformConfigWithPendingPluginInstalls,
|
||||
} from "../cli/plugins-install-record-commit.js";
|
||||
import { logConfigUpdated } from "../config/logging.js";
|
||||
import { pathExists } from "../infra/fs-safe.js";
|
||||
import { saveJsonFile } from "../infra/json-file.js";
|
||||
@@ -53,6 +56,24 @@ type AgentsAddOptions = {
|
||||
json?: boolean;
|
||||
};
|
||||
|
||||
type AgentBindingResult = ReturnType<typeof applyAgentBindings>;
|
||||
|
||||
type AgentsAddMutationResult = {
|
||||
agentDir: string;
|
||||
bindingResult: AgentBindingResult;
|
||||
};
|
||||
|
||||
class AgentsAddMutationError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "AgentsAddMutationError";
|
||||
}
|
||||
}
|
||||
|
||||
function emptyBindingResult(config: Parameters<typeof applyAgentBindings>[0]): AgentBindingResult {
|
||||
return { config, added: [], updated: [], skipped: [], conflicts: [] };
|
||||
}
|
||||
|
||||
async function copyPortableAuthProfiles(params: {
|
||||
destAuthPath: string;
|
||||
sourceAgentDir: string;
|
||||
@@ -147,44 +168,64 @@ export async function agentsAddCommand(
|
||||
}
|
||||
|
||||
const workspaceDir = resolveUserPath(workspaceFlag);
|
||||
const agentDir = opts.agentDir?.trim()
|
||||
const explicitAgentDir = opts.agentDir?.trim()
|
||||
? resolveUserPath(opts.agentDir.trim())
|
||||
: resolveAgentDir(cfg, agentId);
|
||||
: undefined;
|
||||
const model = opts.model?.trim();
|
||||
const nextConfig = applyAgentConfig(cfg, {
|
||||
agentId,
|
||||
name: nameInput,
|
||||
workspace: workspaceDir,
|
||||
agentDir,
|
||||
...(model ? { model } : {}),
|
||||
});
|
||||
|
||||
const bindingParse = parseBindingSpecs({
|
||||
agentId,
|
||||
specs: opts.bind,
|
||||
config: nextConfig,
|
||||
});
|
||||
if (bindingParse.errors.length > 0) {
|
||||
runtime.error(bindingParse.errors.join("\n"));
|
||||
runtime.exit(1);
|
||||
return;
|
||||
let committed;
|
||||
try {
|
||||
committed = await transformConfigWithPendingPluginInstalls<AgentsAddMutationResult>({
|
||||
transform: (latestConfig) => {
|
||||
if (findAgentEntryIndex(listAgentEntries(latestConfig), agentId) >= 0) {
|
||||
throw new AgentsAddMutationError(`Agent "${agentId}" already exists.`);
|
||||
}
|
||||
const agentDir = explicitAgentDir ?? resolveAgentDir(latestConfig, agentId);
|
||||
const nextConfig = applyAgentConfig(latestConfig, {
|
||||
agentId,
|
||||
name: nameInput,
|
||||
workspace: workspaceDir,
|
||||
agentDir,
|
||||
...(model ? { model } : {}),
|
||||
});
|
||||
const bindingParse = parseBindingSpecs({
|
||||
agentId,
|
||||
specs: opts.bind,
|
||||
config: nextConfig,
|
||||
});
|
||||
if (bindingParse.errors.length > 0) {
|
||||
throw new AgentsAddMutationError(bindingParse.errors.join("\n"));
|
||||
}
|
||||
const bindingResult =
|
||||
bindingParse.bindings.length > 0
|
||||
? applyAgentBindings(nextConfig, bindingParse.bindings)
|
||||
: emptyBindingResult(nextConfig);
|
||||
return {
|
||||
nextConfig: bindingResult.config,
|
||||
result: { agentDir, bindingResult },
|
||||
};
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
if (err instanceof AgentsAddMutationError) {
|
||||
runtime.error(err.message);
|
||||
runtime.exit(1);
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const bindingResult =
|
||||
bindingParse.bindings.length > 0
|
||||
? applyAgentBindings(nextConfig, bindingParse.bindings)
|
||||
: { config: nextConfig, added: [], updated: [], skipped: [], conflicts: [] };
|
||||
|
||||
await commitConfigWithPendingPluginInstalls({
|
||||
nextConfig: bindingResult.config,
|
||||
...(baseHash !== undefined ? { baseHash } : {}),
|
||||
});
|
||||
const mutationResult = committed.result;
|
||||
if (!mutationResult) {
|
||||
throw new Error("Agent config mutation did not return a result.");
|
||||
}
|
||||
const { agentDir, bindingResult } = mutationResult;
|
||||
if (!opts.json) {
|
||||
logConfigUpdated(runtime);
|
||||
}
|
||||
const quietRuntime = opts.json ? createQuietRuntime(runtime) : runtime;
|
||||
await ensureWorkspaceAndSessions(workspaceDir, quietRuntime, {
|
||||
skipBootstrap: Boolean(bindingResult.config.agents?.defaults?.skipBootstrap),
|
||||
skipOptionalBootstrapFiles: bindingResult.config.agents?.defaults?.skipOptionalBootstrapFiles,
|
||||
skipBootstrap: Boolean(committed.nextConfig.agents?.defaults?.skipBootstrap),
|
||||
skipOptionalBootstrapFiles: committed.nextConfig.agents?.defaults?.skipOptionalBootstrapFiles,
|
||||
agentId,
|
||||
});
|
||||
|
||||
|
||||
@@ -43,7 +43,25 @@ export type {
|
||||
ConfigWriteNotification,
|
||||
ReadConfigFileSnapshotWithPluginMetadataResult,
|
||||
} from "./io.js";
|
||||
export { ConfigMutationConflictError, mutateConfigFile, replaceConfigFile } from "./mutate.js";
|
||||
export {
|
||||
ConfigMutationConflictError,
|
||||
mutateConfigFile,
|
||||
mutateConfigFileWithRetry,
|
||||
replaceConfigFile,
|
||||
transformConfigFile,
|
||||
transformConfigFileWithRetry,
|
||||
} from "./mutate.js";
|
||||
export type {
|
||||
ConfigMutationCommit,
|
||||
ConfigMutationCommitParams,
|
||||
ConfigMutationCommitResult,
|
||||
ConfigMutationContext,
|
||||
ConfigMutationIO,
|
||||
ConfigMutationResult,
|
||||
ConfigTransformResult,
|
||||
TransformConfigFileParams,
|
||||
TransformConfigFileWithRetryParams,
|
||||
} from "./mutate.js";
|
||||
export {
|
||||
assertConfigWriteAllowedInCurrentMode,
|
||||
NixModeConfigMutationError,
|
||||
|
||||
@@ -2428,6 +2428,7 @@ export async function writeConfigFile(
|
||||
nextCfg = coerceConfig(applyMergePatch(runtimeConfigSourceSnapshot!, runtimePatch));
|
||||
}
|
||||
const writeResult = await io.writeConfigFile(nextCfg, {
|
||||
baseSnapshot: options.baseSnapshot,
|
||||
envSnapshotForRestore: resolveWriteEnvSnapshotForPath({
|
||||
actualConfigPath: io.configPath,
|
||||
expectedConfigPath: options.expectedConfigPath,
|
||||
@@ -2438,6 +2439,7 @@ export async function writeConfigFile(
|
||||
explicitSetValueSource: options.explicitSetPaths
|
||||
? (options.explicitSetValueSource ?? cfg)
|
||||
: undefined,
|
||||
afterWrite: options.afterWrite,
|
||||
allowDestructiveWrite: options.allowDestructiveWrite,
|
||||
allowConfigSizeDrop: options.allowConfigSizeDrop,
|
||||
skipRuntimeSnapshotRefresh: options.skipRuntimeSnapshotRefresh,
|
||||
|
||||
@@ -2,7 +2,12 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createSuiteTempRootTracker } from "../test-helpers/temp-dir.js";
|
||||
import { ConfigMutationConflictError, mutateConfigFile, replaceConfigFile } from "./mutate.js";
|
||||
import {
|
||||
ConfigMutationConflictError,
|
||||
mutateConfigFile,
|
||||
replaceConfigFile,
|
||||
transformConfigFileWithRetry,
|
||||
} from "./mutate.js";
|
||||
import { registerRuntimeConfigWriteListener, resetConfigRuntimeState } from "./runtime-snapshot.js";
|
||||
import type { ConfigFileSnapshot, OpenClawConfig } from "./types.js";
|
||||
|
||||
@@ -11,8 +16,16 @@ const ioMocks = vi.hoisted(() => ({
|
||||
resolveConfigSnapshotHash: vi.fn(),
|
||||
writeConfigFile: vi.fn(),
|
||||
}));
|
||||
const validationMocks = vi.hoisted(() => ({
|
||||
validateConfigObjectWithPlugins: vi.fn((config: OpenClawConfig) => ({
|
||||
ok: true,
|
||||
config,
|
||||
warnings: [],
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("./io.js", () => ioMocks);
|
||||
vi.mock("./validation.js", () => validationMocks);
|
||||
|
||||
function createSnapshot(params: {
|
||||
hash: string;
|
||||
@@ -61,6 +74,13 @@ describe("config mutate helpers", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
resetConfigRuntimeState();
|
||||
validationMocks.validateConfigObjectWithPlugins.mockImplementation(
|
||||
(config: OpenClawConfig) => ({
|
||||
ok: true,
|
||||
config,
|
||||
warnings: [],
|
||||
}),
|
||||
);
|
||||
ioMocks.resolveConfigSnapshotHash.mockImplementation(
|
||||
(snapshot: { hash?: string }) => snapshot.hash ?? null,
|
||||
);
|
||||
@@ -103,7 +123,125 @@ describe("config mutate helpers", () => {
|
||||
auth: { mode: "token" },
|
||||
},
|
||||
},
|
||||
{ expectedConfigPath: snapshot.path, afterWrite: { mode: "auto" } },
|
||||
{ baseSnapshot: snapshot, expectedConfigPath: snapshot.path, afterWrite: { mode: "auto" } },
|
||||
);
|
||||
});
|
||||
|
||||
it("retries transform mutations on stale config conflicts", async () => {
|
||||
const initial = createSnapshot({
|
||||
hash: "hash-1",
|
||||
sourceConfig: { agents: { list: [] } },
|
||||
});
|
||||
const fresh = createSnapshot({
|
||||
hash: "hash-2",
|
||||
sourceConfig: { agents: { list: [{ id: "other-agent" }] } },
|
||||
});
|
||||
ioMocks.readConfigFileSnapshotForWrite
|
||||
.mockResolvedValueOnce({
|
||||
snapshot: initial,
|
||||
writeOptions: { expectedConfigPath: initial.path },
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
snapshot: fresh,
|
||||
writeOptions: { expectedConfigPath: fresh.path },
|
||||
});
|
||||
ioMocks.writeConfigFile
|
||||
.mockRejectedValueOnce(new ConfigMutationConflictError("stale", { currentHash: "hash-2" }))
|
||||
.mockResolvedValueOnce(undefined);
|
||||
|
||||
const result = await transformConfigFileWithRetry({
|
||||
io: ioMocks,
|
||||
transform(config, context) {
|
||||
return {
|
||||
nextConfig: {
|
||||
...config,
|
||||
agents: {
|
||||
list: [...(config.agents?.list ?? []), { id: "work" }],
|
||||
},
|
||||
},
|
||||
result: context.attempt,
|
||||
};
|
||||
},
|
||||
});
|
||||
|
||||
expect(result.attempts).toBe(2);
|
||||
expect(result.result).toBe(1);
|
||||
expect(ioMocks.writeConfigFile).toHaveBeenCalledTimes(2);
|
||||
expect(ioMocks.writeConfigFile).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
{
|
||||
agents: {
|
||||
list: [{ id: "other-agent" }, { id: "work" }],
|
||||
},
|
||||
},
|
||||
{ baseSnapshot: fresh, expectedConfigPath: fresh.path, afterWrite: { mode: "auto" } },
|
||||
);
|
||||
});
|
||||
|
||||
it("serializes same-process transform mutations before reading snapshots", async () => {
|
||||
const initial = createSnapshot({
|
||||
hash: "hash-1",
|
||||
sourceConfig: { agents: { list: [] } },
|
||||
});
|
||||
const fresh = createSnapshot({
|
||||
hash: "hash-2",
|
||||
sourceConfig: { agents: { list: [{ id: "first" }] } },
|
||||
});
|
||||
ioMocks.readConfigFileSnapshotForWrite
|
||||
.mockResolvedValueOnce({
|
||||
snapshot: initial,
|
||||
writeOptions: { expectedConfigPath: initial.path },
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
snapshot: fresh,
|
||||
writeOptions: { expectedConfigPath: fresh.path },
|
||||
});
|
||||
ioMocks.writeConfigFile.mockResolvedValue(undefined);
|
||||
|
||||
let releaseFirstTransform!: () => void;
|
||||
let markFirstTransformStarted!: () => void;
|
||||
const firstTransformStarted = new Promise<void>((resolve) => {
|
||||
markFirstTransformStarted = resolve;
|
||||
});
|
||||
const first = transformConfigFileWithRetry({
|
||||
transform: async (config) => {
|
||||
markFirstTransformStarted();
|
||||
await new Promise<void>((release) => {
|
||||
releaseFirstTransform = release;
|
||||
});
|
||||
return {
|
||||
nextConfig: {
|
||||
...config,
|
||||
agents: { list: [{ id: "first" }] },
|
||||
},
|
||||
};
|
||||
},
|
||||
});
|
||||
await firstTransformStarted;
|
||||
const second = transformConfigFileWithRetry({
|
||||
transform: (config) => ({
|
||||
nextConfig: {
|
||||
...config,
|
||||
agents: {
|
||||
list: [...(config.agents?.list ?? []), { id: "second" }],
|
||||
},
|
||||
},
|
||||
}),
|
||||
});
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
expect(ioMocks.readConfigFileSnapshotForWrite).toHaveBeenCalledTimes(1);
|
||||
|
||||
releaseFirstTransform();
|
||||
await Promise.all([first, second]);
|
||||
expect(ioMocks.writeConfigFile).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
{
|
||||
agents: {
|
||||
list: [{ id: "first" }, { id: "second" }],
|
||||
},
|
||||
},
|
||||
{ baseSnapshot: fresh, expectedConfigPath: fresh.path, afterWrite: { mode: "auto" } },
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import { AsyncLocalStorage } from "node:async_hooks";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { isDeepStrictEqual } from "node:util";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { withFileLock } from "../infra/file-lock.js";
|
||||
import { replaceFileAtomic } from "../infra/replace-file.js";
|
||||
import { isPathInside } from "../security/scan-paths.js";
|
||||
import { isRecord } from "../utils.js";
|
||||
@@ -16,6 +18,7 @@ import {
|
||||
} from "./io.js";
|
||||
import { applyUnsetPathsForWrite, resolveManagedUnsetPathsForWrite } from "./io.write-prepare.js";
|
||||
import { assertConfigWriteAllowedInCurrentMode } from "./nix-mode-write-guard.js";
|
||||
import { resolveConfigPath } from "./paths.js";
|
||||
import {
|
||||
createRuntimeConfigWriteNotification,
|
||||
finalizeRuntimeSnapshotWrite,
|
||||
@@ -33,6 +36,21 @@ import { validateConfigObjectWithPlugins } from "./validation.js";
|
||||
|
||||
export type ConfigMutationBase = "runtime" | "source";
|
||||
|
||||
const CONFIG_MUTATION_LOCK_OPTIONS = {
|
||||
retries: {
|
||||
retries: 80,
|
||||
factor: 1.2,
|
||||
minTimeout: 25,
|
||||
maxTimeout: 250,
|
||||
randomize: true,
|
||||
},
|
||||
stale: 30_000,
|
||||
} as const;
|
||||
|
||||
const DEFAULT_CONFIG_MUTATION_RETRY_ATTEMPTS = 5;
|
||||
const activeConfigMutationLocks = new AsyncLocalStorage<Set<string>>();
|
||||
const configMutationQueueTails = new Map<string, Promise<void>>();
|
||||
|
||||
export class ConfigMutationConflictError extends Error {
|
||||
readonly currentHash: string | null;
|
||||
|
||||
@@ -52,11 +70,62 @@ export type ConfigReplaceResult = {
|
||||
followUp: ConfigWriteFollowUp;
|
||||
};
|
||||
|
||||
type ConfigMutationIO = {
|
||||
export type ConfigMutationIO = {
|
||||
readConfigFileSnapshotForWrite: typeof readConfigFileSnapshotForWrite;
|
||||
writeConfigFile: (cfg: OpenClawConfig, options?: ConfigWriteOptions) => Promise<unknown>;
|
||||
};
|
||||
|
||||
export type ConfigMutationContext = {
|
||||
snapshot: ConfigFileSnapshot;
|
||||
previousHash: string | null;
|
||||
attempt: number;
|
||||
};
|
||||
|
||||
export type ConfigTransformResult<T> = {
|
||||
nextConfig: OpenClawConfig;
|
||||
result?: T;
|
||||
};
|
||||
|
||||
export type ConfigMutationCommitParams = {
|
||||
nextConfig: OpenClawConfig;
|
||||
snapshot: ConfigFileSnapshot;
|
||||
baseHash?: string;
|
||||
writeOptions?: ConfigWriteOptions;
|
||||
afterWrite: ConfigWriteAfterWrite;
|
||||
io?: ConfigMutationIO;
|
||||
};
|
||||
|
||||
export type ConfigMutationCommitResult = {
|
||||
config: OpenClawConfig;
|
||||
afterWrite?: ConfigWriteAfterWrite;
|
||||
};
|
||||
|
||||
export type ConfigMutationCommit = (
|
||||
params: ConfigMutationCommitParams,
|
||||
) => Promise<ConfigMutationCommitResult>;
|
||||
|
||||
export type TransformConfigFileParams<T> = {
|
||||
base?: ConfigMutationBase;
|
||||
baseHash?: string;
|
||||
afterWrite?: ConfigWriteOptions["afterWrite"];
|
||||
writeOptions?: ConfigWriteOptions;
|
||||
io?: ConfigMutationIO;
|
||||
commit?: ConfigMutationCommit;
|
||||
transform: (
|
||||
currentConfig: OpenClawConfig,
|
||||
context: ConfigMutationContext,
|
||||
) => Promise<ConfigTransformResult<T>> | ConfigTransformResult<T>;
|
||||
};
|
||||
|
||||
export type TransformConfigFileWithRetryParams<T> = TransformConfigFileParams<T> & {
|
||||
maxAttempts?: number;
|
||||
};
|
||||
|
||||
export type ConfigMutationResult<T> = ConfigReplaceResult & {
|
||||
result: T | undefined;
|
||||
attempts: number;
|
||||
};
|
||||
|
||||
function assertBaseHashMatches(snapshot: ConfigFileSnapshot, expectedHash?: string): string | null {
|
||||
const currentHash = resolveConfigSnapshotHash(snapshot) ?? null;
|
||||
if (expectedHash !== undefined && expectedHash !== currentHash) {
|
||||
@@ -67,6 +136,49 @@ function assertBaseHashMatches(snapshot: ConfigFileSnapshot, expectedHash?: stri
|
||||
return currentHash;
|
||||
}
|
||||
|
||||
async function withConfigMutationLock<T>(
|
||||
params: { io?: ConfigMutationIO; lockPath?: string },
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
if (params.io) {
|
||||
return await fn();
|
||||
}
|
||||
const configPath = path.resolve(params.lockPath ?? resolveConfigPath());
|
||||
const activeLocks = activeConfigMutationLocks.getStore();
|
||||
if (activeLocks?.has(configPath)) {
|
||||
return await fn();
|
||||
}
|
||||
assertConfigWriteAllowedInCurrentMode({ configPath });
|
||||
await fs.mkdir(path.dirname(configPath), { recursive: true, mode: 0o700 });
|
||||
|
||||
const previousTail = configMutationQueueTails.get(configPath) ?? Promise.resolve();
|
||||
let releaseQueueSlot!: () => void;
|
||||
const currentRun = new Promise<void>((resolve) => {
|
||||
releaseQueueSlot = resolve;
|
||||
});
|
||||
const currentTail = previousTail.catch(() => undefined).then(() => currentRun);
|
||||
configMutationQueueTails.set(configPath, currentTail);
|
||||
|
||||
await previousTail.catch(() => undefined);
|
||||
try {
|
||||
const nextActiveLocks = new Set(activeLocks ?? []);
|
||||
nextActiveLocks.add(configPath);
|
||||
return await activeConfigMutationLocks.run(
|
||||
nextActiveLocks,
|
||||
async () => await withFileLock(configPath, CONFIG_MUTATION_LOCK_OPTIONS, fn),
|
||||
);
|
||||
} finally {
|
||||
releaseQueueSlot();
|
||||
if (configMutationQueueTails.get(configPath) === currentTail) {
|
||||
configMutationQueueTails.delete(configPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function markActiveConfigMutationPath(configPath: string): void {
|
||||
activeConfigMutationLocks.getStore()?.add(path.resolve(configPath));
|
||||
}
|
||||
|
||||
function getChangedTopLevelKeys(base: unknown, next: unknown): string[] {
|
||||
if (!isRecord(base) || !isRecord(next)) {
|
||||
return isDeepStrictEqual(base, next) ? [] : ["<root>"];
|
||||
@@ -223,6 +335,20 @@ export async function replaceConfigFile(params: {
|
||||
afterWrite?: ConfigWriteOptions["afterWrite"];
|
||||
writeOptions?: ConfigWriteOptions;
|
||||
io?: ConfigMutationIO;
|
||||
}): Promise<ConfigReplaceResult> {
|
||||
return await withConfigMutationLock(
|
||||
{ io: params.io, lockPath: params.snapshot?.path },
|
||||
async () => await replaceConfigFileUnlocked(params),
|
||||
);
|
||||
}
|
||||
|
||||
async function replaceConfigFileUnlocked(params: {
|
||||
nextConfig: OpenClawConfig;
|
||||
baseHash?: string;
|
||||
snapshot?: ConfigFileSnapshot;
|
||||
afterWrite?: ConfigWriteOptions["afterWrite"];
|
||||
writeOptions?: ConfigWriteOptions;
|
||||
io?: ConfigMutationIO;
|
||||
}): Promise<ConfigReplaceResult> {
|
||||
const prepared =
|
||||
params.snapshot && params.writeOptions
|
||||
@@ -230,6 +356,7 @@ export async function replaceConfigFile(params: {
|
||||
: await (params.io?.readConfigFileSnapshotForWrite ?? readConfigFileSnapshotForWrite)();
|
||||
const { snapshot, writeOptions } = prepared;
|
||||
assertConfigWriteAllowedInCurrentMode({ configPath: snapshot.path });
|
||||
markActiveConfigMutationPath(snapshot.path);
|
||||
const previousHash = assertBaseHashMatches(snapshot, params.baseHash);
|
||||
const afterWrite = resolveConfigWriteAfterWrite(
|
||||
params.afterWrite ?? params.writeOptions?.afterWrite,
|
||||
@@ -259,52 +386,138 @@ export async function replaceConfigFile(params: {
|
||||
};
|
||||
}
|
||||
|
||||
async function commitPreparedConfigMutation(
|
||||
params: ConfigMutationCommitParams,
|
||||
): Promise<ConfigMutationCommitResult> {
|
||||
const result = await replaceConfigFileUnlocked({
|
||||
nextConfig: params.nextConfig,
|
||||
snapshot: params.snapshot,
|
||||
baseHash: params.baseHash,
|
||||
writeOptions: {
|
||||
...params.writeOptions,
|
||||
afterWrite: params.afterWrite,
|
||||
},
|
||||
io: params.io,
|
||||
});
|
||||
return {
|
||||
config: result.nextConfig,
|
||||
afterWrite: result.afterWrite,
|
||||
};
|
||||
}
|
||||
|
||||
async function transformConfigFileAttempt<T>(
|
||||
params: TransformConfigFileParams<T>,
|
||||
attempt: number,
|
||||
): Promise<ConfigMutationResult<T>> {
|
||||
const { snapshot, writeOptions } = await (
|
||||
params.io?.readConfigFileSnapshotForWrite ?? readConfigFileSnapshotForWrite
|
||||
)();
|
||||
assertConfigWriteAllowedInCurrentMode({ configPath: snapshot.path });
|
||||
markActiveConfigMutationPath(snapshot.path);
|
||||
const previousHash = assertBaseHashMatches(snapshot, params.baseHash);
|
||||
const baseConfig = params.base === "runtime" ? snapshot.runtimeConfig : snapshot.sourceConfig;
|
||||
const afterWrite = resolveConfigWriteAfterWrite(
|
||||
params.afterWrite ?? params.writeOptions?.afterWrite,
|
||||
);
|
||||
const mergedWriteOptions = {
|
||||
...writeOptions,
|
||||
...params.writeOptions,
|
||||
};
|
||||
const transformed = await params.transform(baseConfig, { snapshot, previousHash, attempt });
|
||||
const committed = await (params.commit ?? commitPreparedConfigMutation)({
|
||||
nextConfig: transformed.nextConfig,
|
||||
snapshot,
|
||||
...(previousHash !== null ? { baseHash: previousHash } : {}),
|
||||
writeOptions: mergedWriteOptions,
|
||||
afterWrite,
|
||||
io: params.io,
|
||||
});
|
||||
const committedAfterWrite = committed.afterWrite ?? afterWrite;
|
||||
return {
|
||||
path: snapshot.path,
|
||||
previousHash,
|
||||
snapshot,
|
||||
nextConfig: committed.config,
|
||||
result: transformed.result,
|
||||
attempts: attempt + 1,
|
||||
afterWrite: committedAfterWrite,
|
||||
followUp: resolveConfigWriteFollowUp(committedAfterWrite),
|
||||
};
|
||||
}
|
||||
|
||||
export async function transformConfigFile<T = void>(
|
||||
params: TransformConfigFileParams<T>,
|
||||
): Promise<ConfigMutationResult<T>> {
|
||||
return await withConfigMutationLock(
|
||||
{ io: params.io },
|
||||
async () => await transformConfigFileAttempt(params, 0),
|
||||
);
|
||||
}
|
||||
|
||||
export async function transformConfigFileWithRetry<T = void>(
|
||||
params: TransformConfigFileWithRetryParams<T>,
|
||||
): Promise<ConfigMutationResult<T>> {
|
||||
const maxAttempts = params.maxAttempts ?? DEFAULT_CONFIG_MUTATION_RETRY_ATTEMPTS;
|
||||
if (!Number.isInteger(maxAttempts) || maxAttempts < 1) {
|
||||
throw new Error("Config mutation maxAttempts must be a positive integer.");
|
||||
}
|
||||
return await withConfigMutationLock({ io: params.io }, async () => {
|
||||
for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
|
||||
try {
|
||||
return await transformConfigFileAttempt(params, attempt);
|
||||
} catch (err) {
|
||||
if (err instanceof ConfigMutationConflictError && attempt < maxAttempts - 1) {
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
throw new Error("Config mutation retry loop exhausted unexpectedly.");
|
||||
});
|
||||
}
|
||||
|
||||
export async function mutateConfigFile<T = void>(params: {
|
||||
base?: ConfigMutationBase;
|
||||
baseHash?: string;
|
||||
afterWrite?: ConfigWriteOptions["afterWrite"];
|
||||
writeOptions?: ConfigWriteOptions;
|
||||
io?: ConfigMutationIO;
|
||||
mutate: (
|
||||
draft: OpenClawConfig,
|
||||
context: { snapshot: ConfigFileSnapshot; previousHash: string | null },
|
||||
) => Promise<T | void> | T | void;
|
||||
}): Promise<ConfigReplaceResult & { result: T | undefined }> {
|
||||
const { snapshot, writeOptions } = await (
|
||||
params.io?.readConfigFileSnapshotForWrite ?? readConfigFileSnapshotForWrite
|
||||
)();
|
||||
assertConfigWriteAllowedInCurrentMode({ configPath: snapshot.path });
|
||||
const previousHash = assertBaseHashMatches(snapshot, params.baseHash);
|
||||
const baseConfig = params.base === "runtime" ? snapshot.runtimeConfig : snapshot.sourceConfig;
|
||||
const draft = structuredClone(baseConfig) as OpenClawConfig;
|
||||
const result = (await params.mutate(draft, { snapshot, previousHash })) as T | undefined;
|
||||
const afterWrite = resolveConfigWriteAfterWrite(
|
||||
params.afterWrite ?? params.writeOptions?.afterWrite,
|
||||
);
|
||||
const wroteInclude = await tryWriteSingleTopLevelIncludeMutation({
|
||||
snapshot,
|
||||
nextConfig: draft,
|
||||
afterWrite,
|
||||
writeOptions: {
|
||||
...writeOptions,
|
||||
...params.writeOptions,
|
||||
},
|
||||
mutate: (draft: OpenClawConfig, context: ConfigMutationContext) => Promise<T | void> | T | void;
|
||||
}): Promise<ConfigMutationResult<T>> {
|
||||
return await transformConfigFile<T>({
|
||||
base: params.base,
|
||||
baseHash: params.baseHash,
|
||||
afterWrite: params.afterWrite,
|
||||
writeOptions: params.writeOptions,
|
||||
io: params.io,
|
||||
transform: async (currentConfig, context) => {
|
||||
const draft = structuredClone(currentConfig) as OpenClawConfig;
|
||||
const result = (await params.mutate(draft, context)) as T | undefined;
|
||||
return { nextConfig: draft, result };
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function mutateConfigFileWithRetry<T = void>(params: {
|
||||
base?: ConfigMutationBase;
|
||||
baseHash?: string;
|
||||
maxAttempts?: number;
|
||||
afterWrite?: ConfigWriteOptions["afterWrite"];
|
||||
writeOptions?: ConfigWriteOptions;
|
||||
io?: ConfigMutationIO;
|
||||
mutate: (draft: OpenClawConfig, context: ConfigMutationContext) => Promise<T | void> | T | void;
|
||||
}): Promise<ConfigMutationResult<T>> {
|
||||
return await transformConfigFileWithRetry<T>({
|
||||
base: params.base,
|
||||
baseHash: params.baseHash,
|
||||
maxAttempts: params.maxAttempts,
|
||||
afterWrite: params.afterWrite,
|
||||
writeOptions: params.writeOptions,
|
||||
io: params.io,
|
||||
transform: async (currentConfig, context) => {
|
||||
const draft = structuredClone(currentConfig) as OpenClawConfig;
|
||||
const result = (await params.mutate(draft, context)) as T | undefined;
|
||||
return { nextConfig: draft, result };
|
||||
},
|
||||
});
|
||||
if (!wroteInclude) {
|
||||
await (params.io?.writeConfigFile ?? writeConfigFile)(draft, {
|
||||
...writeOptions,
|
||||
...params.writeOptions,
|
||||
afterWrite,
|
||||
});
|
||||
}
|
||||
return {
|
||||
path: snapshot.path,
|
||||
previousHash,
|
||||
snapshot,
|
||||
nextConfig: draft,
|
||||
result,
|
||||
afterWrite,
|
||||
followUp: resolveConfigWriteFollowUp(afterWrite),
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user