mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-16 20:40:45 +00:00
* fix(gateway): correct launchctl command sequence for gateway restart (closes #20030) * fix(restart): expand HOME and escape label in launchctl plist path * fix(restart): poll port free after SIGKILL to prevent EADDRINUSE restart loop When cleanStaleGatewayProcessesSync() kills a stale gateway process, the kernel may not immediately release the TCP port. Previously the function returned after a fixed 500ms sleep (300ms SIGTERM + 200ms SIGKILL), allowing triggerOpenClawRestart() to hand off to systemd before the port was actually free. The new systemd process then raced the dying socket for port 18789, hit EADDRINUSE, and exited with status 1, causing systemd to retry indefinitely — the zombie restart loop reported in #33103. Fix: add waitForPortFreeSync() that polls lsof at 50ms intervals for up to 2 seconds after SIGKILL. cleanStaleGatewayProcessesSync() now blocks until the port is confirmed free (or the budget expires with a warning) before returning. The increased SIGTERM/SIGKILL wait budgets (600ms / 400ms) also give slow processes more time to exit cleanly. Fixes #33103 Related: #28134 * fix: add EADDRINUSE retry and TIME_WAIT port-bind checks for gateway startup * fix(ports): treat EADDRNOTAVAIL as non-retryable and fix flaky test * fix(gateway): hot-reload agents.defaults.models allowlist changes The reload plan had a rule for `agents.defaults.model` (singular) but not `agents.defaults.models` (plural — the allowlist array). Because `agents.defaults.models` does not prefix-match `agents.defaults.model.`, it fell through to the catch-all `agents` tail rule (kind=none), so allowlist edits in openclaw.json were silently ignored at runtime. Add a dedicated reload rule so changes to the models allowlist trigger a heartbeat restart, which re-reads the config and serves the updated list to clients. 
Fixes #33600 Co-authored-by: HCL <chenglunhu@gmail.com> Signed-off-by: HCL <chenglunhu@gmail.com> * test(restart): 100% branch coverage — audit round 2 Audit findings fixed: - remove dead guard: terminateStaleProcessesSync pids.length===0 check was unreachable (only caller cleanStaleGatewayProcessesSync already guards) - expose __testing.callSleepSyncRaw so sleepSync's real Atomics.wait path can be unit-tested directly without going through the override - fix broken sleepSync Atomics.wait test: previous test set override=null but cleanStaleGatewayProcessesSync returned before calling sleepSync — replaced with direct callSleepSyncRaw calls that actually exercise L36/L42-47 - fix pid collision: two tests used process.pid+304 (EPERM + dead-at-SIGTERM); EPERM test changed to process.pid+305 - fix misindented tests: 'deduplicates pids' and 'lsof status 1 container edge case' were outside their intended describe blocks; moved to correct scopes (findGatewayPidsOnPortSync and pollPortOnce respectively) - add missing branch tests: - status 1 + non-empty stdout with zero openclaw pids → free:true (L145) - mid-loop non-openclaw cmd in &&-chain (L67) - consecutive p-lines without c-line between them (L67) - invalid PID in p-line (p0 / pNaN) — ternary false branch (L67) - unknown lsof output line (else-if false branch L69) Coverage: 100% stmts / 100% branch / 100% funcs / 100% lines (36 tests) * test(restart): fix stale-pid test typing for tsgo * fix(gateway): address lifecycle review findings * test(update): make restart-helper path assertions windows-safe --------- Signed-off-by: HCL <chenglunhu@gmail.com> Co-authored-by: Glucksberg <markuscontasul@gmail.com> Co-authored-by: Efe Büken <efe@arven.digital> Co-authored-by: Riccardo Marino <rmarino@apple.com> Co-authored-by: HCL <chenglunhu@gmail.com>
388 lines
12 KiB
TypeScript
388 lines
12 KiB
TypeScript
import { execFileSync } from "node:child_process";
|
|
import { createServer } from "node:net";
|
|
import { resolveLsofCommandSync } from "../infra/ports-lsof.js";
|
|
import { tryListenOnPort } from "../infra/ports-probe.js";
|
|
import { sleep } from "../utils.js";
|
|
|
|
/** A process found holding a TCP port; `command` is only known via lsof. */
export type PortProcess = { pid: number; command?: string };

/** Result of forceFreePortAndWait: who was signalled and how long we polled. */
export type ForceFreePortResult = {
  /** Processes that received the initial SIGTERM (or fuser TERM). */
  killed: PortProcess[];
  /** Total milliseconds spent polling for the port to become free. */
  waitedMs: number;
  /** True when SIGTERM was not enough and SIGKILL had to be sent. */
  escalatedToSigkill: boolean;
};
|
|
|
|
/**
 * Shape of errors thrown by execFileSync: an ErrnoException augmented with
 * the child's exit status and captured stdio streams.
 */
type ExecFileError = NodeJS.ErrnoException & {
  // Child exit code; null when the child was killed by a signal.
  status?: number | null;
  stderr?: string | Buffer;
  stdout?: string | Buffer;
  cause?: unknown;
};
|
|
|
|
// Maps Node signal names to the bare names fuser(1) expects after `-` (e.g. -TERM).
const FUSER_SIGNALS: Record<"SIGTERM" | "SIGKILL", string> = {
  SIGTERM: "TERM",
  SIGKILL: "KILL",
};
|
|
|
|
function readExecOutput(value: string | Buffer | undefined): string {
|
|
if (typeof value === "string") {
|
|
return value;
|
|
}
|
|
if (value instanceof Buffer) {
|
|
return value.toString("utf8");
|
|
}
|
|
return "";
|
|
}
|
|
|
|
function withErrnoCode(message: string, code: string, cause: unknown): Error {
|
|
const out = new Error(message, { cause: cause instanceof Error ? cause : undefined }) as Error &
|
|
NodeJS.ErrnoException;
|
|
out.code = code;
|
|
return out;
|
|
}
|
|
|
|
function getErrnoCode(err: unknown): string | undefined {
|
|
if (!err || typeof err !== "object") {
|
|
return undefined;
|
|
}
|
|
const direct = (err as { code?: unknown }).code;
|
|
if (typeof direct === "string" && direct.length > 0) {
|
|
return direct;
|
|
}
|
|
const cause = (err as { cause?: unknown }).cause;
|
|
if (cause && typeof cause === "object") {
|
|
const nested = (cause as { code?: unknown }).code;
|
|
if (typeof nested === "string" && nested.length > 0) {
|
|
return nested;
|
|
}
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
function isRecoverableLsofError(err: unknown): boolean {
|
|
const code = getErrnoCode(err);
|
|
if (code === "ENOENT" || code === "EACCES" || code === "EPERM") {
|
|
return true;
|
|
}
|
|
const message = err instanceof Error ? err.message : String(err);
|
|
return /lsof.*(permission denied|not permitted|operation not permitted|eacces|eperm)/i.test(
|
|
message,
|
|
);
|
|
}
|
|
|
|
function parseFuserPidList(output: string): number[] {
|
|
if (!output) {
|
|
return [];
|
|
}
|
|
const values = new Set<number>();
|
|
for (const rawLine of output.split(/\r?\n/)) {
|
|
const line = rawLine.trim();
|
|
if (!line) {
|
|
continue;
|
|
}
|
|
const pidRegion = line.includes(":") ? line.slice(line.indexOf(":") + 1) : line;
|
|
const pidMatches = pidRegion.match(/\d+/g) ?? [];
|
|
for (const match of pidMatches) {
|
|
const pid = Number.parseInt(match, 10);
|
|
if (Number.isFinite(pid) && pid > 0) {
|
|
values.add(pid);
|
|
}
|
|
}
|
|
}
|
|
return [...values];
|
|
}
|
|
|
|
function killPortWithFuser(port: number, signal: "SIGTERM" | "SIGKILL"): PortProcess[] {
|
|
const args = ["-k", `-${FUSER_SIGNALS[signal]}`, `${port}/tcp`];
|
|
try {
|
|
const stdout = execFileSync("fuser", args, {
|
|
encoding: "utf-8",
|
|
stdio: ["ignore", "pipe", "pipe"],
|
|
});
|
|
return parseFuserPidList(stdout).map((pid) => ({ pid }));
|
|
} catch (err: unknown) {
|
|
const execErr = err as ExecFileError;
|
|
const code = execErr.code;
|
|
const status = execErr.status;
|
|
const stdout = readExecOutput(execErr.stdout);
|
|
const stderr = readExecOutput(execErr.stderr);
|
|
const parsed = parseFuserPidList([stdout, stderr].filter(Boolean).join("\n"));
|
|
if (status === 1) {
|
|
// fuser exits 1 if nothing matched; keep any parsed PIDs in case signal succeeded.
|
|
return parsed.map((pid) => ({ pid }));
|
|
}
|
|
if (code === "ENOENT") {
|
|
throw withErrnoCode(
|
|
"fuser not found; required for --force when lsof is unavailable",
|
|
"ENOENT",
|
|
err,
|
|
);
|
|
}
|
|
if (code === "EACCES" || code === "EPERM") {
|
|
throw withErrnoCode("fuser permission denied while forcing gateway port", code, err);
|
|
}
|
|
throw err instanceof Error ? err : new Error(String(err));
|
|
}
|
|
}
|
|
|
|
async function isPortBusy(port: number): Promise<boolean> {
|
|
try {
|
|
await tryListenOnPort({ port, exclusive: true });
|
|
return false;
|
|
} catch (err: unknown) {
|
|
const code = (err as NodeJS.ErrnoException).code;
|
|
if (code === "EADDRINUSE") {
|
|
return true;
|
|
}
|
|
throw err instanceof Error ? err : new Error(String(err));
|
|
}
|
|
}
|
|
|
|
export function parseLsofOutput(output: string): PortProcess[] {
|
|
const lines = output.split(/\r?\n/).filter(Boolean);
|
|
const results: PortProcess[] = [];
|
|
let current: Partial<PortProcess> = {};
|
|
for (const line of lines) {
|
|
if (line.startsWith("p")) {
|
|
if (current.pid) {
|
|
results.push(current as PortProcess);
|
|
}
|
|
current = { pid: Number.parseInt(line.slice(1), 10) };
|
|
} else if (line.startsWith("c")) {
|
|
current.command = line.slice(1);
|
|
}
|
|
}
|
|
if (current.pid) {
|
|
results.push(current as PortProcess);
|
|
}
|
|
return results;
|
|
}
|
|
|
|
export function listPortListeners(port: number): PortProcess[] {
|
|
if (process.platform === "win32") {
|
|
try {
|
|
const out = execFileSync("netstat", ["-ano", "-p", "TCP"], { encoding: "utf-8" });
|
|
const lines = out.split(/\r?\n/).filter(Boolean);
|
|
const results: PortProcess[] = [];
|
|
for (const line of lines) {
|
|
const parts = line.trim().split(/\s+/);
|
|
if (parts.length >= 5 && parts[3] === "LISTENING") {
|
|
const localAddress = parts[1];
|
|
const addressPort = localAddress.split(":").pop();
|
|
if (addressPort === String(port)) {
|
|
const pid = Number.parseInt(parts[4], 10);
|
|
if (!Number.isNaN(pid) && pid > 0) {
|
|
if (!results.some((p) => p.pid === pid)) {
|
|
results.push({ pid });
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return results;
|
|
} catch (err: unknown) {
|
|
throw new Error(`netstat failed: ${String(err)}`, { cause: err });
|
|
}
|
|
}
|
|
|
|
try {
|
|
const lsof = resolveLsofCommandSync();
|
|
const out = execFileSync(lsof, ["-nP", `-iTCP:${port}`, "-sTCP:LISTEN", "-FpFc"], {
|
|
encoding: "utf-8",
|
|
});
|
|
return parseLsofOutput(out);
|
|
} catch (err: unknown) {
|
|
const execErr = err as ExecFileError;
|
|
const status = execErr.status ?? undefined;
|
|
const code = execErr.code;
|
|
if (code === "ENOENT") {
|
|
throw withErrnoCode("lsof not found; required for --force", "ENOENT", err);
|
|
}
|
|
if (code === "EACCES" || code === "EPERM") {
|
|
throw withErrnoCode("lsof permission denied while inspecting gateway port", code, err);
|
|
}
|
|
if (status === 1) {
|
|
const stderr = readExecOutput(execErr.stderr).trim();
|
|
if (
|
|
stderr &&
|
|
/permission denied|not permitted|operation not permitted|can't stat/i.test(stderr)
|
|
) {
|
|
throw withErrnoCode(
|
|
`lsof permission denied while inspecting gateway port: ${stderr}`,
|
|
"EACCES",
|
|
err,
|
|
);
|
|
}
|
|
return [];
|
|
} // no listeners
|
|
throw err instanceof Error ? err : new Error(String(err));
|
|
}
|
|
}
|
|
|
|
export function forceFreePort(port: number): PortProcess[] {
|
|
const listeners = listPortListeners(port);
|
|
for (const proc of listeners) {
|
|
try {
|
|
process.kill(proc.pid, "SIGTERM");
|
|
} catch (err) {
|
|
throw new Error(
|
|
`failed to kill pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""}: ${String(err)}`,
|
|
{ cause: err },
|
|
);
|
|
}
|
|
}
|
|
return listeners;
|
|
}
|
|
|
|
function killPids(listeners: PortProcess[], signal: NodeJS.Signals) {
|
|
for (const proc of listeners) {
|
|
try {
|
|
process.kill(proc.pid, signal);
|
|
} catch (err) {
|
|
throw new Error(
|
|
`failed to kill pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""}: ${String(err)}`,
|
|
{ cause: err },
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Kill whatever is listening on `port` and block (async) until it is free.
 *
 * Strategy: SIGTERM first — via lsof-discovered pids, or via fuser when lsof
 * is missing/unpermitted — then poll until `sigtermTimeoutMs` elapses, then
 * escalate to SIGKILL and keep polling until the overall `timeoutMs` budget
 * runs out. Throws if listeners remain after the full budget.
 *
 * @returns which processes received the initial signal, how many ms were
 *          spent polling, and whether escalation to SIGKILL happened.
 */
export async function forceFreePortAndWait(
  port: number,
  opts: {
    /** Total wait budget across signals. */
    timeoutMs?: number;
    /** Poll interval for checking whether lsof reports listeners. */
    intervalMs?: number;
    /** How long to wait after SIGTERM before escalating to SIGKILL. */
    sigtermTimeoutMs?: number;
  } = {},
): Promise<ForceFreePortResult> {
  const timeoutMs = Math.max(opts.timeoutMs ?? 1500, 0);
  const intervalMs = Math.max(opts.intervalMs ?? 100, 1);
  // The SIGTERM grace period can never exceed the total budget.
  const sigtermTimeoutMs = Math.min(Math.max(opts.sigtermTimeoutMs ?? 600, 0), timeoutMs);

  let killed: PortProcess[] = [];
  let useFuserFallback = false;

  try {
    killed = forceFreePort(port);
  } catch (err) {
    if (!isRecoverableLsofError(err)) {
      throw err;
    }
    // lsof is missing or unpermitted — fall back to fuser for signalling
    // and to a real bind probe for busy-checking.
    useFuserFallback = true;
    killed = killPortWithFuser(port, "SIGTERM");
  }

  // In fuser mode we cannot enumerate listeners, so probe with a bind instead.
  const checkBusy = async (): Promise<boolean> =>
    useFuserFallback ? isPortBusy(port) : listPortListeners(port).length > 0;

  if (!(await checkBusy())) {
    return { killed, waitedMs: 0, escalatedToSigkill: false };
  }

  // Phase 1: poll while the SIGTERM grace period lasts.
  let waitedMs = 0;
  const triesSigterm = intervalMs > 0 ? Math.ceil(sigtermTimeoutMs / intervalMs) : 0;
  for (let i = 0; i < triesSigterm; i++) {
    if (!(await checkBusy())) {
      return { killed, waitedMs, escalatedToSigkill: false };
    }
    await sleep(intervalMs);
    waitedMs += intervalMs;
  }

  // One more check after the last sleep, before escalating.
  if (!(await checkBusy())) {
    return { killed, waitedMs, escalatedToSigkill: false };
  }

  // Phase 2: escalate to SIGKILL on whatever is still holding the port.
  if (useFuserFallback) {
    killPortWithFuser(port, "SIGKILL");
  } else {
    const remaining = listPortListeners(port);
    killPids(remaining, "SIGKILL");
  }

  // Spend whatever budget is left polling for the port to free after SIGKILL.
  const remainingBudget = Math.max(timeoutMs - waitedMs, 0);
  const triesSigkill = intervalMs > 0 ? Math.ceil(remainingBudget / intervalMs) : 0;
  for (let i = 0; i < triesSigkill; i++) {
    if (!(await checkBusy())) {
      return { killed, waitedMs, escalatedToSigkill: true };
    }
    await sleep(intervalMs);
    waitedMs += intervalMs;
  }

  // Final check after the last sleep before giving up.
  if (!(await checkBusy())) {
    return { killed, waitedMs, escalatedToSigkill: true };
  }

  if (useFuserFallback) {
    throw new Error(`port ${port} still has listeners after --force (fuser fallback)`);
  }
  const still = listPortListeners(port);
  throw new Error(
    `port ${port} still has listeners after --force: ${still.map((p) => p.pid).join(", ")}`,
  );
}
|
|
|
|
/**
|
|
* Attempt a real TCP bind to verify the port is available at the OS level.
|
|
* Catches TIME_WAIT / kernel-level holds that lsof won't show.
|
|
*
|
|
* Resolves false only for EADDRINUSE — a genuinely transient condition
|
|
* (port still in TIME_WAIT after a --force kill) that the caller should retry.
|
|
*
|
|
* All other errors are non-retryable and are rejected immediately:
|
|
* - EADDRNOTAVAIL: the host address doesn't exist on any local interface
|
|
* (hard misconfiguration, not a transient kernel hold).
|
|
* - EACCES: bind to a privileged port as non-root.
|
|
* - EINVAL, etc.: other unrecoverable OS errors.
|
|
*/
|
|
export function probePortFree(port: number, host = "0.0.0.0"): Promise<boolean> {
|
|
return new Promise((resolve, reject) => {
|
|
const srv = createServer();
|
|
srv.unref();
|
|
srv.once("error", (err: NodeJS.ErrnoException) => {
|
|
srv.close();
|
|
if (err.code === "EADDRINUSE") {
|
|
// Genuinely transient — port still in use or TIME_WAIT after a --force kill.
|
|
resolve(false);
|
|
} else {
|
|
// Non-retryable: EADDRNOTAVAIL (bad host address), EACCES (privileged port),
|
|
// EINVAL, and any other OS errors. Surface immediately; no retry loop.
|
|
reject(err);
|
|
}
|
|
});
|
|
srv.listen(port, host, () => {
|
|
srv.close(() => resolve(true));
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Poll until a real test-bind succeeds, up to `timeoutMs`.
|
|
* Returns the number of ms waited, or throws if the port never freed.
|
|
*/
|
|
export async function waitForPortBindable(
|
|
port: number,
|
|
opts: { timeoutMs?: number; intervalMs?: number; host?: string } = {},
|
|
): Promise<number> {
|
|
const timeoutMs = Math.max(opts.timeoutMs ?? 3000, 0);
|
|
const intervalMs = Math.max(opts.intervalMs ?? 150, 1);
|
|
const host = opts.host;
|
|
let waited = 0;
|
|
while (waited < timeoutMs) {
|
|
if (await probePortFree(port, host)) {
|
|
return waited;
|
|
}
|
|
await sleep(intervalMs);
|
|
waited += intervalMs;
|
|
}
|
|
// Final attempt
|
|
if (await probePortFree(port, host)) {
|
|
return waited;
|
|
}
|
|
throw new Error(`port ${port} still not bindable after ${waited}ms (TIME_WAIT or kernel hold)`);
|
|
}
|