fix(mcp): tear down stdio process trees

This commit is contained in:
Peter Steinberger
2026-04-22 22:02:05 +01:00
parent 2c45879120
commit 5f7b44045d
6 changed files with 241 additions and 3 deletions

View File

@@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai
- Pi embedded runs: pass real built-in tools into Pi session creation and then narrow active tool names after custom tool registration, so the runner and compaction paths compile cleanly and keep OpenClaw-managed custom tool allowlists without feeding string arrays into `createAgentSession`. Thanks @vincentkoc.
- Agents/OpenAI websocket: route native OpenAI websocket metadata and session-header decisions through the shared endpoint classifier so local mocks and custom `models.providers.openai.baseUrl` endpoints stay out of the native OpenAI path consistently across embedded-runner and websocket transport code. Thanks @vincentkoc.
- MCP/gateway: tear down stdio MCP process trees on transport close and dispose bundled MCP runtimes during session delete/reset, preventing orphaned wrapper/server processes from accumulating. Fixes #68809 and #69465.
- Config: render validation warnings with real line breaks instead of a literal `\n` sequence in CLI/audit output. Fixes #70140.
- Cron/doctor: repair malformed persisted cron job IDs through `openclaw doctor`, including legacy `jobId`, non-string `id`, and missing `id` rows, so `cron list` no longer needs display-layer coercion for corrupt store data. Fixes #70128.
- Discord: normalize prefixed channel targets only at the thread-binding API boundary, so `sessions_spawn({ runtime: "acp", thread: true })` can create child threads from Discord channels without breaking current-channel ACP bindings. (#68034) Thanks @Zetarcos.

View File

@@ -0,0 +1,81 @@
import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream";
import { afterEach, describe, expect, it, vi } from "vitest";
const spawnMock = vi.hoisted(() => vi.fn());
const killProcessTreeMock = vi.hoisted(() => vi.fn());
vi.mock("node:child_process", async () => ({
...(await vi.importActual<typeof import("node:child_process")>("node:child_process")),
spawn: spawnMock,
}));
vi.mock("../process/kill-tree.js", () => ({
killProcessTree: killProcessTreeMock,
}));
class MockChildProcess extends EventEmitter {
exitCode: number | null = null;
pid = 4321;
stdin = new PassThrough();
stdout = new PassThrough();
stderr = new PassThrough();
}
describe("OpenClawStdioClientTransport", () => {
afterEach(() => {
vi.useRealTimers();
spawnMock.mockReset();
killProcessTreeMock.mockReset();
});
it("starts stdio MCP servers in a disposable process group on POSIX", async () => {
const child = new MockChildProcess();
spawnMock.mockReturnValue(child);
const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js");
const transport = new OpenClawStdioClientTransport({
command: "npx",
args: ["-y", "example-mcp"],
env: { EXAMPLE: "1" },
cwd: "/tmp/example",
stderr: "pipe",
});
const started = transport.start();
child.emit("spawn");
await started;
expect(spawnMock).toHaveBeenCalledWith(
"npx",
["-y", "example-mcp"],
expect.objectContaining({
cwd: "/tmp/example",
detached: process.platform !== "win32",
shell: false,
stdio: ["pipe", "pipe", "pipe"],
}),
);
expect(transport.pid).toBe(4321);
expect(transport.stderr).toBeInstanceOf(PassThrough);
});
it("kills the process tree when graceful stdio close does not exit", async () => {
vi.useFakeTimers();
const child = new MockChildProcess();
spawnMock.mockReturnValue(child);
const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js");
const transport = new OpenClawStdioClientTransport({ command: "npx" });
const started = transport.start();
child.emit("spawn");
await started;
const closing = transport.close();
await vi.advanceTimersByTimeAsync(2000);
expect(killProcessTreeMock).toHaveBeenCalledWith(4321);
child.exitCode = 0;
child.emit("close", 0);
await closing;
});
});

View File

@@ -0,0 +1,140 @@
import { spawn, type ChildProcess } from "node:child_process";
import process from "node:process";
import { PassThrough } from "node:stream";
import { getDefaultEnvironment } from "@modelcontextprotocol/sdk/client/stdio.js";
import { ReadBuffer, serializeMessage } from "@modelcontextprotocol/sdk/shared/stdio.js";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
import { killProcessTree } from "../process/kill-tree.js";
export type OpenClawStdioServerParameters = {
command: string;
args?: string[];
env?: Record<string, string>;
cwd?: string;
stderr?: "pipe" | "overlapped" | "inherit" | "ignore";
};
const CLOSE_TIMEOUT_MS = 2000;
function delay(ms: number) {
return new Promise<void>((resolve) => {
setTimeout(resolve, ms).unref();
});
}
export class OpenClawStdioClientTransport implements Transport {
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
private readonly readBuffer = new ReadBuffer();
private readonly stderrStream: PassThrough | null = null;
private process?: ChildProcess;
constructor(private readonly serverParams: OpenClawStdioServerParameters) {
if (serverParams.stderr === "pipe" || serverParams.stderr === "overlapped") {
this.stderrStream = new PassThrough();
}
}
async start(): Promise<void> {
if (this.process) {
throw new Error(
"OpenClawStdioClientTransport already started; Client.connect() starts transports automatically.",
);
}
await new Promise<void>((resolve, reject) => {
const child = spawn(this.serverParams.command, this.serverParams.args ?? [], {
cwd: this.serverParams.cwd,
detached: process.platform !== "win32",
env: {
...getDefaultEnvironment(),
...this.serverParams.env,
},
shell: false,
stdio: ["pipe", "pipe", this.serverParams.stderr ?? "inherit"],
windowsHide: process.platform === "win32",
});
this.process = child;
child.on("error", (error: Error) => {
reject(error);
this.onerror?.(error);
});
child.on("spawn", () => resolve());
child.on("close", () => {
this.process = undefined;
this.onclose?.();
});
child.stdin?.on("error", (error: Error) => this.onerror?.(error));
child.stdout?.on("data", (chunk: Buffer) => {
this.readBuffer.append(chunk);
this.processReadBuffer();
});
child.stdout?.on("error", (error: Error) => this.onerror?.(error));
if (this.stderrStream && child.stderr) {
child.stderr.pipe(this.stderrStream);
}
});
}
get stderr() {
return this.stderrStream ?? this.process?.stderr ?? null;
}
get pid() {
return this.process?.pid ?? null;
}
private processReadBuffer() {
while (true) {
try {
const message = this.readBuffer.readMessage();
if (message === null) {
break;
}
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error instanceof Error ? error : new Error(String(error)));
}
}
}
async close(): Promise<void> {
const processToClose = this.process;
this.process = undefined;
if (processToClose) {
const closePromise = new Promise<void>((resolve) => {
processToClose.once("close", () => resolve());
});
try {
processToClose.stdin?.end();
} catch {
// best-effort
}
await Promise.race([closePromise, delay(CLOSE_TIMEOUT_MS)]);
if (processToClose.exitCode === null && processToClose.pid) {
killProcessTree(processToClose.pid);
await Promise.race([closePromise, delay(CLOSE_TIMEOUT_MS)]);
}
}
this.readBuffer.clear();
}
send(message: JSONRPCMessage): Promise<void> {
return new Promise((resolve) => {
const stdin = this.process?.stdin;
if (!stdin) {
throw new Error("Not connected");
}
const json = serializeMessage(message);
if (stdin.write(json)) {
resolve();
} else {
stdin.once("drain", resolve);
}
});
}
}

View File

@@ -2,12 +2,12 @@ import {
SSEClientTransport,
type SSEClientTransportOptions,
} from "@modelcontextprotocol/sdk/client/sse.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import type { FetchLike, Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { loadUndiciRuntimeDeps } from "../infra/net/undici-runtime.js";
import { logDebug } from "../logger.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { OpenClawStdioClientTransport } from "./mcp-stdio-transport.js";
import { resolveMcpTransportConfig } from "./mcp-transport-config.js";
export type ResolvedMcpTransport = {
@@ -18,7 +18,7 @@ export type ResolvedMcpTransport = {
detachStderr?: () => void;
};
function attachStderrLogging(serverName: string, transport: StdioClientTransport) {
function attachStderrLogging(serverName: string, transport: OpenClawStdioClientTransport) {
const stderr = transport.stderr;
if (!stderr || typeof stderr.on !== "function") {
return undefined;
@@ -84,7 +84,7 @@ export function resolveMcpTransport(
return null;
}
if (resolved.kind === "stdio") {
const transport = new StdioClientTransport({
const transport = new OpenClawStdioClientTransport({
command: resolved.command,
args: resolved.args,
env: resolved.env,

View File

@@ -97,6 +97,9 @@ const acpManagerMocks = vi.hoisted(() => ({
const browserSessionTabMocks = vi.hoisted(() => ({
closeTrackedBrowserTabsForSessions: vi.fn(async () => 0),
}));
const bundleMcpRuntimeMocks = vi.hoisted(() => ({
disposeSessionMcpRuntime: vi.fn(async (_sessionId: string) => {}),
}));
vi.mock("../auto-reply/reply/queue.js", async () => {
const actual = await vi.importActual<typeof import("../auto-reply/reply/queue.js")>(
@@ -202,6 +205,10 @@ vi.mock("../plugin-sdk/browser-maintenance.js", () => ({
movePathToTrash: vi.fn(async () => {}),
}));
vi.mock("../agents/pi-bundle-mcp-tools.js", () => ({
disposeSessionMcpRuntime: bundleMcpRuntimeMocks.disposeSessionMcpRuntime,
}));
installGatewayTestHooks({ scope: "suite" });
let harness: GatewayServerHarness;
@@ -390,6 +397,8 @@ describe("gateway server sessions", () => {
acpManagerMocks.closeSession.mockClear();
browserSessionTabMocks.closeTrackedBrowserTabsForSessions.mockClear();
browserSessionTabMocks.closeTrackedBrowserTabsForSessions.mockResolvedValue(0);
bundleMcpRuntimeMocks.disposeSessionMcpRuntime.mockClear();
bundleMcpRuntimeMocks.disposeSessionMcpRuntime.mockResolvedValue(undefined);
});
test("sessions.create stores dashboard session model and parent linkage, and creates a transcript", async () => {
@@ -2379,6 +2388,7 @@ describe("gateway server sessions", () => {
["discord:group:dev", "agent:main:discord:group:dev", "sess-active"],
"sess-active",
);
expect(bundleMcpRuntimeMocks.disposeSessionMcpRuntime).toHaveBeenCalledWith("sess-active");
expect(browserSessionTabMocks.closeTrackedBrowserTabsForSessions).toHaveBeenCalledTimes(1);
expect(browserSessionTabMocks.closeTrackedBrowserTabsForSessions).toHaveBeenCalledWith({
sessionKeys: expect.arrayContaining([

View File

@@ -7,6 +7,7 @@ import { getAcpRuntimeBackend } from "../acp/runtime/registry.js";
import { readAcpSessionEntry, upsertAcpSessionMeta } from "../acp/runtime/session-meta.js";
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
import { clearBootstrapSnapshot } from "../agents/bootstrap-cache.js";
import { disposeSessionMcpRuntime } from "../agents/pi-bundle-mcp-tools.js";
import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../agents/pi-embedded.js";
import { stopSubagentsForRequester } from "../auto-reply/reply/abort.js";
import { clearSessionQueues } from "../auto-reply/reply/queue.js";
@@ -225,6 +226,11 @@ async function ensureSessionRuntimeCleanup(params: {
const ended = await waitForEmbeddedPiRunEnd(params.sessionId, 15_000);
clearBootstrapSnapshot(params.target.canonicalKey);
if (ended) {
await disposeSessionMcpRuntime(params.sessionId).catch((error) => {
logVerbose(
`sessions cleanup: failed to dispose bundle MCP runtime for ${params.sessionId}: ${String(error)}`,
);
});
await closeTrackedBrowserTabs();
return undefined;
}