fix: prevent duplicate gateway watchers

This commit is contained in:
Peter Steinberger
2026-04-05 23:22:34 +01:00
parent e91405ebf9
commit cef64f0b5a
4 changed files with 260 additions and 11 deletions

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env node
import { spawn } from "node:child_process";
import { createHash } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import process from "node:process";
import { pathToFileURL } from "node:url";
@@ -11,6 +13,9 @@ const WATCH_RESTART_SIGNAL = "SIGTERM";
// Child exit codes that shouldRestartAfterChildExit treats as restartable.
// NOTE(review): 143 is presumably 128 + 15 (SIGTERM) — confirm.
const WATCH_RESTARTABLE_CHILD_EXIT_CODES = new Set([143]);
// Child exit signals that shouldRestartAfterChildExit treats as restartable.
const WATCH_RESTARTABLE_CHILD_SIGNALS = new Set(["SIGTERM"]);
// NOTE(review): presumably path segments excluded from watching; the usage is not visible here.
const WATCH_IGNORED_PATH_SEGMENTS = new Set([".git", "dist", "node_modules"]);
// Upper bound (ms) on how long waitForWatcherRelease polls for a previous watcher to go away.
const WATCH_LOCK_WAIT_MS = 5_000;
// Delay (ms) between two polls inside waitForWatcherRelease.
const WATCH_LOCK_POLL_MS = 100;
// Directory holding watcher lock files; resolveWatchLockPath joins it onto the watcher's cwd.
const WATCH_LOCK_DIR = path.join(".local", "watch-node");
/**
 * Build the argument list for the runner: the runner script first,
 * followed by the caller's arguments in their original order.
 */
const buildRunnerArgs = (args) => {
  const runnerArgs = [WATCH_NODE_RUNNER, ...args];
  return runnerArgs;
};
@@ -65,6 +70,129 @@ const shouldRestartAfterChildExit = (exitCode, exitSignal) =>
(typeof exitCode === "number" && WATCH_RESTARTABLE_CHILD_EXIT_CODES.has(exitCode)) ||
(typeof exitSignal === "string" && WATCH_RESTARTABLE_CHILD_SIGNALS.has(exitSignal));
/**
 * Report whether a process with the given pid currently exists.
 *
 * The check is done by sending signal 0 through `signalProcess`, which
 * performs the existence/permission check without delivering a signal.
 *
 * @param {unknown} pid Candidate process id; anything that is not a positive integer yields `false`.
 * @param {(pid: number, signal: string | number) => void} signalProcess Function used to probe the process (throws on failure).
 * @returns {boolean} `true` when the process exists, `false` otherwise.
 */
const isProcessAlive = (pid, signalProcess) => {
  if (!Number.isInteger(pid) || pid <= 0) {
    return false;
  }
  try {
    signalProcess(pid, 0);
    return true;
  } catch (error) {
    // EPERM means the process exists but we are not allowed to signal it
    // (e.g. it belongs to another user). It is still alive, so a lock that
    // points at it must not be considered stale.
    return error?.code === "EPERM";
  }
};
/** Return a promise that resolves once `ms` milliseconds have elapsed. */
const sleep = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
/**
 * Derive a short lock key from the working directory and the argument list:
 * the first 12 hex characters of the SHA-256 over `cwd`, a NUL byte, and the
 * arguments joined by NUL bytes.
 */
const createWatchLockKey = (cwd, args) => {
  const hasher = createHash("sha256");
  for (const part of [cwd, "\0", args.join("\0")]) {
    hasher.update(part);
  }
  const hexDigest = hasher.digest("hex");
  return hexDigest.slice(0, 12);
};
/**
 * Compute the lock file path for a watcher started in `cwd` with `args`.
 * The file lives under the lock directory inside `cwd` and is named after
 * the key derived from `cwd` and `args`, with a `.json` extension.
 */
export const resolveWatchLockPath = (cwd, args = []) => {
  const lockFileName = `${createWatchLockKey(cwd, args)}.json`;
  return path.join(cwd, WATCH_LOCK_DIR, lockFileName);
};
/**
 * Read and JSON-parse the lock file at `lockPath`.
 * Returns the parsed value, or `null` when the file cannot be read or parsed.
 */
const readWatchLock = (lockPath) => {
  let parsed;
  try {
    const raw = fs.readFileSync(lockPath, "utf8");
    parsed = JSON.parse(raw);
  } catch {
    // Missing, unreadable, or malformed lock files all count as "no lock info".
    parsed = null;
  }
  return parsed;
};
/**
 * Delete the lock file at `lockPath`.
 * A file that is already missing is not an error; any other failure is rethrown.
 */
const removeWatchLock = (lockPath) => {
  try {
    fs.unlinkSync(lockPath);
  } catch (error) {
    const alreadyGone = error?.code === "ENOENT";
    if (alreadyGone) {
      return;
    }
    throw error;
  }
};
/**
 * Create the lock file at `lockPath` containing `payload` as one line of JSON.
 * Parent directories are created as needed. The file is opened with the
 * exclusive-create flag, so the call throws when the file already exists.
 */
const writeWatchLock = (lockPath, payload) => {
  const lockDir = path.dirname(lockPath);
  fs.mkdirSync(lockDir, { recursive: true });
  const serialized = `${JSON.stringify(payload)}\n`;
  const writeOptions = { encoding: "utf8", flag: "wx" };
  fs.writeFileSync(lockPath, serialized, writeOptions);
};
/**
 * Write a watcher log line, prefixed with `[openclaw] `, to the stderr stream
 * of the injected process. Nothing happens when the stream or its `write`
 * method is absent.
 */
const logWatcher = (message, deps) => {
  const { stderr } = deps.process;
  stderr?.write?.(`[openclaw] ${message}\n`);
};
/**
 * Poll until the watcher identified by `pid` is gone or its lock file has
 * disappeared, giving up after WATCH_LOCK_WAIT_MS.
 *
 * Resolves to `true` as soon as the process is no longer alive or the lock
 * file no longer exists. Once the deadline has passed, resolves to whether
 * the process is dead at that moment.
 */
const waitForWatcherRelease = async (lockPath, pid, deps) => {
  const watcherGone = () => !isProcessAlive(pid, deps.signalProcess);
  const giveUpAt = deps.now() + WATCH_LOCK_WAIT_MS;
  for (;;) {
    const stillWithinBudget = deps.now() < giveUpAt;
    if (!stillWithinBudget) {
      // Deadline reached: one last liveness check decides the outcome.
      return watcherGone();
    }
    if (watcherGone() || !fs.existsSync(lockPath)) {
      return true;
    }
    await deps.sleep(WATCH_LOCK_POLL_MS);
  }
};
/**
 * Acquire the watcher lock for `deps.cwd` + `deps.args`, replacing any watcher
 * that currently holds it.
 *
 * The lock is a JSON file created with exclusive-create semantics. When the
 * file already exists:
 *  - a lock whose recorded pid is not alive (or is missing/invalid) is removed
 *    and creation is retried;
 *  - a lock that records this process's own pid is treated the same way, since
 *    it can only be a leftover and signalling it would terminate ourselves;
 *  - otherwise the holder is sent WATCH_RESTART_SIGNAL, and once it has
 *    released the lock (see waitForWatcherRelease) creation is retried.
 *
 * @param deps Injected dependencies (`cwd`, `args`, `process`, `now`, `sleep`, `signalProcess`).
 * @param watchSession Session identifier recorded in the lock payload.
 * @returns `{ lockPath, pid }` on success, or `null` when the existing watcher
 *   could not be signalled or did not exit in time.
 * @throws Any file-system error other than the lock file already existing.
 */
const acquireWatchLock = async (deps, watchSession) => {
  const lockPath = resolveWatchLockPath(deps.cwd, deps.args);
  const ownPid = deps.process.pid;
  const payload = {
    pid: ownPid,
    command: deps.args.join(" "),
    createdAt: new Date(deps.now()).toISOString(),
    cwd: deps.cwd,
    watchSession,
  };

  while (true) {
    try {
      writeWatchLock(lockPath, payload);
      return { lockPath, pid: ownPid };
    } catch (error) {
      // Only "lock file already exists" is recoverable here.
      if (error?.code !== "EEXIST") {
        throw error;
      }
    }

    const existing = readWatchLock(lockPath);
    const existingPid = existing?.pid;

    // A lock recording our own pid is a stale leftover (e.g. pid reuse after a
    // crash or container restart). Never signal ourselves; just reclaim it.
    const isStale =
      existingPid === ownPid || !isProcessAlive(existingPid, deps.signalProcess);
    if (isStale) {
      removeWatchLock(lockPath);
      continue;
    }

    logWatcher(`Replacing existing watcher pid ${existingPid}.`, deps);
    try {
      deps.signalProcess(existingPid, WATCH_RESTART_SIGNAL);
    } catch (error) {
      // The signal may fail simply because the watcher exited in the meantime;
      // only give up when it is still running.
      if (isProcessAlive(existingPid, deps.signalProcess)) {
        logWatcher(
          `Failed to stop existing watcher pid ${existingPid}: ${error?.message ?? "unknown error"}`,
          deps,
        );
        return null;
      }
    }

    const released = await waitForWatcherRelease(lockPath, existingPid, deps);
    if (!released) {
      logWatcher(`Timed out waiting for watcher pid ${existingPid} to exit.`, deps);
      return null;
    }
    // Clear whatever the previous watcher left behind before retrying.
    removeWatchLock(lockPath);
  }
};
/**
 * Release a lock previously returned by acquireWatchLock.
 * Does nothing for a falsy handle. The lock file is removed only when the pid
 * recorded in it matches the handle's pid, so another process's lock is left alone.
 */
const releaseWatchLock = (lockHandle) => {
  if (lockHandle) {
    const onDisk = readWatchLock(lockHandle.lockPath);
    const ownedByHandle = onDisk?.pid === lockHandle.pid;
    if (ownedByHandle) {
      removeWatchLock(lockHandle.lockPath);
    }
  }
};
/**
* @param {{
* spawn?: typeof spawn;
@@ -73,6 +201,9 @@ const shouldRestartAfterChildExit = (exitCode, exitSignal) =>
* args?: string[];
* env?: NodeJS.ProcessEnv;
* now?: () => number;
* sleep?: (ms: number) => Promise<void>;
* signalProcess?: (pid: number, signal: string | number) => void;
* lockDisabled?: boolean;
* createWatcher?: (
* watchPaths: string[],
* options: { ignoreInitial: boolean; ignored: (watchPath: string) => boolean },
@@ -88,6 +219,9 @@ export async function runWatchMain(params = {}) {
args: params.args ?? process.argv.slice(2),
env: params.env ? { ...params.env } : { ...process.env },
now: params.now ?? Date.now,
sleep: params.sleep ?? sleep,
signalProcess: params.signalProcess ?? ((pid, signal) => process.kill(pid, signal)),
lockDisabled: params.lockDisabled === true,
createWatcher:
params.createWatcher ?? ((watchPaths, options) => chokidar.watch(watchPaths, options)),
watchPaths: params.watchPaths ?? runNodeWatchedPaths,
@@ -109,6 +243,7 @@ export async function runWatchMain(params = {}) {
let shuttingDown = false;
let restartRequested = false;
let watchProcess = null;
let lockHandle = null;
let onSigInt;
let onSigTerm;
@@ -129,6 +264,7 @@ export async function runWatchMain(params = {}) {
if (onSigTerm) {
deps.process.off("SIGTERM", onSigTerm);
}
releaseWatchLock(lockHandle);
watcher.close?.().catch?.(() => {});
resolve(code);
};
@@ -139,6 +275,11 @@ export async function runWatchMain(params = {}) {
env: childEnv,
stdio: "inherit",
});
watchProcess.on("error", (error) => {
watchProcess = null;
logWatcher(`Failed to spawn watcher child: ${error?.message ?? "unknown error"}`, deps);
settle(1);
});
watchProcess.on("exit", (exitCode, exitSignal) => {
watchProcess = null;
if (shuttingDown) {
@@ -178,8 +319,6 @@ export async function runWatchMain(params = {}) {
settle(1);
});
startRunner();
onSigInt = () => {
shuttingDown = true;
if (watchProcess && typeof watchProcess.kill === "function") {
@@ -197,6 +336,26 @@ export async function runWatchMain(params = {}) {
deps.process.on("SIGINT", onSigInt);
deps.process.on("SIGTERM", onSigTerm);
if (deps.lockDisabled) {
lockHandle = { lockPath: "", pid: deps.process.pid };
startRunner();
return;
}
void acquireWatchLock(deps, watchSession)
.then((handle) => {
if (!handle) {
settle(1);
return;
}
lockHandle = handle;
startRunner();
})
.catch((error) => {
logWatcher(`Failed to acquire watcher lock: ${error?.message ?? "unknown error"}`, deps);
settle(1);
});
});
}