mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:20:43 +00:00
fix(sandbox): harden sharded registry storage
This commit is contained in:
@@ -53,10 +53,5 @@ export const SANDBOX_AGENT_WORKSPACE_MOUNT = "/agent";
|
||||
export const SANDBOX_STATE_DIR = path.join(STATE_DIR, "sandbox");
|
||||
export const SANDBOX_REGISTRY_PATH = path.join(SANDBOX_STATE_DIR, "containers.json");
|
||||
export const SANDBOX_BROWSER_REGISTRY_PATH = path.join(SANDBOX_STATE_DIR, "browsers.json");
|
||||
|
||||
// Per-entry sharded directories — each container/browser gets its own JSON
|
||||
// file under these dirs, eliminating cross-session lock contention on the
|
||||
// monolithic registry files. The legacy *.json files above are migrated
|
||||
// into these dirs on first read and then removed.
|
||||
export const SANDBOX_CONTAINERS_DIR = path.join(SANDBOX_STATE_DIR, "containers");
|
||||
export const SANDBOX_BROWSERS_DIR = path.join(SANDBOX_STATE_DIR, "browsers");
|
||||
|
||||
@@ -1,5 +1,13 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { afterAll, afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
type WriteDelayConfig = {
|
||||
targetFile: "containers.json" | "browsers.json" | null;
|
||||
containerName: string;
|
||||
started: boolean;
|
||||
markStarted: () => void;
|
||||
waitForRelease: Promise<void>;
|
||||
};
|
||||
|
||||
const {
|
||||
TEST_STATE_DIR,
|
||||
@@ -7,17 +15,20 @@ const {
|
||||
SANDBOX_BROWSER_REGISTRY_PATH,
|
||||
SANDBOX_CONTAINERS_DIR,
|
||||
SANDBOX_BROWSERS_DIR,
|
||||
writeGateState,
|
||||
} = vi.hoisted(() => {
|
||||
const p = require("node:path");
|
||||
const path = require("node:path");
|
||||
const { mkdtempSync } = require("node:fs");
|
||||
const { tmpdir } = require("node:os");
|
||||
const baseDir = mkdtempSync(p.join(tmpdir(), "openclaw-sandbox-registry-"));
|
||||
const baseDir = mkdtempSync(path.join(tmpdir(), "openclaw-sandbox-registry-"));
|
||||
|
||||
return {
|
||||
TEST_STATE_DIR: baseDir,
|
||||
SANDBOX_REGISTRY_PATH: p.join(baseDir, "containers.json"),
|
||||
SANDBOX_BROWSER_REGISTRY_PATH: p.join(baseDir, "browsers.json"),
|
||||
SANDBOX_CONTAINERS_DIR: p.join(baseDir, "containers"),
|
||||
SANDBOX_BROWSERS_DIR: p.join(baseDir, "browsers"),
|
||||
SANDBOX_REGISTRY_PATH: path.join(baseDir, "containers.json"),
|
||||
SANDBOX_BROWSER_REGISTRY_PATH: path.join(baseDir, "browsers.json"),
|
||||
SANDBOX_CONTAINERS_DIR: path.join(baseDir, "containers"),
|
||||
SANDBOX_BROWSERS_DIR: path.join(baseDir, "browsers"),
|
||||
writeGateState: { active: null as WriteDelayConfig | null },
|
||||
};
|
||||
});
|
||||
|
||||
@@ -29,6 +40,35 @@ vi.mock("./constants.js", () => ({
|
||||
SANDBOX_BROWSERS_DIR,
|
||||
}));
|
||||
|
||||
vi.mock("../../infra/json-files.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../../infra/json-files.js")>(
|
||||
"../../infra/json-files.js",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
writeJsonAtomic: async (
|
||||
filePath: string,
|
||||
value: unknown,
|
||||
options?: Parameters<typeof actual.writeJsonAtomic>[2],
|
||||
) => {
|
||||
const payload = JSON.stringify(value);
|
||||
const gate = writeGateState.active;
|
||||
if (
|
||||
gate &&
|
||||
(!gate.targetFile || filePath.includes(gate.targetFile)) &&
|
||||
payloadMentionsContainer(payload, gate.containerName)
|
||||
) {
|
||||
if (!gate.started) {
|
||||
gate.started = true;
|
||||
gate.markStarted();
|
||||
}
|
||||
await gate.waitForRelease;
|
||||
}
|
||||
await actual.writeJsonAtomic(filePath, value, options);
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
import {
|
||||
readBrowserRegistry,
|
||||
readRegistry,
|
||||
@@ -42,6 +82,53 @@ import {
|
||||
type SandboxBrowserRegistryEntry = import("./registry.js").SandboxBrowserRegistryEntry;
|
||||
type SandboxRegistryEntry = import("./registry.js").SandboxRegistryEntry;
|
||||
|
||||
function payloadMentionsContainer(payload: string, containerName: string): boolean {
|
||||
return (
|
||||
payload.includes(`"containerName":"${containerName}"`) ||
|
||||
payload.includes(`"containerName": "${containerName}"`)
|
||||
);
|
||||
}
|
||||
|
||||
async function seedMalformedContainerRegistry(payload: string) {
|
||||
await fs.writeFile(SANDBOX_REGISTRY_PATH, payload, "utf-8");
|
||||
}
|
||||
|
||||
async function seedMalformedBrowserRegistry(payload: string) {
|
||||
await fs.writeFile(SANDBOX_BROWSER_REGISTRY_PATH, payload, "utf-8");
|
||||
}
|
||||
|
||||
function installWriteGate(
|
||||
targetFile: "containers.json" | "browsers.json" | null,
|
||||
containerName: string,
|
||||
): { waitForStart: Promise<void>; release: () => void } {
|
||||
let markStarted = () => {};
|
||||
const waitForStart = new Promise<void>((resolve) => {
|
||||
markStarted = resolve;
|
||||
});
|
||||
let resolveRelease = () => {};
|
||||
const waitForRelease = new Promise<void>((resolve) => {
|
||||
resolveRelease = resolve;
|
||||
});
|
||||
writeGateState.active = {
|
||||
targetFile,
|
||||
containerName,
|
||||
started: false,
|
||||
markStarted,
|
||||
waitForRelease,
|
||||
};
|
||||
return {
|
||||
waitForStart,
|
||||
release: () => {
|
||||
resolveRelease();
|
||||
writeGateState.active = null;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
writeGateState.active = null;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(SANDBOX_CONTAINERS_DIR, { recursive: true, force: true });
|
||||
await fs.rm(SANDBOX_BROWSERS_DIR, { recursive: true, force: true });
|
||||
@@ -80,102 +167,13 @@ function containerEntry(overrides: Partial<SandboxRegistryEntry> = {}): SandboxR
|
||||
};
|
||||
}
|
||||
|
||||
async function seedMonolithicContainerRegistry(entries: SandboxRegistryEntry[]) {
|
||||
async function seedContainerRegistry(entries: SandboxRegistryEntry[]) {
|
||||
await fs.writeFile(SANDBOX_REGISTRY_PATH, `${JSON.stringify({ entries }, null, 2)}\n`, "utf-8");
|
||||
}
|
||||
|
||||
async function seedMonolithicBrowserRegistry(entries: SandboxBrowserRegistryEntry[]) {
|
||||
await fs.writeFile(
|
||||
SANDBOX_BROWSER_REGISTRY_PATH,
|
||||
`${JSON.stringify({ entries }, null, 2)}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
}
|
||||
|
||||
describe("per-file sharded container registry", () => {
|
||||
it("writes and reads a single container entry", async () => {
|
||||
await updateRegistry(containerEntry({ containerName: "container-a" }));
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(registry.entries).toHaveLength(1);
|
||||
expect(registry.entries[0].containerName).toBe("container-a");
|
||||
});
|
||||
|
||||
it("keeps both container updates under concurrent writes", async () => {
|
||||
// The old monolithic-file model serialized every writer through a
|
||||
// single lock. Per-entry files must handle concurrent upserts of
|
||||
// different containers without any ordering or contention.
|
||||
await Promise.all([
|
||||
updateRegistry(containerEntry({ containerName: "container-a" })),
|
||||
updateRegistry(containerEntry({ containerName: "container-b" })),
|
||||
]);
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(
|
||||
registry.entries
|
||||
.map((entry) => entry.containerName)
|
||||
.slice()
|
||||
.toSorted(),
|
||||
).toEqual(["container-a", "container-b"]);
|
||||
});
|
||||
|
||||
it("readRegistryEntry returns a single entry without scanning the whole dir", async () => {
|
||||
await updateRegistry(containerEntry({ containerName: "container-x", sessionKey: "sess:x" }));
|
||||
await updateRegistry(containerEntry({ containerName: "container-y", sessionKey: "sess:y" }));
|
||||
|
||||
const entry = await readRegistryEntry("container-x");
|
||||
expect(entry).not.toBeNull();
|
||||
expect(entry?.containerName).toBe("container-x");
|
||||
expect(entry?.sessionKey).toBe("sess:x");
|
||||
});
|
||||
|
||||
it("readRegistryEntry returns null when the container has no entry file", async () => {
|
||||
const missing = await readRegistryEntry("nonexistent-container");
|
||||
expect(missing).toBeNull();
|
||||
});
|
||||
|
||||
it("removeRegistryEntry deletes only the target container file", async () => {
|
||||
await updateRegistry(containerEntry({ containerName: "container-a" }));
|
||||
await updateRegistry(containerEntry({ containerName: "container-b" }));
|
||||
await removeRegistryEntry("container-a");
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(registry.entries).toHaveLength(1);
|
||||
expect(registry.entries[0].containerName).toBe("container-b");
|
||||
});
|
||||
|
||||
it("removeRegistryEntry is a no-op for a container that never existed", async () => {
|
||||
// force:true on rm swallows ENOENT — callers should not have to
|
||||
// check existence first.
|
||||
await expect(removeRegistryEntry("never-created")).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("updateRegistry preserves createdAtMs and image from an existing entry", async () => {
|
||||
await updateRegistry(
|
||||
containerEntry({
|
||||
containerName: "c",
|
||||
createdAtMs: 100,
|
||||
lastUsedAtMs: 100,
|
||||
image: "original:tag",
|
||||
}),
|
||||
);
|
||||
await updateRegistry(
|
||||
containerEntry({
|
||||
containerName: "c",
|
||||
createdAtMs: 999,
|
||||
lastUsedAtMs: 200,
|
||||
image: "ignored:tag",
|
||||
}),
|
||||
);
|
||||
|
||||
const entry = await readRegistryEntry("c");
|
||||
expect(entry?.createdAtMs).toBe(100);
|
||||
expect(entry?.lastUsedAtMs).toBe(200);
|
||||
expect(entry?.image).toBe("original:tag");
|
||||
});
|
||||
|
||||
it("readRegistry normalizes legacy entries that lack backendId/runtimeLabel/configLabelKind", async () => {
|
||||
await seedMonolithicContainerRegistry([
|
||||
describe("registry race safety", () => {
|
||||
it("normalizes legacy registry entries on read", async () => {
|
||||
await seedContainerRegistry([
|
||||
{
|
||||
containerName: "legacy-container",
|
||||
sessionKey: "agent:main",
|
||||
@@ -196,31 +194,84 @@ describe("per-file sharded container registry", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("skips a single corrupt per-entry file without hiding the other entries", async () => {
|
||||
// A crashed writer that left behind a partial JSON file must not
|
||||
// prevent other containers from being enumerated.
|
||||
await updateRegistry(containerEntry({ containerName: "good-a" }));
|
||||
await updateRegistry(containerEntry({ containerName: "good-b" }));
|
||||
await fs.writeFile(`${SANDBOX_CONTAINERS_DIR}/corrupt.json`, "{not json", "utf-8");
|
||||
it("reads a single sharded entry without scanning the full registry", async () => {
|
||||
await updateRegistry(containerEntry({ containerName: "container-x", sessionKey: "sess:x" }));
|
||||
await updateRegistry(containerEntry({ containerName: "container-y", sessionKey: "sess:y" }));
|
||||
|
||||
const entry = await readRegistryEntry("container-x");
|
||||
expect(entry).toEqual(
|
||||
expect.objectContaining({
|
||||
containerName: "container-x",
|
||||
sessionKey: "sess:x",
|
||||
}),
|
||||
);
|
||||
await expect(readRegistryEntry("missing-container")).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it("keeps both container updates under concurrent writes", async () => {
|
||||
await Promise.all([
|
||||
updateRegistry(containerEntry({ containerName: "container-a" })),
|
||||
updateRegistry(containerEntry({ containerName: "container-b" })),
|
||||
]);
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(registry.entries).toHaveLength(2);
|
||||
expect(
|
||||
registry.entries
|
||||
.map((entry) => entry.containerName)
|
||||
.slice()
|
||||
.toSorted(),
|
||||
).toEqual(["good-a", "good-b"]);
|
||||
).toEqual(["container-a", "container-b"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("per-file sharded browser registry", () => {
|
||||
it("writes and reads both browser entries under concurrent writes", async () => {
|
||||
it("prevents concurrent container remove/update from resurrecting deleted entries", async () => {
|
||||
await updateRegistry(containerEntry({ containerName: "container-x" }));
|
||||
const writeGate = installWriteGate(null, "container-x");
|
||||
|
||||
const updatePromise = updateRegistry(
|
||||
containerEntry({ containerName: "container-x", configHash: "updated" }),
|
||||
);
|
||||
await writeGate.waitForStart;
|
||||
const removePromise = removeRegistryEntry("container-x");
|
||||
writeGate.release();
|
||||
await Promise.all([updatePromise, removePromise]);
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(registry.entries).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("stores unsafe container names as encoded shard filenames", async () => {
|
||||
await seedContainerRegistry([containerEntry({ containerName: "../escape" })]);
|
||||
|
||||
const registry = await readRegistry();
|
||||
|
||||
expect(registry.entries.map((entry) => entry.containerName)).toEqual(["../escape"]);
|
||||
await expect(fs.access(`${TEST_STATE_DIR}/escape.json`)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("returns registry entries in deterministic container-name order", async () => {
|
||||
await Promise.all([
|
||||
updateRegistry(containerEntry({ containerName: "container-c" })),
|
||||
updateRegistry(containerEntry({ containerName: "container-a" })),
|
||||
updateRegistry(containerEntry({ containerName: "container-b" })),
|
||||
]);
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(registry.entries.map((entry) => entry.containerName)).toEqual([
|
||||
"container-a",
|
||||
"container-b",
|
||||
"container-c",
|
||||
]);
|
||||
});
|
||||
|
||||
it("keeps both browser updates under concurrent writes", async () => {
|
||||
await Promise.all([
|
||||
updateBrowserRegistry(browserEntry({ containerName: "browser-a" })),
|
||||
updateBrowserRegistry(browserEntry({ containerName: "browser-b", cdpPort: 9223 })),
|
||||
]);
|
||||
|
||||
const registry = await readBrowserRegistry();
|
||||
expect(registry.entries).toHaveLength(2);
|
||||
expect(
|
||||
registry.entries
|
||||
.map((entry) => entry.containerName)
|
||||
@@ -229,73 +280,43 @@ describe("per-file sharded browser registry", () => {
|
||||
).toEqual(["browser-a", "browser-b"]);
|
||||
});
|
||||
|
||||
it("removes a single browser entry", async () => {
|
||||
await updateBrowserRegistry(browserEntry({ containerName: "browser-a" }));
|
||||
await removeBrowserRegistryEntry("browser-a");
|
||||
it("prevents concurrent browser remove/update from resurrecting deleted entries", async () => {
|
||||
await updateBrowserRegistry(browserEntry({ containerName: "browser-x" }));
|
||||
const writeGate = installWriteGate(null, "browser-x");
|
||||
|
||||
const updatePromise = updateBrowserRegistry(
|
||||
browserEntry({ containerName: "browser-x", configHash: "updated" }),
|
||||
);
|
||||
await writeGate.waitForStart;
|
||||
const removePromise = removeBrowserRegistryEntry("browser-x");
|
||||
writeGate.release();
|
||||
await Promise.all([updatePromise, removePromise]);
|
||||
|
||||
const registry = await readBrowserRegistry();
|
||||
expect(registry.entries).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("monolithic → per-file migration", () => {
|
||||
it("migrates container entries from the legacy containers.json on first read", async () => {
|
||||
await seedMonolithicContainerRegistry([
|
||||
containerEntry({ containerName: "old-a", sessionKey: "sess:a" }),
|
||||
containerEntry({ containerName: "old-b", sessionKey: "sess:b" }),
|
||||
]);
|
||||
it("quarantines malformed legacy registry files during migration", async () => {
|
||||
await seedMalformedContainerRegistry("{bad json");
|
||||
await seedMalformedBrowserRegistry("{bad json");
|
||||
await expect(updateRegistry(containerEntry())).resolves.toBeUndefined();
|
||||
await expect(updateBrowserRegistry(browserEntry())).resolves.toBeUndefined();
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(
|
||||
registry.entries
|
||||
.map((entry) => entry.containerName)
|
||||
.slice()
|
||||
.toSorted(),
|
||||
).toEqual(["old-a", "old-b"]);
|
||||
|
||||
// Legacy file is removed once migration succeeds, so subsequent reads
|
||||
// go straight to the sharded dir.
|
||||
await expect(fs.access(SANDBOX_REGISTRY_PATH)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("cleans up the stale .lock file alongside the migrated registry", async () => {
|
||||
await seedMonolithicContainerRegistry([containerEntry({ containerName: "old-a" })]);
|
||||
await fs.writeFile(`${SANDBOX_REGISTRY_PATH}.lock`, "stale", "utf-8");
|
||||
|
||||
await readRegistry();
|
||||
await expect(fs.access(`${SANDBOX_REGISTRY_PATH}.lock`)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("migrates browser entries from the legacy browsers.json", async () => {
|
||||
await seedMonolithicBrowserRegistry([
|
||||
browserEntry({ containerName: "old-br-a" }),
|
||||
browserEntry({ containerName: "old-br-b", cdpPort: 9223 }),
|
||||
]);
|
||||
|
||||
const registry = await readBrowserRegistry();
|
||||
expect(registry.entries).toHaveLength(2);
|
||||
await expect(fs.access(SANDBOX_BROWSER_REGISTRY_PATH)).rejects.toThrow();
|
||||
await expect(readRegistry()).resolves.toEqual({
|
||||
entries: [expect.objectContaining({ containerName: "container-a" })],
|
||||
});
|
||||
await expect(readBrowserRegistry()).resolves.toEqual({
|
||||
entries: [expect.objectContaining({ containerName: "browser-a" })],
|
||||
});
|
||||
});
|
||||
|
||||
it("drops a malformed legacy containers.json rather than throwing forever", async () => {
|
||||
// A corrupt legacy file would otherwise block every sandbox operation
|
||||
// on every boot — dropping it lets the operator recover just by
|
||||
// creating a new container.
|
||||
await fs.writeFile(SANDBOX_REGISTRY_PATH, "{bad json", "utf-8");
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(registry.entries).toHaveLength(0);
|
||||
await expect(fs.access(SANDBOX_REGISTRY_PATH)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("drops a legacy containers.json whose entries fail schema validation", async () => {
|
||||
// entries missing containerName cannot be migrated safely. Rather
|
||||
// than preserving the bad file, we remove it — the lost data was
|
||||
// unusable anyway, and keeping it around would block future boots.
|
||||
await fs.writeFile(SANDBOX_REGISTRY_PATH, `{"entries":[{"sessionKey":"agent:main"}]}`, "utf-8");
|
||||
|
||||
const registry = await readRegistry();
|
||||
expect(registry.entries).toHaveLength(0);
|
||||
await expect(fs.access(SANDBOX_REGISTRY_PATH)).rejects.toThrow();
|
||||
it("quarantines legacy registry files with invalid entries during migration", async () => {
|
||||
const invalidEntries = `{"entries":[{"sessionKey":"agent:main"}]}`;
|
||||
await seedMalformedContainerRegistry(invalidEntries);
|
||||
await seedMalformedBrowserRegistry(invalidEntries);
|
||||
await expect(updateRegistry(containerEntry())).resolves.toBeUndefined();
|
||||
await expect(updateBrowserRegistry(browserEntry())).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,12 +3,14 @@ import path from "node:path";
|
||||
import { z } from "zod";
|
||||
import { writeJsonAtomic } from "../../infra/json-files.js";
|
||||
import { safeParseJsonWithSchema } from "../../utils/zod-parse.js";
|
||||
import { acquireSessionWriteLock } from "../session-write-lock.js";
|
||||
import {
|
||||
SANDBOX_BROWSER_REGISTRY_PATH,
|
||||
SANDBOX_BROWSERS_DIR,
|
||||
SANDBOX_CONTAINERS_DIR,
|
||||
SANDBOX_REGISTRY_PATH,
|
||||
} from "./constants.js";
|
||||
import { hashTextSha256 } from "./hash.js";
|
||||
|
||||
export type SandboxRegistryEntry = {
|
||||
containerName: string;
|
||||
@@ -45,13 +47,12 @@ type RegistryEntry = {
|
||||
containerName: string;
|
||||
};
|
||||
|
||||
type RegistryEntryPayload = RegistryEntry & Record<string, unknown>;
|
||||
|
||||
type RegistryFile = {
|
||||
entries: RegistryEntry[];
|
||||
entries: RegistryEntryPayload[];
|
||||
};
|
||||
|
||||
// Schemas are shared between the per-entry files (live writes) and the
|
||||
// legacy monolithic files (one-shot migration). Both shapes must validate
|
||||
// containerName; per-entry files are just the RegistryEntrySchema directly.
|
||||
const RegistryEntrySchema = z
|
||||
.object({
|
||||
containerName: z.string(),
|
||||
@@ -71,25 +72,73 @@ function normalizeSandboxRegistryEntry(entry: SandboxRegistryEntry): SandboxRegi
|
||||
};
|
||||
}
|
||||
|
||||
// ── Per-entry file primitives ──────────────────────────────────────────
|
||||
//
|
||||
// Each container gets its own JSON file under the sharded directory.
|
||||
// Writes use writeJsonAtomic (tmp + rename) for crash-safety. No file
|
||||
// locks are needed — each concurrent writer only touches its own file,
|
||||
// so there is zero cross-session contention on the monolithic lock that
|
||||
// previously serialized every sandbox ensure/remove in the process tree.
|
||||
|
||||
function entryFilePath(dir: string, containerName: string): string {
|
||||
return path.join(dir, `${containerName}.json`);
|
||||
async function withRegistryLock<T>(registryPath: string, fn: () => Promise<T>): Promise<T> {
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: registryPath,
|
||||
allowReentrant: false,
|
||||
timeoutMs: 60_000,
|
||||
});
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
|
||||
async function readEntryFile<T extends RegistryEntry>(
|
||||
async function readLegacyRegistryFile(registryPath: string): Promise<RegistryFile | null> {
|
||||
try {
|
||||
const raw = await fs.readFile(registryPath, "utf-8");
|
||||
const parsed = safeParseJsonWithSchema(RegistryFileSchema, raw) as RegistryFile | null;
|
||||
return parsed;
|
||||
} catch (error) {
|
||||
const code = (error as { code?: string } | null)?.code;
|
||||
if (code === "ENOENT") {
|
||||
return { entries: [] };
|
||||
}
|
||||
if (error instanceof Error) {
|
||||
throw error;
|
||||
}
|
||||
throw new Error(`Failed to read sandbox registry file: ${registryPath}`, { cause: error });
|
||||
}
|
||||
}
|
||||
|
||||
export async function readRegistry(): Promise<SandboxRegistry> {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_REGISTRY_PATH, SANDBOX_CONTAINERS_DIR);
|
||||
const entries = await readShardedEntries<SandboxRegistryEntry>(SANDBOX_CONTAINERS_DIR);
|
||||
return {
|
||||
entries: entries.map((entry) => normalizeSandboxRegistryEntry(entry)),
|
||||
};
|
||||
}
|
||||
|
||||
function shardedEntryFilePath(dir: string, containerName: string): string {
|
||||
return path.join(dir, `${hashTextSha256(containerName)}.json`);
|
||||
}
|
||||
|
||||
async function withEntryLock<T>(
|
||||
dir: string,
|
||||
containerName: string,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const entryPath = shardedEntryFilePath(dir, containerName);
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: entryPath,
|
||||
allowReentrant: false,
|
||||
timeoutMs: 60_000,
|
||||
});
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
|
||||
async function readShardedEntry<T extends RegistryEntry>(
|
||||
dir: string,
|
||||
containerName: string,
|
||||
): Promise<T | null> {
|
||||
let raw: string;
|
||||
try {
|
||||
raw = await fs.readFile(entryFilePath(dir, containerName), "utf-8");
|
||||
raw = await fs.readFile(shardedEntryFilePath(dir, containerName), "utf-8");
|
||||
} catch (error) {
|
||||
const code = (error as { code?: string } | null)?.code;
|
||||
if (code === "ENOENT") {
|
||||
@@ -98,153 +147,155 @@ async function readEntryFile<T extends RegistryEntry>(
|
||||
throw error;
|
||||
}
|
||||
const parsed = safeParseJsonWithSchema(RegistryEntrySchema, raw) as T | null;
|
||||
return parsed ?? null;
|
||||
return parsed?.containerName === containerName ? parsed : null;
|
||||
}
|
||||
|
||||
async function writeEntryFile(dir: string, entry: RegistryEntry): Promise<void> {
|
||||
async function writeShardedEntry(dir: string, entry: RegistryEntryPayload): Promise<void> {
|
||||
await fs.mkdir(dir, { recursive: true });
|
||||
await writeJsonAtomic(entryFilePath(dir, entry.containerName), entry, { trailingNewline: true });
|
||||
await writeJsonAtomic(shardedEntryFilePath(dir, entry.containerName), entry, {
|
||||
trailingNewline: true,
|
||||
});
|
||||
}
|
||||
|
||||
async function removeEntryFile(dir: string, containerName: string): Promise<void> {
|
||||
try {
|
||||
await fs.rm(entryFilePath(dir, containerName), { force: true });
|
||||
} catch {
|
||||
// A concurrent remove or a missing file is fine — force:true already
|
||||
// swallows ENOENT; any other error (e.g. permission) is non-fatal here
|
||||
// because the caller's intent was "make sure it's gone".
|
||||
}
|
||||
async function removeShardedEntry(dir: string, containerName: string): Promise<void> {
|
||||
await fs.rm(shardedEntryFilePath(dir, containerName), { force: true });
|
||||
}
|
||||
|
||||
/** Scan every per-entry JSON file in a sharded directory. */
|
||||
async function readAllEntries<T extends RegistryEntry>(dir: string): Promise<T[]> {
|
||||
async function readShardedEntries<T extends RegistryEntry>(dir: string): Promise<T[]> {
|
||||
let files: string[];
|
||||
try {
|
||||
files = await fs.readdir(dir);
|
||||
} catch {
|
||||
return [];
|
||||
} catch (error) {
|
||||
const code = (error as { code?: string } | null)?.code;
|
||||
if (code === "ENOENT") {
|
||||
return [];
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
const entries: T[] = [];
|
||||
await Promise.all(
|
||||
|
||||
const entries = await Promise.all(
|
||||
files
|
||||
.filter((name) => name.endsWith(".json"))
|
||||
.toSorted()
|
||||
.map(async (name) => {
|
||||
try {
|
||||
const raw = await fs.readFile(path.join(dir, name), "utf-8");
|
||||
const parsed = safeParseJsonWithSchema(RegistryEntrySchema, raw) as T | null;
|
||||
if (parsed) {
|
||||
entries.push(parsed);
|
||||
}
|
||||
// Corrupt / partially-written files are skipped rather than
|
||||
// aborting the whole read: one bad entry should not hide every
|
||||
// other container the operator has running.
|
||||
return safeParseJsonWithSchema(RegistryEntrySchema, raw) as T | null;
|
||||
} catch {
|
||||
// ignore unreadable files for the same reason
|
||||
return null;
|
||||
}
|
||||
}),
|
||||
);
|
||||
return entries;
|
||||
}
|
||||
|
||||
// ── One-shot migration from monolithic file → per-entry files ──────────
|
||||
|
||||
async function migrateMonolithicIfNeeded(oldPath: string, newDir: string): Promise<void> {
|
||||
let raw: string;
|
||||
try {
|
||||
raw = await fs.readFile(oldPath, "utf-8");
|
||||
} catch {
|
||||
// Old file does not exist (already migrated on a previous boot, or
|
||||
// fresh install). Nothing to do.
|
||||
return;
|
||||
const validEntries: T[] = [];
|
||||
for (const entry of entries) {
|
||||
if (entry) {
|
||||
validEntries.push(entry);
|
||||
}
|
||||
}
|
||||
const parsed = safeParseJsonWithSchema(RegistryFileSchema, raw) as RegistryFile | null;
|
||||
if (!parsed || parsed.entries.length === 0) {
|
||||
// Corrupt or empty — drop it (and its stale lock) so we don't re-attempt
|
||||
// migration every read.
|
||||
await fs.rm(oldPath, { force: true }).catch(() => {});
|
||||
await fs.rm(`${oldPath}.lock`, { force: true }).catch(() => {});
|
||||
return;
|
||||
}
|
||||
await fs.mkdir(newDir, { recursive: true });
|
||||
await Promise.all(
|
||||
parsed.entries.map((entry) =>
|
||||
writeJsonAtomic(entryFilePath(newDir, entry.containerName), entry, {
|
||||
trailingNewline: true,
|
||||
}),
|
||||
),
|
||||
return validEntries.toSorted((left, right) =>
|
||||
left.containerName.localeCompare(right.containerName),
|
||||
);
|
||||
// Migration succeeded: remove the monolithic file and any leftover lock
|
||||
// file from the previous single-writer scheme.
|
||||
await fs.rm(oldPath, { force: true }).catch(() => {});
|
||||
await fs.rm(`${oldPath}.lock`, { force: true }).catch(() => {});
|
||||
}
|
||||
|
||||
// ── Public API: Container Registry ─────────────────────────────────────
|
||||
|
||||
export async function readRegistry(): Promise<SandboxRegistry> {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_REGISTRY_PATH, SANDBOX_CONTAINERS_DIR);
|
||||
const entries = await readAllEntries<SandboxRegistryEntry>(SANDBOX_CONTAINERS_DIR);
|
||||
return { entries: entries.map(normalizeSandboxRegistryEntry) };
|
||||
async function quarantineLegacyRegistry(registryPath: string): Promise<void> {
|
||||
const quarantinePath = `${registryPath}.invalid-${Date.now()}`;
|
||||
await fs.rename(registryPath, quarantinePath).catch(async (error) => {
|
||||
const code = (error as { code?: string } | null)?.code;
|
||||
if (code !== "ENOENT") {
|
||||
await fs.rm(registryPath, { force: true });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function migrateMonolithicIfNeeded(registryPath: string, shardedDir: string): Promise<void> {
|
||||
try {
|
||||
await fs.access(registryPath);
|
||||
} catch (error) {
|
||||
const code = (error as { code?: string } | null)?.code;
|
||||
if (code === "ENOENT") {
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
await withRegistryLock(registryPath, async () => {
|
||||
const registry = await readLegacyRegistryFile(registryPath);
|
||||
if (!registry) {
|
||||
await quarantineLegacyRegistry(registryPath);
|
||||
return;
|
||||
}
|
||||
if (registry.entries.length === 0) {
|
||||
await fs.rm(registryPath, { force: true });
|
||||
return;
|
||||
}
|
||||
await fs.mkdir(shardedDir, { recursive: true });
|
||||
for (const entry of registry.entries) {
|
||||
await withEntryLock(shardedDir, entry.containerName, async () => {
|
||||
await writeShardedEntry(shardedDir, entry);
|
||||
});
|
||||
}
|
||||
await fs.rm(registryPath, { force: true });
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a single container entry by name.
|
||||
*
|
||||
* O(1) file read — avoids scanning the entire sharded directory just to
|
||||
* look up one container, which is the hot path for `ensureSandboxContainer`.
|
||||
*/
|
||||
export async function readRegistryEntry(
|
||||
containerName: string,
|
||||
): Promise<SandboxRegistryEntry | null> {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_REGISTRY_PATH, SANDBOX_CONTAINERS_DIR);
|
||||
const entry = await readEntryFile<SandboxRegistryEntry>(SANDBOX_CONTAINERS_DIR, containerName);
|
||||
const entry = await readShardedEntry<SandboxRegistryEntry>(SANDBOX_CONTAINERS_DIR, containerName);
|
||||
return entry ? normalizeSandboxRegistryEntry(entry) : null;
|
||||
}
|
||||
|
||||
export async function updateRegistry(entry: SandboxRegistryEntry): Promise<void> {
|
||||
export async function updateRegistry(entry: SandboxRegistryEntry) {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_REGISTRY_PATH, SANDBOX_CONTAINERS_DIR);
|
||||
const existing = await readEntryFile<SandboxRegistryEntry>(
|
||||
SANDBOX_CONTAINERS_DIR,
|
||||
entry.containerName,
|
||||
);
|
||||
const merged: SandboxRegistryEntry = {
|
||||
...entry,
|
||||
backendId: entry.backendId ?? existing?.backendId,
|
||||
runtimeLabel: entry.runtimeLabel ?? existing?.runtimeLabel,
|
||||
createdAtMs: existing?.createdAtMs ?? entry.createdAtMs,
|
||||
image: existing?.image ?? entry.image,
|
||||
configLabelKind: entry.configLabelKind ?? existing?.configLabelKind,
|
||||
configHash: entry.configHash ?? existing?.configHash,
|
||||
};
|
||||
await writeEntryFile(SANDBOX_CONTAINERS_DIR, merged);
|
||||
await withEntryLock(SANDBOX_CONTAINERS_DIR, entry.containerName, async () => {
|
||||
const existing = await readShardedEntry<SandboxRegistryEntry>(
|
||||
SANDBOX_CONTAINERS_DIR,
|
||||
entry.containerName,
|
||||
);
|
||||
await writeShardedEntry(SANDBOX_CONTAINERS_DIR, {
|
||||
...entry,
|
||||
backendId: entry.backendId ?? existing?.backendId,
|
||||
runtimeLabel: entry.runtimeLabel ?? existing?.runtimeLabel,
|
||||
createdAtMs: existing?.createdAtMs ?? entry.createdAtMs,
|
||||
image: existing?.image ?? entry.image,
|
||||
configLabelKind: entry.configLabelKind ?? existing?.configLabelKind,
|
||||
configHash: entry.configHash ?? existing?.configHash,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function removeRegistryEntry(containerName: string): Promise<void> {
|
||||
await removeEntryFile(SANDBOX_CONTAINERS_DIR, containerName);
|
||||
export async function removeRegistryEntry(containerName: string) {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_REGISTRY_PATH, SANDBOX_CONTAINERS_DIR);
|
||||
await withEntryLock(SANDBOX_CONTAINERS_DIR, containerName, async () => {
|
||||
await removeShardedEntry(SANDBOX_CONTAINERS_DIR, containerName);
|
||||
});
|
||||
}
|
||||
|
||||
// ── Public API: Browser Registry ───────────────────────────────────────
|
||||
|
||||
export async function readBrowserRegistry(): Promise<SandboxBrowserRegistry> {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_BROWSER_REGISTRY_PATH, SANDBOX_BROWSERS_DIR);
|
||||
return { entries: await readAllEntries<SandboxBrowserRegistryEntry>(SANDBOX_BROWSERS_DIR) };
|
||||
return { entries: await readShardedEntries<SandboxBrowserRegistryEntry>(SANDBOX_BROWSERS_DIR) };
|
||||
}
|
||||
|
||||
export async function updateBrowserRegistry(entry: SandboxBrowserRegistryEntry): Promise<void> {
|
||||
export async function updateBrowserRegistry(entry: SandboxBrowserRegistryEntry) {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_BROWSER_REGISTRY_PATH, SANDBOX_BROWSERS_DIR);
|
||||
const existing = await readEntryFile<SandboxBrowserRegistryEntry>(
|
||||
SANDBOX_BROWSERS_DIR,
|
||||
entry.containerName,
|
||||
);
|
||||
const merged: SandboxBrowserRegistryEntry = {
|
||||
...entry,
|
||||
createdAtMs: existing?.createdAtMs ?? entry.createdAtMs,
|
||||
image: existing?.image ?? entry.image,
|
||||
configHash: entry.configHash ?? existing?.configHash,
|
||||
};
|
||||
await writeEntryFile(SANDBOX_BROWSERS_DIR, merged);
|
||||
await withEntryLock(SANDBOX_BROWSERS_DIR, entry.containerName, async () => {
|
||||
const existing = await readShardedEntry<SandboxBrowserRegistryEntry>(
|
||||
SANDBOX_BROWSERS_DIR,
|
||||
entry.containerName,
|
||||
);
|
||||
await writeShardedEntry(SANDBOX_BROWSERS_DIR, {
|
||||
...entry,
|
||||
createdAtMs: existing?.createdAtMs ?? entry.createdAtMs,
|
||||
image: existing?.image ?? entry.image,
|
||||
configHash: entry.configHash ?? existing?.configHash,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function removeBrowserRegistryEntry(containerName: string): Promise<void> {
|
||||
await removeEntryFile(SANDBOX_BROWSERS_DIR, containerName);
|
||||
export async function removeBrowserRegistryEntry(containerName: string) {
|
||||
await migrateMonolithicIfNeeded(SANDBOX_BROWSER_REGISTRY_PATH, SANDBOX_BROWSERS_DIR);
|
||||
await withEntryLock(SANDBOX_BROWSERS_DIR, containerName, async () => {
|
||||
await removeShardedEntry(SANDBOX_BROWSERS_DIR, containerName);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user