From 2b98cb6d8bfc20799813fd3014871ac1cfc77cb0 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 4 Mar 2026 10:52:33 -0800 Subject: [PATCH] Fix gateway restart false timeouts on Debian/systemd (#34874) * daemon(systemd): target sudo caller user scope * test(systemd): cover sudo user scope commands * infra(ports): fall back to ss when lsof missing * test(ports): verify ss fallback listener detection * cli(gateway): use probe fallback for restart health * test(gateway): cover restart-health probe fallback --- src/cli/daemon-cli/restart-health.test.ts | 59 +++++++++++ src/cli/daemon-cli/restart-health.ts | 35 ++++++- src/daemon/systemd.test.ts | 25 +++++ src/daemon/systemd.ts | 64 +++++++----- src/infra/ports-inspect.ts | 119 +++++++++++++++++----- src/infra/ports.test.ts | 58 +++++++++++ 6 files changed, 311 insertions(+), 49 deletions(-) diff --git a/src/cli/daemon-cli/restart-health.test.ts b/src/cli/daemon-cli/restart-health.test.ts index 67fb5c0dd4f..6e5d42cf19d 100644 --- a/src/cli/daemon-cli/restart-health.test.ts +++ b/src/cli/daemon-cli/restart-health.test.ts @@ -6,6 +6,7 @@ const inspectPortUsage = vi.hoisted(() => vi.fn<(port: number) => Promise vi.fn<(_listener: unknown, _port: number) => PortListenerKind>(() => "gateway"), ); +const probeGateway = vi.hoisted(() => vi.fn()); vi.mock("../../infra/ports.js", () => ({ classifyPortListener: (listener: unknown, port: number) => classifyPortListener(listener, port), @@ -13,6 +14,10 @@ vi.mock("../../infra/ports.js", () => ({ inspectPortUsage: (port: number) => inspectPortUsage(port), })); +vi.mock("../../gateway/probe.js", () => ({ + probeGateway: (opts: unknown) => probeGateway(opts), +})); + const originalPlatform = process.platform; async function inspectUnknownListenerFallback(params: { @@ -52,6 +57,11 @@ describe("inspectGatewayRestart", () => { }); classifyPortListener.mockReset(); classifyPortListener.mockReturnValue("gateway"); + probeGateway.mockReset(); + probeGateway.mockResolvedValue({ + ok: false, + close: null, + }); }); afterEach(() => { @@ -147,4 +157,53 @@ describe("inspectGatewayRestart", () => { expect(snapshot.staleGatewayPids).toEqual([]); }); + + it("uses a local gateway probe when ownership is ambiguous", async () => { + const service = { + readRuntime: vi.fn(async () => ({ status: "running", pid: 8000 })), + } as unknown as GatewayService; + + inspectPortUsage.mockResolvedValue({ + port: 18789, + status: "busy", + listeners: [{ commandLine: "" }], + hints: [], + }); + classifyPortListener.mockReturnValue("unknown"); + probeGateway.mockResolvedValue({ + ok: true, + close: null, + }); + + const { inspectGatewayRestart } = await import("./restart-health.js"); + const snapshot = await inspectGatewayRestart({ service, port: 18789 }); + + expect(snapshot.healthy).toBe(true); + expect(probeGateway).toHaveBeenCalledWith( + expect.objectContaining({ url: "ws://127.0.0.1:18789" }), + ); + }); + + it("treats auth-closed probe as healthy gateway reachability", async () => { + const service = { + readRuntime: vi.fn(async () => ({ status: "running", pid: 8000 })), + } as unknown as GatewayService; + + inspectPortUsage.mockResolvedValue({ + port: 18789, + status: "busy", + listeners: [{ commandLine: "" }], + hints: [], + }); + classifyPortListener.mockReturnValue("unknown"); + probeGateway.mockResolvedValue({ + ok: false, + close: { code: 1008, reason: "auth required" }, + }); + + const { inspectGatewayRestart } = await import("./restart-health.js"); + const snapshot = await inspectGatewayRestart({ service, port: 18789 }); + + expect(snapshot.healthy).toBe(true); + }); }); diff --git a/src/cli/daemon-cli/restart-health.ts b/src/cli/daemon-cli/restart-health.ts index b6d463a952c..daa83898882 100644 --- a/src/cli/daemon-cli/restart-health.ts +++ b/src/cli/daemon-cli/restart-health.ts @@ -1,5 +1,6 @@ import type { GatewayServiceRuntime } from "../../daemon/service-runtime.js"; import type { GatewayService } from "../../daemon/service.js"; +import { probeGateway } from "../../gateway/probe.js"; import { classifyPortListener, formatPortDiagnostics, @@ -29,6 +30,31 @@ function listenerOwnedByRuntimePid(params: { return params.listener.pid === params.runtimePid || params.listener.ppid === params.runtimePid; } +function looksLikeAuthClose(code: number | undefined, reason: string | undefined): boolean { + if (code !== 1008) { + return false; + } + const normalized = (reason ?? "").toLowerCase(); + return ( + normalized.includes("auth") || + normalized.includes("token") || + normalized.includes("password") || + normalized.includes("scope") || + normalized.includes("role") + ); +} + +async function confirmGatewayReachable(port: number): Promise { + const token = process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined; + const password = process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() || undefined; + const probe = await probeGateway({ + url: `ws://127.0.0.1:${port}`, + auth: token || password ? { token, password } : undefined, + timeoutMs: 1_000, + }); + return probe.ok || looksLikeAuthClose(probe.close?.code, probe.close?.reason); +} + export async function inspectGatewayRestart(params: { service: GatewayService; port: number; @@ -79,7 +105,14 @@ export async function inspectGatewayRestart(params: { ? portUsage.listeners.some((listener) => listenerOwnedByRuntimePid({ listener, runtimePid })) : gatewayListeners.length > 0 || (portUsage.status === "busy" && portUsage.listeners.length === 0); - const healthy = running && ownsPort; + let healthy = running && ownsPort; + if (!healthy && running && portUsage.status === "busy") { + try { + healthy = await confirmGatewayReachable(params.port); + } catch { + // best-effort probe + } + } const staleGatewayPids = Array.from( new Set([ ...gatewayListeners diff --git a/src/daemon/systemd.test.ts b/src/daemon/systemd.test.ts index e5cf1603674..ec1b3b78da2 100644 --- a/src/daemon/systemd.test.ts +++ b/src/daemon/systemd.test.ts @@ -267,4 +267,29 @@ describe("systemd service control", () => { }), ).rejects.toThrow("systemctl stop failed: permission denied"); }); + + it("targets the sudo caller's user scope when SUDO_USER is set", async () => { + execFileMock + .mockImplementationOnce((_cmd, args, _opts, cb) => { + expect(args).toEqual(["--machine", "debian@", "--user", "status"]); + cb(null, "", ""); + }) + .mockImplementationOnce((_cmd, args, _opts, cb) => { + expect(args).toEqual([ + "--machine", + "debian@", + "--user", + "restart", + "openclaw-gateway.service", + ]); + cb(null, "", ""); + }); + const write = vi.fn(); + const stdout = { write } as unknown as NodeJS.WritableStream; + + await restartSystemdService({ stdout, env: { SUDO_USER: "debian" } }); + + expect(write).toHaveBeenCalledTimes(1); + expect(String(write.mock.calls[0]?.[0])).toContain("Restarted systemd service"); + }); }); diff --git a/src/daemon/systemd.ts b/src/daemon/systemd.ts index ec80ea1bc7e..55657561da4 100644 --- a/src/daemon/systemd.ts +++ b/src/daemon/systemd.ts @@ -178,8 +178,25 @@ function isSystemdUnitNotEnabled(detail: string): boolean { ); } -export async function isSystemdUserServiceAvailable(): Promise { - const res = await execSystemctl(["--user", "status"]); +function resolveSystemctlUserScopeArgs(env: GatewayServiceEnv): string[] { + const sudoUser = env.SUDO_USER?.trim(); + if (sudoUser && sudoUser !== "root") { + return ["--machine", `${sudoUser}@`, "--user"]; + } + return ["--user"]; +} + +async function execSystemctlUser( + env: GatewayServiceEnv, + args: string[], +): Promise<{ stdout: string; stderr: string; code: number }> { + return await execSystemctl([...resolveSystemctlUserScopeArgs(env), ...args]); +} + +export async function isSystemdUserServiceAvailable( + env: GatewayServiceEnv = process.env as GatewayServiceEnv, +): Promise { + const res = await execSystemctlUser(env, ["status"]); if (res.code === 0) { return true; } @@ -205,8 +222,8 @@ export async function isSystemdUserServiceAvailable(): Promise { return false; } -async function assertSystemdAvailable() { - const res = await execSystemctl(["--user", "status"]); +async function assertSystemdAvailable(env: GatewayServiceEnv = process.env as GatewayServiceEnv) { + const res = await execSystemctlUser(env, ["status"]); if (res.code === 0) { return; } @@ -225,7 +242,7 @@ export async function installSystemdService({ environment, description, }: GatewayServiceInstallArgs): Promise<{ unitPath: string }> { - await assertSystemdAvailable(); + await assertSystemdAvailable(env); const unitPath = resolveSystemdUnitPath(env); await fs.mkdir(path.dirname(unitPath), { recursive: true }); @@ -252,17 +269,17 @@ export async function installSystemdService({ const serviceName = resolveGatewaySystemdServiceName(env.OPENCLAW_PROFILE); const unitName = `${serviceName}.service`; - const reload = await execSystemctl(["--user", "daemon-reload"]); + const reload = await execSystemctlUser(env, ["daemon-reload"]); if (reload.code !== 0) { throw new Error(`systemctl daemon-reload failed: ${reload.stderr || reload.stdout}`.trim()); } - const enable = await execSystemctl(["--user", "enable", unitName]); + const enable = await execSystemctlUser(env, ["enable", unitName]); if (enable.code !== 0) { throw new Error(`systemctl enable failed: ${enable.stderr || enable.stdout}`.trim()); } - const restart = await execSystemctl(["--user", "restart", unitName]); + const restart = await execSystemctlUser(env, ["restart", unitName]); if (restart.code !== 0) { throw new Error(`systemctl restart failed: ${restart.stderr || restart.stdout}`.trim()); } @@ -293,10 +310,10 @@ export async function uninstallSystemdService({ env, stdout, }: GatewayServiceManageArgs): Promise { - await assertSystemdAvailable(); + await assertSystemdAvailable(env); const serviceName = resolveGatewaySystemdServiceName(env.OPENCLAW_PROFILE); const unitName = `${serviceName}.service`; - await execSystemctl(["--user", "disable", "--now", unitName]); + await execSystemctlUser(env, ["disable", "--now", unitName]); const unitPath = resolveSystemdUnitPath(env); try { @@ -313,10 +330,11 @@ async function runSystemdServiceAction(params: { action: "stop" | "restart"; label: string; }) { - await assertSystemdAvailable(); - const serviceName = resolveSystemdServiceName(params.env ?? {}); + const env = params.env ?? process.env; + await assertSystemdAvailable(env); + const serviceName = resolveSystemdServiceName(env); const unitName = `${serviceName}.service`; - const res = await execSystemctl(["--user", params.action, unitName]); + const res = await execSystemctlUser(env, [params.action, unitName]); if (res.code !== 0) { throw new Error(`systemctl ${params.action} failed: ${res.stderr || res.stdout}`.trim()); } @@ -348,9 +366,10 @@ export async function restartSystemdService({ } export async function isSystemdServiceEnabled(args: GatewayServiceEnvArgs): Promise { + const env = args.env ?? process.env; const serviceName = resolveSystemdServiceName(args.env ?? {}); const unitName = `${serviceName}.service`; - const res = await execSystemctl(["--user", "is-enabled", unitName]); + const res = await execSystemctlUser(env, ["is-enabled", unitName]); if (res.code === 0) { return true; } @@ -365,7 +384,7 @@ export async function readSystemdServiceRuntime( env: GatewayServiceEnv = process.env as GatewayServiceEnv, ): Promise { try { - await assertSystemdAvailable(); + await assertSystemdAvailable(env); } catch (err) { return { status: "unknown", @@ -374,8 +393,7 @@ export async function readSystemdServiceRuntime( } const serviceName = resolveSystemdServiceName(env); const unitName = `${serviceName}.service`; - const res = await execSystemctl([ - "--user", + const res = await execSystemctlUser(env, [ "show", unitName, "--no-page", @@ -410,8 +428,8 @@ export type LegacySystemdUnit = { exists: boolean; }; -async function isSystemctlAvailable(): Promise { - const res = await execSystemctl(["--user", "status"]); +async function isSystemctlAvailable(env: GatewayServiceEnv): Promise { + const res = await execSystemctlUser(env, ["status"]); if (res.code === 0) { return true; } @@ -420,7 +438,7 @@ async function isSystemctlAvailable(): Promise { export async function findLegacySystemdUnits(env: GatewayServiceEnv): Promise { const results: LegacySystemdUnit[] = []; - const systemctlAvailable = await isSystemctlAvailable(); + const systemctlAvailable = await isSystemctlAvailable(env); for (const name of LEGACY_GATEWAY_SYSTEMD_SERVICE_NAMES) { const unitPath = resolveSystemdUnitPathForName(env, name); let exists = false; @@ -432,7 +450,7 @@ export async function findLegacySystemdUnits(env: GatewayServiceEnv): Promise { + await Promise.all( + listeners.map(async (listener) => { + if (!listener.pid) { + return; + } + const [commandLine, user, parentPid] = await Promise.all([ + resolveUnixCommandLine(listener.pid), + resolveUnixUser(listener.pid), + resolveUnixParentPid(listener.pid), + ]); + if (commandLine) { + listener.commandLine = commandLine; + } + if (user) { + listener.user = user; + } + if (parentPid !== undefined) { + listener.ppid = parentPid; + } + }), + ); +} + async function resolveUnixCommandLine(pid: number): Promise { const res = await runCommandSafe(["ps", "-p", String(pid), "-o", "command="]); if (res.code !== 0) { @@ -85,35 +109,45 @@ async function resolveUnixParentPid(pid: number): Promise { return Number.isFinite(parentPid) && parentPid > 0 ? parentPid : undefined; } -async function readUnixListeners( +function parseSsListeners(output: string, port: number): PortListener[] { + const lines = output.split(/\r?\n/).map((line) => line.trim()); + const listeners: PortListener[] = []; + for (const line of lines) { + if (!line || !line.includes("LISTEN")) { + continue; + } + const parts = line.split(/\s+/); + const localAddress = parts.find((part) => part.includes(`:${port}`)); + if (!localAddress) { + continue; + } + const listener: PortListener = { + address: localAddress, + }; + const pidMatch = line.match(/pid=(\d+)/); + if (pidMatch) { + const pid = Number.parseInt(pidMatch[1], 10); + if (Number.isFinite(pid)) { + listener.pid = pid; + } + } + const commandMatch = line.match(/users:\(\("([^"]+)"/); + if (commandMatch?.[1]) { + listener.command = commandMatch[1]; + } + listeners.push(listener); + } + return listeners; +} + +async function readUnixListenersFromSs( port: number, ): Promise<{ listeners: PortListener[]; detail?: string; errors: string[] }> { const errors: string[] = []; - const lsof = await resolveLsofCommand(); - const res = await runCommandSafe([lsof, "-nP", `-iTCP:${port}`, "-sTCP:LISTEN", "-FpFcn"]); + const res = await runCommandSafe(["ss", "-H", "-ltnp", `sport = :${port}`]); if (res.code === 0) { - const listeners = parseLsofFieldOutput(res.stdout); - await Promise.all( - listeners.map(async (listener) => { - if (!listener.pid) { - return; - } - const [commandLine, user, parentPid] = await Promise.all([ - resolveUnixCommandLine(listener.pid), - resolveUnixUser(listener.pid), - resolveUnixParentPid(listener.pid), - ]); - if (commandLine) { - listener.commandLine = commandLine; - } - if (user) { - listener.user = user; - } - if (parentPid !== undefined) { - listener.ppid = parentPid; - } - }), - ); + const listeners = parseSsListeners(res.stdout, port); + await enrichUnixListenerProcessInfo(listeners); return { listeners, detail: res.stdout.trim() || undefined, errors }; } const stderr = res.stderr.trim(); @@ -130,6 +164,41 @@ async function readUnixListeners( return { listeners: [], detail: undefined, errors }; } +async function readUnixListeners( + port: number, +): Promise<{ listeners: PortListener[]; detail?: string; errors: string[] }> { + const lsof = await resolveLsofCommand(); + const res = await runCommandSafe([lsof, "-nP", `-iTCP:${port}`, "-sTCP:LISTEN", "-FpFcn"]); + if (res.code === 0) { + const listeners = parseLsofFieldOutput(res.stdout); + await enrichUnixListenerProcessInfo(listeners); + return { listeners, detail: res.stdout.trim() || undefined, errors: [] }; + } + const lsofErrors: string[] = []; + const stderr = res.stderr.trim(); + if (res.code === 1 && !res.error && !stderr) { + return { listeners: [], detail: undefined, errors: [] }; + } + if (res.error) { + lsofErrors.push(res.error); + } + const detail = [stderr, res.stdout.trim()].filter(Boolean).join("\n"); + if (detail) { + lsofErrors.push(detail); + } + + const ssFallback = await readUnixListenersFromSs(port); + if (ssFallback.listeners.length > 0) { + return ssFallback; + } + + return { + listeners: [], + detail: undefined, + errors: [...lsofErrors, ...ssFallback.errors], + }; +} + function parseNetstatListeners(output: string, port: number): PortListener[] { const listeners: PortListener[] = []; const portToken = `:${port}`; diff --git a/src/infra/ports.test.ts b/src/infra/ports.test.ts index c02834bbbf2..f809662f1ac 100644 --- a/src/infra/ports.test.ts +++ b/src/infra/ports.test.ts @@ -111,4 +111,62 @@ describeUnix("inspectPortUsage", () => { await new Promise((resolve) => server.close(() => resolve())); } }); + + it("falls back to ss when lsof is unavailable", async () => { + const server = net.createServer(); + await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)); + const port = (server.address() as net.AddressInfo).port; + + runCommandWithTimeoutMock.mockImplementation(async (argv: string[]) => { + const command = argv[0]; + if (typeof command !== "string") { + return { stdout: "", stderr: "", code: 1 }; + } + if (command.includes("lsof")) { + throw Object.assign(new Error("spawn lsof ENOENT"), { code: "ENOENT" }); + } + if (command === "ss") { + return { + stdout: `LISTEN 0 511 127.0.0.1:${port} 0.0.0.0:* users:(("node",pid=${process.pid},fd=23))`, + stderr: "", + code: 0, + }; + } + if (command === "ps") { + if (argv.includes("command=")) { + return { + stdout: "node /tmp/openclaw/dist/index.js gateway --port 18789\n", + stderr: "", + code: 0, + }; + } + if (argv.includes("user=")) { + return { + stdout: "debian\n", + stderr: "", + code: 0, + }; + } + if (argv.includes("ppid=")) { + return { + stdout: "1\n", + stderr: "", + code: 0, + }; + } + } + return { stdout: "", stderr: "", code: 1 }; + }); + + try { + const result = await inspectPortUsage(port); + expect(result.status).toBe("busy"); + expect(result.listeners.length).toBeGreaterThan(0); + expect(result.listeners[0]?.pid).toBe(process.pid); + expect(result.listeners[0]?.commandLine).toContain("openclaw"); + expect(result.errors).toBeUndefined(); + } finally { + await new Promise((resolve) => server.close(() => resolve())); + } + }); });