diff --git a/CHANGELOG.md b/CHANGELOG.md index 8480dc9b549..6e65f84dd19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai - Pi embedded runs: pass real built-in tools into Pi session creation and then narrow active tool names after custom tool registration, so the runner and compaction paths compile cleanly and keep OpenClaw-managed custom tool allowlists without feeding string arrays into `createAgentSession`. Thanks @vincentkoc. - Agents/OpenAI websocket: route native OpenAI websocket metadata and session-header decisions through the shared endpoint classifier so local mocks and custom `models.providers.openai.baseUrl` endpoints stay out of the native OpenAI path consistently across embedded-runner and websocket transport code. Thanks @vincentkoc. +- MCP/gateway: tear down stdio MCP process trees on transport close and dispose bundled MCP runtimes during session delete/reset, preventing orphaned wrapper/server processes from accumulating. Fixes #68809 and #69465. - Config: render validation warnings with real line breaks instead of a literal `\n` sequence in CLI/audit output. Fixes #70140. - Cron/doctor: repair malformed persisted cron job IDs through `openclaw doctor`, including legacy `jobId`, non-string `id`, and missing `id` rows, so `cron list` no longer needs display-layer coercion for corrupt store data. Fixes #70128. - Discord: normalize prefixed channel targets only at the thread-binding API boundary, so `sessions_spawn({ runtime: "acp", thread: true })` can create child threads from Discord channels without breaking current-channel ACP bindings. (#68034) Thanks @Zetarcos. diff --git a/src/agents/mcp-stdio-transport.test.ts b/src/agents/mcp-stdio-transport.test.ts new file mode 100644 index 00000000000..c2ba1e1d88f --- /dev/null +++ b/src/agents/mcp-stdio-transport.test.ts @@ -0,0 +1,81 @@ +import { EventEmitter } from "node:events"; +import { PassThrough } from "node:stream"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +const spawnMock = vi.hoisted(() => vi.fn()); +const killProcessTreeMock = vi.hoisted(() => vi.fn()); + +vi.mock("node:child_process", async () => ({ + ...(await vi.importActual("node:child_process")), + spawn: spawnMock, +})); + +vi.mock("../process/kill-tree.js", () => ({ + killProcessTree: killProcessTreeMock, +})); + +class MockChildProcess extends EventEmitter { + exitCode: number | null = null; + pid = 4321; + stdin = new PassThrough(); + stdout = new PassThrough(); + stderr = new PassThrough(); +} + +describe("OpenClawStdioClientTransport", () => { + afterEach(() => { + vi.useRealTimers(); + spawnMock.mockReset(); + killProcessTreeMock.mockReset(); + }); + + it("starts stdio MCP servers in a disposable process group on POSIX", async () => { + const child = new MockChildProcess(); + spawnMock.mockReturnValue(child); + const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js"); + + const transport = new OpenClawStdioClientTransport({ + command: "npx", + args: ["-y", "example-mcp"], + env: { EXAMPLE: "1" }, + cwd: "/tmp/example", + stderr: "pipe", + }); + const started = transport.start(); + child.emit("spawn"); + await started; + + expect(spawnMock).toHaveBeenCalledWith( + "npx", + ["-y", "example-mcp"], + expect.objectContaining({ + cwd: "/tmp/example", + detached: process.platform !== "win32", + shell: false, + stdio: ["pipe", "pipe", "pipe"], + }), + ); + expect(transport.pid).toBe(4321); + expect(transport.stderr).toBeInstanceOf(PassThrough); + }); + + it("kills the process tree when graceful stdio close does not exit", async () => { + vi.useFakeTimers(); + const child = new MockChildProcess(); + spawnMock.mockReturnValue(child); + const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js"); + + const transport = new OpenClawStdioClientTransport({ command: "npx" }); + const started = transport.start(); + child.emit("spawn"); + await started; + + const closing = transport.close(); + await vi.advanceTimersByTimeAsync(2000); + expect(killProcessTreeMock).toHaveBeenCalledWith(4321); + + child.exitCode = 0; + child.emit("close", 0); + await closing; + }); +}); diff --git a/src/agents/mcp-stdio-transport.ts b/src/agents/mcp-stdio-transport.ts new file mode 100644 index 00000000000..00049eb73a6 --- /dev/null +++ b/src/agents/mcp-stdio-transport.ts @@ -0,0 +1,140 @@ +import { spawn, type ChildProcess } from "node:child_process"; +import process from "node:process"; +import { PassThrough } from "node:stream"; +import { getDefaultEnvironment } from "@modelcontextprotocol/sdk/client/stdio.js"; +import { ReadBuffer, serializeMessage } from "@modelcontextprotocol/sdk/shared/stdio.js"; +import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; +import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; +import { killProcessTree } from "../process/kill-tree.js"; + +export type OpenClawStdioServerParameters = { + command: string; + args?: string[]; + env?: Record; + cwd?: string; + stderr?: "pipe" | "overlapped" | "inherit" | "ignore"; +}; + +const CLOSE_TIMEOUT_MS = 2000; + +function delay(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms).unref(); + }); +} + +export class OpenClawStdioClientTransport implements Transport { + onclose?: () => void; + onerror?: (error: Error) => void; + onmessage?: (message: JSONRPCMessage) => void; + + private readonly readBuffer = new ReadBuffer(); + private readonly stderrStream: PassThrough | null = null; + private process?: ChildProcess; + + constructor(private readonly serverParams: OpenClawStdioServerParameters) { + if (serverParams.stderr === "pipe" || serverParams.stderr === "overlapped") { + this.stderrStream = new PassThrough(); + } + } + + async start(): Promise { + if (this.process) { + throw new Error( + "OpenClawStdioClientTransport already started; Client.connect() starts transports automatically.", + ); + } + + await new Promise((resolve, reject) => { + const child = spawn(this.serverParams.command, this.serverParams.args ?? [], { + cwd: this.serverParams.cwd, + detached: process.platform !== "win32", + env: { + ...getDefaultEnvironment(), + ...this.serverParams.env, + }, + shell: false, + stdio: ["pipe", "pipe", this.serverParams.stderr ?? "inherit"], + windowsHide: process.platform === "win32", + }); + this.process = child; + + child.on("error", (error: Error) => { + reject(error); + this.onerror?.(error); + }); + child.on("spawn", () => resolve()); + child.on("close", () => { + this.process = undefined; + this.onclose?.(); + }); + child.stdin?.on("error", (error: Error) => this.onerror?.(error)); + child.stdout?.on("data", (chunk: Buffer) => { + this.readBuffer.append(chunk); + this.processReadBuffer(); + }); + child.stdout?.on("error", (error: Error) => this.onerror?.(error)); + if (this.stderrStream && child.stderr) { + child.stderr.pipe(this.stderrStream); + } + }); + } + + get stderr() { + return this.stderrStream ?? this.process?.stderr ?? null; + } + + get pid() { + return this.process?.pid ?? null; + } + + private processReadBuffer() { + while (true) { + try { + const message = this.readBuffer.readMessage(); + if (message === null) { + break; + } + this.onmessage?.(message); + } catch (error) { + this.onerror?.(error instanceof Error ? error : new Error(String(error))); + } + } + } + + async close(): Promise { + const processToClose = this.process; + this.process = undefined; + if (processToClose) { + const closePromise = new Promise((resolve) => { + processToClose.once("close", () => resolve()); + }); + try { + processToClose.stdin?.end(); + } catch { + // best-effort + } + await Promise.race([closePromise, delay(CLOSE_TIMEOUT_MS)]); + if (processToClose.exitCode === null && processToClose.pid) { + killProcessTree(processToClose.pid); + await Promise.race([closePromise, delay(CLOSE_TIMEOUT_MS)]); + } + } + this.readBuffer.clear(); + } + + send(message: JSONRPCMessage): Promise { + return new Promise((resolve) => { + const stdin = this.process?.stdin; + if (!stdin) { + throw new Error("Not connected"); + } + const json = serializeMessage(message); + if (stdin.write(json)) { + resolve(); + } else { + stdin.once("drain", resolve); + } + }); + } +} diff --git a/src/agents/mcp-transport.ts b/src/agents/mcp-transport.ts index 6c35f9710d3..2429284d7f1 100644 --- a/src/agents/mcp-transport.ts +++ b/src/agents/mcp-transport.ts @@ -2,12 +2,12 @@ import { SSEClientTransport, type SSEClientTransportOptions, } from "@modelcontextprotocol/sdk/client/sse.js"; -import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import type { FetchLike, Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { loadUndiciRuntimeDeps } from "../infra/net/undici-runtime.js"; import { logDebug } from "../logger.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { OpenClawStdioClientTransport } from "./mcp-stdio-transport.js"; import { resolveMcpTransportConfig } from "./mcp-transport-config.js"; export type ResolvedMcpTransport = { @@ -18,7 +18,7 @@ export type ResolvedMcpTransport = { detachStderr?: () => void; }; -function attachStderrLogging(serverName: string, transport: StdioClientTransport) { +function attachStderrLogging(serverName: string, transport: OpenClawStdioClientTransport) { const stderr = transport.stderr; if (!stderr || typeof stderr.on !== "function") { return undefined; @@ -84,7 +84,7 @@ export function resolveMcpTransport( return null; } if (resolved.kind === "stdio") { - const transport = new StdioClientTransport({ + const transport = new OpenClawStdioClientTransport({ command: resolved.command, args: resolved.args, env: resolved.env, diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 5d6d9f190ae..08208697f4e 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -97,6 +97,9 @@ const acpManagerMocks = vi.hoisted(() => ({ const browserSessionTabMocks = vi.hoisted(() => ({ closeTrackedBrowserTabsForSessions: vi.fn(async () => 0), })); +const bundleMcpRuntimeMocks = vi.hoisted(() => ({ + disposeSessionMcpRuntime: vi.fn(async (_sessionId: string) => {}), +})); vi.mock("../auto-reply/reply/queue.js", async () => { const actual = await vi.importActual( @@ -202,6 +205,10 @@ vi.mock("../plugin-sdk/browser-maintenance.js", () => ({ movePathToTrash: vi.fn(async () => {}), })); +vi.mock("../agents/pi-bundle-mcp-tools.js", () => ({ + disposeSessionMcpRuntime: bundleMcpRuntimeMocks.disposeSessionMcpRuntime, +})); + installGatewayTestHooks({ scope: "suite" }); let harness: GatewayServerHarness; @@ -390,6 +397,8 @@ describe("gateway server sessions", () => { acpManagerMocks.closeSession.mockClear(); browserSessionTabMocks.closeTrackedBrowserTabsForSessions.mockClear(); browserSessionTabMocks.closeTrackedBrowserTabsForSessions.mockResolvedValue(0); + bundleMcpRuntimeMocks.disposeSessionMcpRuntime.mockClear(); + bundleMcpRuntimeMocks.disposeSessionMcpRuntime.mockResolvedValue(undefined); }); test("sessions.create stores dashboard session model and parent linkage, and creates a transcript", async () => { @@ -2379,6 +2388,7 @@ describe("gateway server sessions", () => { ["discord:group:dev", "agent:main:discord:group:dev", "sess-active"], "sess-active", ); + expect(bundleMcpRuntimeMocks.disposeSessionMcpRuntime).toHaveBeenCalledWith("sess-active"); expect(browserSessionTabMocks.closeTrackedBrowserTabsForSessions).toHaveBeenCalledTimes(1); expect(browserSessionTabMocks.closeTrackedBrowserTabsForSessions).toHaveBeenCalledWith({ sessionKeys: expect.arrayContaining([ diff --git a/src/gateway/session-reset-service.ts b/src/gateway/session-reset-service.ts index 6e485973b6b..3d17bda0be9 100644 --- a/src/gateway/session-reset-service.ts +++ b/src/gateway/session-reset-service.ts @@ -7,6 +7,7 @@ import { getAcpRuntimeBackend } from "../acp/runtime/registry.js"; import { readAcpSessionEntry, upsertAcpSessionMeta } from "../acp/runtime/session-meta.js"; import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { clearBootstrapSnapshot } from "../agents/bootstrap-cache.js"; +import { disposeSessionMcpRuntime } from "../agents/pi-bundle-mcp-tools.js"; import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../agents/pi-embedded.js"; import { stopSubagentsForRequester } from "../auto-reply/reply/abort.js"; import { clearSessionQueues } from "../auto-reply/reply/queue.js"; @@ -225,6 +226,11 @@ async function ensureSessionRuntimeCleanup(params: { const ended = await waitForEmbeddedPiRunEnd(params.sessionId, 15_000); clearBootstrapSnapshot(params.target.canonicalKey); if (ended) { + await disposeSessionMcpRuntime(params.sessionId).catch((error) => { + logVerbose( + `sessions cleanup: failed to dispose bundle MCP runtime for ${params.sessionId}: ${String(error)}`, + ); + }); await closeTrackedBrowserTabs(); return undefined; }