fix: clean up bundled LSP process trees on shutdown

Fixes #72357
This commit is contained in:
NVIDIAN
2026-04-27 08:10:56 -10:00
committed by GitHub
parent d9bef3fe7c
commit dc96886378
5 changed files with 346 additions and 47 deletions

View File

@@ -17,6 +17,10 @@ Docs: https://docs.openclaw.ai
- ACP/runtime: harden the opt-in Coven backend with workspace-confined launch paths, home-expanded Coven socket config, bounded socket responses, sanitized daemon output, and controlled polling failure handling. Thanks @BunsDev.
### Fixes
- Agents/LSP: terminate bundled stdio LSP process trees during runtime disposal and Gateway shutdown, so nested children such as `tsserver` do not survive stop or restart. Fixes #72357. Thanks @ai-hpc and @bittoby.
## 2026.4.26
### Changes

View File

@@ -0,0 +1,141 @@
import { EventEmitter } from "node:events";
import { PassThrough, Writable } from "node:stream";
import { afterEach, describe, expect, it, vi } from "vitest";
const spawnMock = vi.hoisted(() => vi.fn());
const killProcessTreeMock = vi.hoisted(() => vi.fn());
const loadEmbeddedPiLspConfigMock = vi.hoisted(() => vi.fn());
vi.mock("node:child_process", async () => ({
...(await vi.importActual<typeof import("node:child_process")>("node:child_process")),
spawn: spawnMock,
}));
vi.mock("../process/kill-tree.js", () => ({
killProcessTree: killProcessTreeMock,
}));
vi.mock("./embedded-pi-lsp.js", () => ({
loadEmbeddedPiLspConfig: loadEmbeddedPiLspConfigMock,
}));
vi.mock("../logger.js", () => ({
logDebug: vi.fn(),
logWarn: vi.fn(),
}));
function encodeLspMessage(body: unknown): string {
const json = JSON.stringify(body);
return `Content-Length: ${Buffer.byteLength(json, "utf-8")}\r\n\r\n${json}`;
}
function parseWrittenLspBody(text: string): Record<string, unknown> | null {
const bodyStart = text.indexOf("\r\n\r\n");
if (bodyStart === -1) {
return null;
}
return JSON.parse(text.slice(bodyStart + 4)) as Record<string, unknown>;
}
class MockChildProcess extends EventEmitter {
exitCode: number | null = null;
signalCode: NodeJS.Signals | null = null;
killed = false;
pid = 4321;
readonly stdout = new PassThrough();
readonly stderr = new PassThrough();
readonly stdin: Writable;
constructor() {
super();
this.stdin = new Writable({
write: (chunk, _encoding, callback) => {
this.respondToRequest(chunk.toString("utf8"));
callback();
},
});
}
kill = vi.fn((signal: NodeJS.Signals = "SIGTERM") => {
this.killed = true;
this.signalCode = signal;
this.emit("exit", null, signal);
this.emit("close", null, signal);
return true;
});
private respondToRequest(text: string): void {
const body = parseWrittenLspBody(text);
if (!body || typeof body.id !== "number" || typeof body.method !== "string") {
return;
}
const result = body.method === "initialize" ? { capabilities: { hoverProvider: true } } : null;
queueMicrotask(() => {
this.stdout.write(encodeLspMessage({ jsonrpc: "2.0", id: body.id, result }));
});
}
}
function configureSingleLspServer(): void {
loadEmbeddedPiLspConfigMock.mockReturnValue({
lspServers: {
typescript: {
command: "typescript-language-server",
args: ["--stdio"],
},
},
diagnostics: [],
});
}
describe("bundle LSP runtime", () => {
afterEach(async () => {
const { disposeAllBundleLspRuntimes } = await import("./pi-bundle-lsp-runtime.js");
await disposeAllBundleLspRuntimes();
spawnMock.mockReset();
killProcessTreeMock.mockReset();
loadEmbeddedPiLspConfigMock.mockReset();
});
it("starts LSP servers in a disposable process group", async () => {
configureSingleLspServer();
const child = new MockChildProcess();
spawnMock.mockReturnValue(child);
const { createBundleLspToolRuntime } = await import("./pi-bundle-lsp-runtime.js");
const runtime = await createBundleLspToolRuntime({ workspaceDir: "/tmp/workspace" });
expect(spawnMock).toHaveBeenCalledWith(
"typescript-language-server",
["--stdio"],
expect.objectContaining({
detached: process.platform !== "win32",
stdio: ["pipe", "pipe", "pipe"],
windowsHide: process.platform === "win32",
}),
);
expect(runtime.tools.map((tool) => tool.name)).toContain("lsp_hover_typescript");
await runtime.dispose();
expect(killProcessTreeMock).toHaveBeenCalledWith(4321, { graceMs: 1000 });
});
it("disposes active LSP sessions from the global shutdown sweep", async () => {
configureSingleLspServer();
const child = new MockChildProcess();
spawnMock.mockReturnValue(child);
const { createBundleLspToolRuntime, disposeAllBundleLspRuntimes } =
await import("./pi-bundle-lsp-runtime.js");
const runtime = await createBundleLspToolRuntime({ workspaceDir: "/tmp/workspace" });
await disposeAllBundleLspRuntimes();
expect(killProcessTreeMock).toHaveBeenCalledWith(4321, { graceMs: 1000 });
killProcessTreeMock.mockClear();
await runtime.dispose();
expect(killProcessTreeMock).not.toHaveBeenCalled();
});
});

View File

@@ -3,11 +3,13 @@ import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { logDebug, logWarn } from "../logger.js";
import { setPluginToolMeta } from "../plugins/tools.js";
import { killProcessTree } from "../process/kill-tree.js";
import { normalizeOptionalLowercaseString } from "../shared/string-coerce.js";
import { loadEmbeddedPiLspConfig } from "./embedded-pi-lsp.js";
import {
resolveStdioMcpServerLaunchConfig,
describeStdioMcpServerLaunchConfig,
type StdioMcpServerLaunchConfig,
} from "./mcp-stdio.js";
import type { AnyAgentTool } from "./tools/common.js";
@@ -17,10 +19,17 @@ type LspSession = {
serverName: string;
process: ChildProcess;
requestId: number;
pendingRequests: Map<number, { resolve: (v: unknown) => void; reject: (e: Error) => void }>;
pendingRequests: Map<number, PendingLspRequest>;
buffer: string;
initialized: boolean;
capabilities: LspServerCapabilities;
disposed: boolean;
};
type PendingLspRequest = {
resolve: (v: unknown) => void;
reject: (e: Error) => void;
timeout: ReturnType<typeof setTimeout>;
};
type LspServerCapabilities = {
@@ -44,6 +53,55 @@ type LspPositionParams = {
character: number;
};
const LSP_SHUTDOWN_GRACE_MS = 500;
const LSP_PROCESS_TREE_KILL_GRACE_MS = 1_000;
const activeBundleLspSessions = new Set<LspSession>();
function delay(ms: number): Promise<void> {
return new Promise((resolve) => {
const timeout = setTimeout(resolve, Math.max(1, ms));
timeout.unref?.();
});
}
function spawnLspServerProcess(config: StdioMcpServerLaunchConfig): ChildProcess {
return spawn(config.command, config.args ?? [], {
stdio: ["pipe", "pipe", "pipe"],
env: { ...process.env, ...config.env },
cwd: config.cwd,
detached: process.platform !== "win32",
windowsHide: process.platform === "win32",
});
}
function createLspSession(serverName: string, child: ChildProcess): LspSession {
return {
serverName,
process: child,
requestId: 0,
pendingRequests: new Map(),
buffer: "",
initialized: false,
capabilities: {},
disposed: false,
};
}
function registerActiveLspSession(session: LspSession): void {
activeBundleLspSessions.add(session);
}
function attachLspProcessHandlers(session: LspSession): void {
session.process.stdout?.setEncoding("utf-8");
session.process.stdout?.on("data", (chunk: string) => handleIncomingData(session, chunk));
session.process.stderr?.setEncoding("utf-8");
session.process.stderr?.on("data", (chunk: string) => {
for (const line of chunk.split(/\r?\n/).filter(Boolean)) {
logDebug(`bundle-lsp:${session.serverName}: ${line.trim()}`);
}
});
}
function encodeLspMessage(body: unknown): string {
const json = JSON.stringify(body);
return `Content-Length: ${Buffer.byteLength(json, "utf-8")}\r\n\r\n${json}`;
@@ -89,18 +147,17 @@ function parseLspMessages(buffer: string): { messages: unknown[]; remaining: str
function sendRequest(session: LspSession, method: string, params?: unknown): Promise<unknown> {
const id = ++session.requestId;
return new Promise((resolve, reject) => {
session.pendingRequests.set(id, { resolve, reject });
const message = { jsonrpc: "2.0", id, method, params };
const encoded = encodeLspMessage(message);
session.process.stdin?.write(encoded, "utf-8");
// Timeout after 10 seconds
setTimeout(() => {
const timeout = setTimeout(() => {
if (session.pendingRequests.has(id)) {
session.pendingRequests.delete(id);
reject(new Error(`LSP request ${method} timed out`));
}
}, 10_000);
timeout.unref?.();
session.pendingRequests.set(id, { resolve, reject, timeout });
const message = { jsonrpc: "2.0", id, method, params };
const encoded = encodeLspMessage(message);
session.process.stdin?.write(encoded, "utf-8");
});
}
@@ -119,6 +176,7 @@ function handleIncomingData(session: LspSession, chunk: string) {
const pending = session.pendingRequests.get(record.id);
if (pending) {
session.pendingRequests.delete(record.id);
clearTimeout(pending.timeout);
if ("error" in record) {
pending.reject(new Error(JSON.stringify(record.error)));
} else {
@@ -157,10 +215,32 @@ async function initializeSession(session: LspSession): Promise<LspServerCapabili
return result?.capabilities ?? {};
}
function hasLspProcessExited(child: ChildProcess): boolean {
return child.exitCode !== null || child.signalCode !== null;
}
function terminateLspProcessTree(session: LspSession): void {
const pid = session.process.pid;
if (pid && !hasLspProcessExited(session.process)) {
killProcessTree(pid, { graceMs: LSP_PROCESS_TREE_KILL_GRACE_MS });
return;
}
if (!hasLspProcessExited(session.process)) {
session.process.kill("SIGTERM");
}
}
async function disposeSession(session: LspSession) {
if (session.disposed) {
return;
}
session.disposed = true;
activeBundleLspSessions.delete(session);
if (session.initialized) {
try {
await sendRequest(session, "shutdown").catch(() => {});
const shutdown = sendRequest(session, "shutdown").catch(() => undefined);
await Promise.race([shutdown, delay(LSP_SHUTDOWN_GRACE_MS)]);
session.process.stdin?.write(
encodeLspMessage({ jsonrpc: "2.0", method: "exit", params: null }),
"utf-8",
@@ -170,10 +250,15 @@ async function disposeSession(session: LspSession) {
}
}
for (const [, pending] of session.pendingRequests) {
clearTimeout(pending.timeout);
pending.reject(new Error("LSP session disposed"));
}
session.pendingRequests.clear();
session.process.kill();
terminateLspProcessTree(session);
}
async function disposeSessions(sessions: Iterable<LspSession>): Promise<void> {
await Promise.allSettled(Array.from(sessions, (session) => disposeSession(session)));
}
function createLspPositionTool(params: {
@@ -325,32 +410,12 @@ export async function createBundleLspToolRuntime(params: {
continue;
}
const launchConfig = launch.config;
let session: LspSession | undefined;
try {
const child = spawn(launchConfig.command, launchConfig.args ?? [], {
stdio: ["pipe", "pipe", "pipe"],
env: { ...process.env, ...launchConfig.env },
cwd: launchConfig.cwd,
});
const session: LspSession = {
serverName,
process: child,
requestId: 0,
pendingRequests: new Map(),
buffer: "",
initialized: false,
capabilities: {},
};
child.stdout?.setEncoding("utf-8");
child.stdout?.on("data", (chunk: string) => handleIncomingData(session, chunk));
child.stderr?.setEncoding("utf-8");
child.stderr?.on("data", (chunk: string) => {
for (const line of chunk.split(/\r?\n/).filter(Boolean)) {
logDebug(`bundle-lsp:${serverName}: ${line.trim()}`);
}
});
session = createLspSession(serverName, spawnLspServerProcess(launchConfig));
registerActiveLspSession(session);
attachLspProcessHandlers(session);
const capabilities = await initializeSession(session);
session.capabilities = capabilities;
@@ -380,6 +445,9 @@ export async function createBundleLspToolRuntime(params: {
`bundle-lsp: started "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}) with ${serverTools.length} tools`,
);
} catch (error) {
if (session) {
await disposeSession(session);
}
logWarn(
`bundle-lsp: failed to start server "${serverName}" (${describeStdioMcpServerLaunchConfig(launchConfig)}): ${String(error)}`,
);
@@ -393,11 +461,15 @@ export async function createBundleLspToolRuntime(params: {
capabilities: s.capabilities,
})),
dispose: async () => {
await Promise.allSettled(sessions.map((session) => disposeSession(session)));
await disposeSessions(sessions);
},
};
} catch (error) {
await Promise.allSettled(sessions.map((session) => disposeSession(session)));
await disposeSessions(sessions);
throw error;
}
}
export async function disposeAllBundleLspRuntimes(): Promise<void> {
await disposeSessions(activeBundleLspSessions);
}

View File

@@ -8,6 +8,7 @@ const mocks = {
disposeAgentHarnesses: vi.fn(async () => undefined),
disposeAllSessionMcpRuntimes: vi.fn(async () => undefined),
triggerInternalHook: vi.fn<TriggerInternalHookMock>(async (_event) => undefined),
disposeAllBundleLspRuntimes: vi.fn(async () => undefined),
};
const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
@@ -47,6 +48,13 @@ vi.mock("../agents/pi-bundle-mcp-tools.js", async () => ({
disposeAllSessionMcpRuntimes: mocks.disposeAllSessionMcpRuntimes,
}));
vi.mock("../agents/pi-bundle-lsp-runtime.js", async () => ({
...(await vi.importActual<typeof import("../agents/pi-bundle-lsp-runtime.js")>(
"../agents/pi-bundle-lsp-runtime.js",
)),
disposeAllBundleLspRuntimes: mocks.disposeAllBundleLspRuntimes,
}));
vi.mock("../logging/subsystem.js", () => ({
createSubsystemLogger: vi.fn(() => ({
warn: mocks.logWarn,
@@ -105,6 +113,8 @@ describe("createGatewayCloseHandler", () => {
mocks.disposeAllSessionMcpRuntimes.mockResolvedValue(undefined);
mocks.triggerInternalHook.mockReset();
mocks.triggerInternalHook.mockResolvedValue(undefined);
mocks.disposeAllBundleLspRuntimes.mockClear();
mocks.disposeAllBundleLspRuntimes.mockResolvedValue(undefined);
});
afterEach(() => {
@@ -206,6 +216,35 @@ describe("createGatewayCloseHandler", () => {
expect(stopTaskRegistryMaintenance).toHaveBeenCalledTimes(1);
expect(mocks.disposeAgentHarnesses).toHaveBeenCalledTimes(1);
expect(mocks.disposeAllSessionMcpRuntimes).toHaveBeenCalledTimes(1);
expect(mocks.disposeAllBundleLspRuntimes).toHaveBeenCalledTimes(1);
});
it("starts bundle MCP and LSP runtime disposal concurrently", async () => {
const disposalOrder: string[] = [];
let releaseMcp: (() => void) | undefined;
const mcpBlocked = new Promise<void>((resolve) => {
releaseMcp = resolve;
});
mocks.disposeAllSessionMcpRuntimes.mockImplementation(async () => {
disposalOrder.push("mcp-start");
await mcpBlocked;
disposalOrder.push("mcp-end");
});
mocks.disposeAllBundleLspRuntimes.mockImplementation(async () => {
disposalOrder.push("lsp-start");
});
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
const closePromise = close({ reason: "test shutdown" });
try {
await vi.waitFor(() => {
expect(disposalOrder).toContain("lsp-start");
});
expect(disposalOrder).toEqual(["mcp-start", "lsp-start"]);
} finally {
releaseMcp?.();
await closePromise;
}
});
it("continues shutdown when bundle MCP runtime disposal hangs", async () => {
@@ -224,6 +263,22 @@ describe("createGatewayCloseHandler", () => {
).toBe(true);
});
it("continues shutdown when bundle LSP runtime disposal hangs", async () => {
vi.useFakeTimers();
mocks.disposeAllBundleLspRuntimes.mockReturnValue(new Promise(() => undefined));
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
const closePromise = close({ reason: "test shutdown" });
await vi.advanceTimersByTimeAsync(5_000);
await closePromise;
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("bundle-lsp runtime disposal exceeded 5000ms"),
),
).toBe(true);
});
it("terminates lingering websocket clients when websocket close exceeds the grace window", async () => {
vi.useFakeTimers();

View File

@@ -19,6 +19,7 @@ const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
const HTTP_CLOSE_GRACE_MS = 1_000;
const HTTP_CLOSE_FORCE_WAIT_MS = 5_000;
const MCP_RUNTIME_CLOSE_GRACE_MS = 5_000;
const LSP_RUNTIME_CLOSE_GRACE_MS = 5_000;
function createTimeoutRace<T>(timeoutMs: number, onTimeout: () => T) {
let timer: ReturnType<typeof setTimeout> | null = null;
@@ -75,6 +76,30 @@ async function triggerGatewayLifecycleHookWithTimeout(params: {
}
}
async function disposeRuntimeWithShutdownGrace(params: {
label: "bundle-mcp" | "bundle-lsp";
dispose: () => Promise<void>;
graceMs: number;
}): Promise<void> {
const disposePromise = Promise.resolve()
.then(params.dispose)
.catch((err: unknown) => {
shutdownLog.warn(`${params.label} runtime disposal failed during shutdown: ${String(err)}`);
});
const disposeTimeout = createTimeoutRace(params.graceMs, () => {
shutdownLog.warn(
`${params.label} runtime disposal exceeded ${params.graceMs}ms; continuing shutdown`,
);
});
await Promise.race([disposePromise, disposeTimeout.promise]);
disposeTimeout.clear();
}
async function disposeAllBundleLspRuntimesOnDemand(): Promise<void> {
const { disposeAllBundleLspRuntimes } = await import("../agents/pi-bundle-lsp-runtime.js");
await disposeAllBundleLspRuntimes();
}
export async function runGatewayClosePrelude(params: {
stopDiagnostics?: () => void;
clearSkillsRefreshTimer?: () => void;
@@ -115,6 +140,7 @@ export function createGatewayCloseHandler(params: {
stopChannel: (name: ChannelId, accountId?: string) => Promise<void>;
pluginServices: PluginServicesHandle | null;
disposeSessionMcpRuntimes?: () => Promise<void>;
disposeBundleLspRuntimes?: () => Promise<void>;
cron: { stop: () => void };
heartbeatRunner: HeartbeatRunner;
updateCheckStop?: (() => void) | null;
@@ -201,17 +227,18 @@ export function createGatewayCloseHandler(params: {
await params.stopChannel(plugin.id);
}
await disposeRegisteredAgentHarnesses();
const disposeMcpRuntimes = params.disposeSessionMcpRuntimes ?? disposeAllSessionMcpRuntimes;
const mcpDisposePromise = disposeMcpRuntimes().catch((err: unknown) => {
shutdownLog.warn(`bundle-mcp runtime disposal failed during shutdown: ${String(err)}`);
});
const mcpDisposeTimeout = createTimeoutRace(MCP_RUNTIME_CLOSE_GRACE_MS, () => {
shutdownLog.warn(
`bundle-mcp runtime disposal exceeded ${MCP_RUNTIME_CLOSE_GRACE_MS}ms; continuing shutdown`,
);
});
await Promise.race([mcpDisposePromise, mcpDisposeTimeout.promise]);
mcpDisposeTimeout.clear();
await Promise.all([
disposeRuntimeWithShutdownGrace({
label: "bundle-mcp",
dispose: params.disposeSessionMcpRuntimes ?? disposeAllSessionMcpRuntimes,
graceMs: MCP_RUNTIME_CLOSE_GRACE_MS,
}),
disposeRuntimeWithShutdownGrace({
label: "bundle-lsp",
dispose: params.disposeBundleLspRuntimes ?? disposeAllBundleLspRuntimesOnDemand,
graceMs: LSP_RUNTIME_CLOSE_GRACE_MS,
}),
]);
if (params.pluginServices) {
await params.pluginServices.stop().catch(() => {});
}