From e1a7c5b86030273db0fa59b11b67992276206f6e Mon Sep 17 00:00:00 2001
From: Alex Knight
Date: Fri, 1 May 2026 21:45:12 +1000
Subject: [PATCH] fix: handle EPIPE errors on child process stdin writes (#75602)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fix three child-process stdin write paths that let async EPIPE errors escape to uncaughtException and crash the gateway.

extensions/imessage/src/client.ts (the actual #75438 crash path):
- Add child.stdin.on('error') listener in start() to catch async EPIPE and reject all pending requests via failAll().
- Add write callback to request() stdin.write() that rejects the specific pending request on error, instead of leaving it hanging until timeout.

src/agents/mcp-stdio-transport.ts:
- Fix write callback race in send(): previously resolved the promise immediately when write() returned true, then the write callback with EPIPE would fire after the promise was already fulfilled. Now always settles the promise from the write callback so the outcome is known before resolving.

src/process/exec.ts:
- Add stdin.on('error') before writing input so EPIPE from a prematurely-exited child is swallowed — the process exit handler reports the real status.

One reporter observed a gateway crash after 10.5 hours of stable uptime — a single EPIPE on an iMessage RPC child process stdin write killed the gateway with code 1.

Fixes: #75438
---
 CHANGELOG.md                           |  3 ++
 extensions/imessage/src/client.ts      | 22 +++++++++++-
 src/agents/mcp-stdio-transport.test.ts | 46 ++++++++++++++++++++++++++
 src/agents/mcp-stdio-transport.ts      | 23 ++++++++++---
 src/process/exec.test.ts               | 16 +++++++++
 src/process/exec.ts                    |  3 ++
 6 files changed, 107 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a2f2b24a6b..f73999b2c28 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -35,6 +35,9 @@ Docs: https://docs.openclaw.ai
 - Plugins/runtime-deps: prune legacy version-scoped plugin runtime-deps roots during bundled dependency repair and cover the path in Package Acceptance's upgrade-survivor matrix, so upgrades from 2026.4.x no longer leave stale per-plugin runtime trees after doctor runs. Thanks @vincentkoc.
 - Plugins/runtime-deps: keep Gateway startup plugin imports and runtime plugin fallback loads verify-only after startup/config repair planning, so packaged installs no longer spawn package-manager repair from hot paths after readiness. Refs #75283 and #75069. Thanks @brokemac79 and @xiaohuaxi.
 - Plugins/runtime-deps: treat package.json runtime-deps manifests as supersets when generated materialization metadata is absent, so bundled plugin activation stops restaging already-installed dependency subsets on every activation. Fixes #75429. (#75431) Thanks @loyur.
+- iMessage: add stdin write callback and error listener to IMessageRpcClient so async EPIPE from a closed child process rejects the pending request instead of crashing the gateway with uncaughtException. Fixes #75438.
+- MCP/stdio: settle MCP stdio transport send() from the write callback instead of resolving immediately on buffer acceptance, so async write errors reject the promise instead of being lost. Refs #75438.
+- Process/exec: add stdin error listener in runCommandWithTimeout so EPIPE from a prematurely-exited child is swallowed instead of escaping to uncaughtException. Refs #75438.
- Voice Call/realtime: add default-off fast memory/session context for `openclaw_agent_consult`, giving live calls a bounded answer-or-miss path before the full agent consult. Fixes #71849. Thanks @amzzzzzzz. - Google Meet: interrupt Realtime provider output when local barge-in clears playback, so command-pair audio stops model speech instead of only restarting Chrome playback. Fixes #73850. (#73834) Thanks @shhtheonlyperson. - Gateway/config: cap oversized plugin-owned schemas in the full `config.schema` response so large installed plugin sets cannot balloon Gateway RSS or crash schema clients. Thanks @vincentkoc. diff --git a/extensions/imessage/src/client.ts b/extensions/imessage/src/client.ts index e1282c0360c..500c310a567 100644 --- a/extensions/imessage/src/client.ts +++ b/extensions/imessage/src/client.ts @@ -108,6 +108,12 @@ export class IMessageRpcClient { this.closedResolve?.(); }); + // Without this listener, async EPIPE from a dead child crashes the + // gateway via uncaughtException. (#75438) + child.stdin.on("error", (err) => { + this.failAll(err instanceof Error ? err : new Error(String(err))); + }); + child.on("close", (code, signal) => { if (code !== 0 && code !== null) { const reason = signal ? `signal ${signal}` : `code ${code}`; @@ -180,7 +186,21 @@ export class IMessageRpcClient { }); }); - this.child.stdin.write(line); + // Reject the specific pending request on write error (e.g. EPIPE) + // instead of letting it hang until timeout. (#75438) + this.child.stdin.write(line, (err) => { + if (err) { + const key = String(id); + const pending = this.pending.get(key); + if (pending) { + if (pending.timer) { + clearTimeout(pending.timer); + } + this.pending.delete(key); + pending.reject(err instanceof Error ? 
err : new Error(String(err))); + } + } + }); return await response; } diff --git a/src/agents/mcp-stdio-transport.test.ts b/src/agents/mcp-stdio-transport.test.ts index 2eebd52ba5f..c30df8e00e0 100644 --- a/src/agents/mcp-stdio-transport.test.ts +++ b/src/agents/mcp-stdio-transport.test.ts @@ -137,4 +137,50 @@ describe("OpenClawStdioClientTransport", () => { result: { ok: true }, }); }); + + it("rejects send() with EPIPE when child stdin is closed (#75438)", async () => { + const child = new MockChildProcess(); + const brokenStdin = new PassThrough(); + brokenStdin.write = (_chunk: unknown, cbOrEncoding?: unknown, cb?: unknown) => { + const callback = + typeof cbOrEncoding === "function" ? cbOrEncoding : typeof cb === "function" ? cb : null; + const err = Object.assign(new Error("write EPIPE"), { code: "EPIPE" }); + if (callback) { + (callback as (err: Error) => void)(err); + } + return false; + }; + child.stdin = brokenStdin; + spawnMock.mockReturnValue(child); + const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js"); + + const transport = new OpenClawStdioClientTransport({ command: "npx" }); + const started = transport.start(); + child.emit("spawn"); + await started; + + await expect( + transport.send({ jsonrpc: "2.0", id: 2, method: "ping" }), + ).rejects.toThrow("EPIPE"); + }); + + it("rejects send() when stdin.write throws synchronously (#75438)", async () => { + const child = new MockChildProcess(); + const brokenStdin = new PassThrough(); + brokenStdin.write = () => { + throw Object.assign(new Error("write after end"), { code: "ERR_STREAM_DESTROYED" }); + }; + child.stdin = brokenStdin; + spawnMock.mockReturnValue(child); + const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js"); + + const transport = new OpenClawStdioClientTransport({ command: "npx" }); + const started = transport.start(); + child.emit("spawn"); + await started; + + await expect( + transport.send({ jsonrpc: "2.0", id: 3, method: "ping" 
}), + ).rejects.toThrow("write after end"); + }); }); diff --git a/src/agents/mcp-stdio-transport.ts b/src/agents/mcp-stdio-transport.ts index 5ff242bd28b..d99a7ac5aed 100644 --- a/src/agents/mcp-stdio-transport.ts +++ b/src/agents/mcp-stdio-transport.ts @@ -131,16 +131,29 @@ export class OpenClawStdioClientTransport implements Transport { } send(message: JSONRPCMessage): Promise { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { const stdin = this.process?.stdin; if (!stdin) { throw new Error("Not connected"); } const json = serializeMessage(message); - if (stdin.write(json)) { - resolve(); - } else { - stdin.once("drain", resolve); + // Settle from the write callback so async EPIPE rejects instead of + // escaping to uncaughtException. (#75438) + try { + const flushed = stdin.write(json, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + if (!flushed) { + // Back-pressure: drain fires when the buffer empties, but the + // write callback above still owns promise settlement. + stdin.once("drain", () => {}); + } + } catch (err) { + reject(err instanceof Error ? 
err : new Error(String(err))); } }); } diff --git a/src/process/exec.test.ts b/src/process/exec.test.ts index aec3299548c..0232057ddaf 100644 --- a/src/process/exec.test.ts +++ b/src/process/exec.test.ts @@ -198,6 +198,22 @@ describe("runCommandWithTimeout", () => { expect(result.code).not.toBe(0); }, ); + + it.runIf(process.platform !== "win32")( + "swallows stdin EPIPE when child exits before input is consumed (#75438)", + { timeout: 5_000 }, + async () => { + await loadExecModules(); + const result = await runCommandWithTimeout( + [process.execPath, "-e", "process.exit(0)"], + { + timeoutMs: 3_000, + input: "this input will EPIPE because the child ignores stdin\n", + }, + ); + expect(result.code).toBe(0); + }, + ); }); describe("attachChildProcessBridge", () => { diff --git a/src/process/exec.ts b/src/process/exec.ts index e5cd7faee8d..c0ed7128900 100644 --- a/src/process/exec.ts +++ b/src/process/exec.ts @@ -357,6 +357,9 @@ export async function runCommandWithTimeout( armNoOutputTimer(); if (hasInput && child.stdin) { + // Swallow EPIPE from a prematurely-exited child; the exit handler + // reports the real status. (#75438) + child.stdin.on("error", () => {}); child.stdin.write(input ?? ""); child.stdin.end(); }