diff --git a/CHANGELOG.md b/CHANGELOG.md index 96b82c2dce0..c0f21dc40b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ Docs: https://docs.openclaw.ai - ACP/runtime: harden the opt-in Coven backend with workspace-confined launch paths, home-expanded Coven socket config, bounded socket responses, sanitized daemon output, and controlled polling failure handling. Thanks @BunsDev. +### Fixes + +- Agents/LSP: terminate bundled stdio LSP process trees during runtime disposal and Gateway shutdown, so nested children such as `tsserver` do not survive stop or restart. Fixes #72357. Thanks @ai-hpc and @bittoby. + ## 2026.4.26 ### Changes diff --git a/src/agents/pi-bundle-lsp-runtime.test.ts b/src/agents/pi-bundle-lsp-runtime.test.ts new file mode 100644 index 00000000000..809ae1145f6 --- /dev/null +++ b/src/agents/pi-bundle-lsp-runtime.test.ts @@ -0,0 +1,141 @@ +import { EventEmitter } from "node:events"; +import { PassThrough, Writable } from "node:stream"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +const spawnMock = vi.hoisted(() => vi.fn()); +const killProcessTreeMock = vi.hoisted(() => vi.fn()); +const loadEmbeddedPiLspConfigMock = vi.hoisted(() => vi.fn()); + +vi.mock("node:child_process", async () => ({ + ...(await vi.importActual("node:child_process")), + spawn: spawnMock, +})); + +vi.mock("../process/kill-tree.js", () => ({ + killProcessTree: killProcessTreeMock, +})); + +vi.mock("./embedded-pi-lsp.js", () => ({ + loadEmbeddedPiLspConfig: loadEmbeddedPiLspConfigMock, +})); + +vi.mock("../logger.js", () => ({ + logDebug: vi.fn(), + logWarn: vi.fn(), +})); + +function encodeLspMessage(body: unknown): string { + const json = JSON.stringify(body); + return `Content-Length: ${Buffer.byteLength(json, "utf-8")}\r\n\r\n${json}`; +} + +function parseWrittenLspBody(text: string): Record | null { + const bodyStart = text.indexOf("\r\n\r\n"); + if (bodyStart === -1) { + return null; + } + return JSON.parse(text.slice(bodyStart + 4)) as Record; +} + +class MockChildProcess extends EventEmitter { + exitCode: number | null = null; + signalCode: NodeJS.Signals | null = null; + killed = false; + pid = 4321; + readonly stdout = new PassThrough(); + readonly stderr = new PassThrough(); + readonly stdin: Writable; + + constructor() { + super(); + this.stdin = new Writable({ + write: (chunk, _encoding, callback) => { + this.respondToRequest(chunk.toString("utf8")); + callback(); + }, + }); + } + + kill = vi.fn((signal: NodeJS.Signals = "SIGTERM") => { + this.killed = true; + this.signalCode = signal; + this.emit("exit", null, signal); + this.emit("close", null, signal); + return true; + }); + + private respondToRequest(text: string): void { + const body = parseWrittenLspBody(text); + if (!body || typeof body.id !== "number" || typeof body.method !== "string") { + return; + } + const result = body.method === "initialize" ? { capabilities: { hoverProvider: true } } : null; + queueMicrotask(() => { + this.stdout.write(encodeLspMessage({ jsonrpc: "2.0", id: body.id, result })); + }); + } +} + +function configureSingleLspServer(): void { + loadEmbeddedPiLspConfigMock.mockReturnValue({ + lspServers: { + typescript: { + command: "typescript-language-server", + args: ["--stdio"], + }, + }, + diagnostics: [], + }); +} + +describe("bundle LSP runtime", () => { + afterEach(async () => { + const { disposeAllBundleLspRuntimes } = await import("./pi-bundle-lsp-runtime.js"); + await disposeAllBundleLspRuntimes(); + spawnMock.mockReset(); + killProcessTreeMock.mockReset(); + loadEmbeddedPiLspConfigMock.mockReset(); + }); + + it("starts LSP servers in a disposable process group", async () => { + configureSingleLspServer(); + const child = new MockChildProcess(); + spawnMock.mockReturnValue(child); + const { createBundleLspToolRuntime } = await import("./pi-bundle-lsp-runtime.js"); + + const runtime = await createBundleLspToolRuntime({ workspaceDir: "/tmp/workspace" }); + + expect(spawnMock).toHaveBeenCalledWith( + "typescript-language-server", + ["--stdio"], + expect.objectContaining({ + detached: process.platform !== "win32", + stdio: ["pipe", "pipe", "pipe"], + windowsHide: process.platform === "win32", + }), + ); + expect(runtime.tools.map((tool) => tool.name)).toContain("lsp_hover_typescript"); + + await runtime.dispose(); + + expect(killProcessTreeMock).toHaveBeenCalledWith(4321, { graceMs: 1000 }); + }); + + it("disposes active LSP sessions from the global shutdown sweep", async () => { + configureSingleLspServer(); + const child = new MockChildProcess(); + spawnMock.mockReturnValue(child); + const { createBundleLspToolRuntime, disposeAllBundleLspRuntimes } = + await import("./pi-bundle-lsp-runtime.js"); + + const runtime = await createBundleLspToolRuntime({ workspaceDir: "/tmp/workspace" }); + + await disposeAllBundleLspRuntimes(); + + expect(killProcessTreeMock).toHaveBeenCalledWith(4321, { graceMs: 1000 }); + + killProcessTreeMock.mockClear(); + await runtime.dispose(); + expect(killProcessTreeMock).not.toHaveBeenCalled(); + }); +}); diff --git a/src/agents/pi-bundle-lsp-runtime.ts b/src/agents/pi-bundle-lsp-runtime.ts index 1e5e8d34274..0b19f663519 100644 --- a/src/agents/pi-bundle-lsp-runtime.ts +++ b/src/agents/pi-bundle-lsp-runtime.ts @@ -3,11 +3,13 @@ import type { AgentToolResult } from "@mariozechner/pi-agent-core"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { logDebug, logWarn } from "../logger.js"; import { setPluginToolMeta } from "../plugins/tools.js"; +import { killProcessTree } from "../process/kill-tree.js"; import { normalizeOptionalLowercaseString } from "../shared/string-coerce.js"; import { loadEmbeddedPiLspConfig } from "./embedded-pi-lsp.js"; import { resolveStdioMcpServerLaunchConfig, describeStdioMcpServerLaunchConfig, + type StdioMcpServerLaunchConfig, } from "./mcp-stdio.js"; import type { AnyAgentTool } from "./tools/common.js"; @@ -17,10 +19,17 @@ type LspSession = { serverName: string; process: ChildProcess; requestId: number; - pendingRequests: Map void; reject: (e: Error) => void }>; + pendingRequests: Map; buffer: string; initialized: boolean; capabilities: LspServerCapabilities; + disposed: boolean; +}; + +type PendingLspRequest = { + resolve: (v: unknown) => void; + reject: (e: Error) => void; + timeout: ReturnType; }; type LspServerCapabilities = { @@ -44,6 +53,55 @@ type LspPositionParams = { character: number; }; +const LSP_SHUTDOWN_GRACE_MS = 500; +const LSP_PROCESS_TREE_KILL_GRACE_MS = 1_000; +const activeBundleLspSessions = new Set(); + +function delay(ms: number): Promise { + return new Promise((resolve) => { + const timeout = setTimeout(resolve, Math.max(1, ms)); + timeout.unref?.(); + }); +} + +function spawnLspServerProcess(config: StdioMcpServerLaunchConfig): ChildProcess { + return spawn(config.command, config.args ?? [], { + stdio: ["pipe", "pipe", "pipe"], + env: { ...process.env, ...config.env }, + cwd: config.cwd, + detached: process.platform !== "win32", + windowsHide: process.platform === "win32", + }); +} + +function createLspSession(serverName: string, child: ChildProcess): LspSession { + return { + serverName, + process: child, + requestId: 0, + pendingRequests: new Map(), + buffer: "", + initialized: false, + capabilities: {}, + disposed: false, + }; +} + +function registerActiveLspSession(session: LspSession): void { + activeBundleLspSessions.add(session); +} + +function attachLspProcessHandlers(session: LspSession): void { + session.process.stdout?.setEncoding("utf-8"); + session.process.stdout?.on("data", (chunk: string) => handleIncomingData(session, chunk)); + session.process.stderr?.setEncoding("utf-8"); + session.process.stderr?.on("data", (chunk: string) => { + for (const line of chunk.split(/\r?\n/).filter(Boolean)) { + logDebug(`bundle-lsp:${session.serverName}: ${line.trim()}`); + } + }); +} + function encodeLspMessage(body: unknown): string { const json = JSON.stringify(body); return `Content-Length: ${Buffer.byteLength(json, "utf-8")}\r\n\r\n${json}`; @@ -89,18 +147,17 @@ function parseLspMessages(buffer: string): { messages: unknown[]; remaining: str function sendRequest(session: LspSession, method: string, params?: unknown): Promise { const id = ++session.requestId; return new Promise((resolve, reject) => { - session.pendingRequests.set(id, { resolve, reject }); - const message = { jsonrpc: "2.0", id, method, params }; - const encoded = encodeLspMessage(message); - session.process.stdin?.write(encoded, "utf-8"); - - // Timeout after 10 seconds - setTimeout(() => { + const timeout = setTimeout(() => { if (session.pendingRequests.has(id)) { session.pendingRequests.delete(id); reject(new Error(`LSP request ${method} timed out`)); } }, 10_000); + timeout.unref?.(); + session.pendingRequests.set(id, { resolve, reject, timeout }); + const message = { jsonrpc: "2.0", id, method, params }; + const encoded = encodeLspMessage(message); + session.process.stdin?.write(encoded, "utf-8"); }); } @@ -119,6 +176,7 @@ function handleIncomingData(session: LspSession, chunk: string) { const pending = session.pendingRequests.get(record.id); if (pending) { session.pendingRequests.delete(record.id); + clearTimeout(pending.timeout); if ("error" in record) { pending.reject(new Error(JSON.stringify(record.error))); } else { @@ -157,10 +215,32 @@ async function initializeSession(session: LspSession): Promise {}); + const shutdown = sendRequest(session, "shutdown").catch(() => undefined); + await Promise.race([shutdown, delay(LSP_SHUTDOWN_GRACE_MS)]); session.process.stdin?.write( encodeLspMessage({ jsonrpc: "2.0", method: "exit", params: null }), "utf-8", @@ -170,10 +250,15 @@ async function disposeSession(session: LspSession) { } } for (const [, pending] of session.pendingRequests) { + clearTimeout(pending.timeout); pending.reject(new Error("LSP session disposed")); } session.pendingRequests.clear(); - session.process.kill(); + terminateLspProcessTree(session); +} + +async function disposeSessions(sessions: Iterable): Promise { + await Promise.allSettled(Array.from(sessions, (session) => disposeSession(session))); } function createLspPositionTool(params: { @@ -325,32 +410,12 @@ export async function createBundleLspToolRuntime(params: { continue; } const launchConfig = launch.config; + let session: LspSession | undefined; try { - const child = spawn(launchConfig.command, launchConfig.args ?? [], { - stdio: ["pipe", "pipe", "pipe"], - env: { ...process.env, ...launchConfig.env }, - cwd: launchConfig.cwd, - }); - - const session: LspSession = { - serverName, - process: child, - requestId: 0, - pendingRequests: new Map(), - buffer: "", - initialized: false, - capabilities: {}, - }; - - child.stdout?.setEncoding("utf-8"); - child.stdout?.on("data", (chunk: string) => handleIncomingData(session, chunk)); - child.stderr?.setEncoding("utf-8"); - child.stderr?.on("data", (chunk: string) => { - for (const line of chunk.split(/\r?\n/).filter(Boolean)) { - logDebug(`bundle-lsp:${serverName}: ${line.trim()}`); - } - }); + session = createLspSession(serverName, spawnLspServerProcess(launchConfig)); + registerActiveLspSession(session); + attachLspProcessHandlers(session); const capabilities = await initializeSession(session); session.capabilities = capabilities; @@ -380,6 +445,9 @@ export async function createBundleLspToolRuntime(params: { `bundle-lsp: started "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}) with ${serverTools.length} tools`, ); } catch (error) { + if (session) { + await disposeSession(session); + } logWarn( `bundle-lsp: failed to start server "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}): ${String(error)}`, ); @@ -393,11 +461,15 @@ export async function createBundleLspToolRuntime(params: { capabilities: s.capabilities, })), dispose: async () => { - await Promise.allSettled(sessions.map((session) => disposeSession(session))); + await disposeSessions(sessions); }, }; } catch (error) { - await Promise.allSettled(sessions.map((session) => disposeSession(session))); + await disposeSessions(sessions); throw error; } } + +export async function disposeAllBundleLspRuntimes(): Promise { + await disposeSessions(activeBundleLspSessions); +} diff --git a/src/gateway/server-close.test.ts b/src/gateway/server-close.test.ts index 5c2f17f683a..17dd79fa8d4 100644 --- a/src/gateway/server-close.test.ts +++ b/src/gateway/server-close.test.ts @@ -8,6 +8,7 @@ const mocks = { disposeAgentHarnesses: vi.fn(async () => undefined), disposeAllSessionMcpRuntimes: vi.fn(async () => undefined), triggerInternalHook: vi.fn(async (_event) => undefined), + disposeAllBundleLspRuntimes: vi.fn(async () => undefined), }; const WEBSOCKET_CLOSE_GRACE_MS = 1_000; const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250; @@ -47,6 +48,13 @@ vi.mock("../agents/pi-bundle-mcp-tools.js", async () => ({ disposeAllSessionMcpRuntimes: mocks.disposeAllSessionMcpRuntimes, })); +vi.mock("../agents/pi-bundle-lsp-runtime.js", async () => ({ + ...(await vi.importActual( + "../agents/pi-bundle-lsp-runtime.js", + )), + disposeAllBundleLspRuntimes: mocks.disposeAllBundleLspRuntimes, +})); + vi.mock("../logging/subsystem.js", () => ({ createSubsystemLogger: vi.fn(() => ({ warn: mocks.logWarn, @@ -105,6 +113,8 @@ describe("createGatewayCloseHandler", () => { mocks.disposeAllSessionMcpRuntimes.mockResolvedValue(undefined); mocks.triggerInternalHook.mockReset(); mocks.triggerInternalHook.mockResolvedValue(undefined); + mocks.disposeAllBundleLspRuntimes.mockClear(); + mocks.disposeAllBundleLspRuntimes.mockResolvedValue(undefined); }); afterEach(() => { @@ -206,6 +216,35 @@ describe("createGatewayCloseHandler", () => { expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1); expect(mocks.disposeAgentHarnesses).toHaveBeenCalledTimes(1); expect(mocks.disposeAllSessionMcpRuntimes).toHaveBeenCalledTimes(1); + expect(mocks.disposeAllBundleLspRuntimes).toHaveBeenCalledTimes(1); + }); + + it("starts bundle MCP and LSP runtime disposal concurrently", async () => { + const disposalOrder: string[] = []; + let releaseMcp: (() => void) | undefined; + const mcpBlocked = new Promise((resolve) => { + releaseMcp = resolve; + }); + mocks.disposeAllSessionMcpRuntimes.mockImplementation(async () => { + disposalOrder.push("mcp-start"); + await mcpBlocked; + disposalOrder.push("mcp-end"); + }); + mocks.disposeAllBundleLspRuntimes.mockImplementation(async () => { + disposalOrder.push("lsp-start"); + }); + const close = createGatewayCloseHandler(createGatewayCloseTestDeps()); + + const closePromise = close({ reason: "test shutdown" }); + try { + await vi.waitFor(() => { + expect(disposalOrder).toContain("lsp-start"); + }); + expect(disposalOrder).toEqual(["mcp-start", "lsp-start"]); + } finally { + releaseMcp?.(); + await closePromise; + } }); it("continues shutdown when bundle MCP runtime disposal hangs", async () => { @@ -224,6 +263,22 @@ describe("createGatewayCloseHandler", () => { ).toBe(true); }); + it("continues shutdown when bundle LSP runtime disposal hangs", async () => { + vi.useFakeTimers(); + mocks.disposeAllBundleLspRuntimes.mockReturnValue(new Promise(() => undefined)); + const close = createGatewayCloseHandler(createGatewayCloseTestDeps()); + + const closePromise = close({ reason: "test shutdown" }); + await vi.advanceTimersByTimeAsync(5_000); + await closePromise; + + expect( + mocks.logWarn.mock.calls.some(([message]) => + String(message).includes("bundle-lsp runtime disposal exceeded 5000ms"), + ), + ).toBe(true); + }); + it("terminates lingering websocket clients when websocket close exceeds the grace window", async () => { vi.useFakeTimers(); diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 00e6382b5e7..f554d18968e 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -19,6 +19,7 @@ const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250; const HTTP_CLOSE_GRACE_MS = 1_000; const HTTP_CLOSE_FORCE_WAIT_MS = 5_000; const MCP_RUNTIME_CLOSE_GRACE_MS = 5_000; +const LSP_RUNTIME_CLOSE_GRACE_MS = 5_000; function createTimeoutRace(timeoutMs: number, onTimeout: () => T) { let timer: ReturnType | null = null; @@ -75,6 +76,30 @@ async function triggerGatewayLifecycleHookWithTimeout(params: { } } +async function disposeRuntimeWithShutdownGrace(params: { + label: "bundle-mcp" | "bundle-lsp"; + dispose: () => Promise; + graceMs: number; +}): Promise { + const disposePromise = Promise.resolve() + .then(params.dispose) + .catch((err: unknown) => { + shutdownLog.warn(`${params.label} runtime disposal failed during shutdown: ${String(err)}`); + }); + const disposeTimeout = createTimeoutRace(params.graceMs, () => { + shutdownLog.warn( + `${params.label} runtime disposal exceeded ${params.graceMs}ms; continuing shutdown`, + ); + }); + await Promise.race([disposePromise, disposeTimeout.promise]); + disposeTimeout.clear(); +} + +async function disposeAllBundleLspRuntimesOnDemand(): Promise { + const { disposeAllBundleLspRuntimes } = await import("../agents/pi-bundle-lsp-runtime.js"); + await disposeAllBundleLspRuntimes(); +} + export async function runGatewayClosePrelude(params: { stopDiagnostics?: () => void; clearSkillsRefreshTimer?: () => void; @@ -115,6 +140,7 @@ export function createGatewayCloseHandler(params: { stopChannel: (name: ChannelId, accountId?: string) => Promise; pluginServices: PluginServicesHandle | null; disposeSessionMcpRuntimes?: () => Promise; + disposeBundleLspRuntimes?: () => Promise; cron: { stop: () => void }; heartbeatRunner: HeartbeatRunner; updateCheckStop?: (() => void) | null; @@ -201,17 +227,18 @@ export function createGatewayCloseHandler(params: { await params.stopChannel(plugin.id); } await disposeRegisteredAgentHarnesses(); - const disposeMcpRuntimes = params.disposeSessionMcpRuntimes ?? disposeAllSessionMcpRuntimes; - const mcpDisposePromise = disposeMcpRuntimes().catch((err: unknown) => { - shutdownLog.warn(`bundle-mcp runtime disposal failed during shutdown: ${String(err)}`); - }); - const mcpDisposeTimeout = createTimeoutRace(MCP_RUNTIME_CLOSE_GRACE_MS, () => { - shutdownLog.warn( - `bundle-mcp runtime disposal exceeded ${MCP_RUNTIME_CLOSE_GRACE_MS}ms; continuing shutdown`, - ); - }); - await Promise.race([mcpDisposePromise, mcpDisposeTimeout.promise]); - mcpDisposeTimeout.clear(); + await Promise.all([ + disposeRuntimeWithShutdownGrace({ + label: "bundle-mcp", + dispose: params.disposeSessionMcpRuntimes ?? disposeAllSessionMcpRuntimes, + graceMs: MCP_RUNTIME_CLOSE_GRACE_MS, + }), + disposeRuntimeWithShutdownGrace({ + label: "bundle-lsp", + dispose: params.disposeBundleLspRuntimes ?? disposeAllBundleLspRuntimesOnDemand, + graceMs: LSP_RUNTIME_CLOSE_GRACE_MS, + }), + ]); if (params.pluginServices) { await params.pluginServices.stop().catch(() => {}); }