mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:10:45 +00:00
fix: handle EPIPE errors on child process stdin writes (#75602)
Fix three child-process stdin write paths that let async EPIPE errors escape to uncaughtException and crash the gateway.

extensions/imessage/src/client.ts (the actual #75438 crash path):
- Add child.stdin.on('error') listener in start() to catch async EPIPE and reject all pending requests via failAll().
- Add write callback to request() stdin.write() that rejects the specific pending request on error, instead of leaving it hanging until timeout.

src/agents/mcp-stdio-transport.ts:
- Fix write callback race in send(): previously resolved the promise immediately when write() returned true, then the write callback with EPIPE would fire after the promise was already fulfilled. Now always settles the promise from the write callback so the outcome is known before resolving.

src/process/exec.ts:
- Add stdin.on('error') before writing input so EPIPE from a prematurely-exited child is swallowed — the process exit handler reports the real status.

One reporter observed a gateway crash after 10.5 hours of stable uptime — a single EPIPE on an iMessage RPC child process stdin write killed the gateway with code 1.

Fixes: #75438
This commit is contained in:
@@ -35,6 +35,9 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins/runtime-deps: prune legacy version-scoped plugin runtime-deps roots during bundled dependency repair and cover the path in Package Acceptance's upgrade-survivor matrix, so upgrades from 2026.4.x no longer leave stale per-plugin runtime trees after doctor runs. Thanks @vincentkoc.
|
||||
- Plugins/runtime-deps: keep Gateway startup plugin imports and runtime plugin fallback loads verify-only after startup/config repair planning, so packaged installs no longer spawn package-manager repair from hot paths after readiness. Refs #75283 and #75069. Thanks @brokemac79 and @xiaohuaxi.
|
||||
- Plugins/runtime-deps: treat package.json runtime-deps manifests as supersets when generated materialization metadata is absent, so bundled plugin activation stops restaging already-installed dependency subsets on every activation. Fixes #75429. (#75431) Thanks @loyur.
|
||||
- iMessage: add stdin write callback and error listener to IMessageRpcClient so async EPIPE from a closed child process rejects the pending request instead of crashing the gateway with uncaughtException. Fixes #75438.
|
||||
- MCP/stdio: settle MCP stdio transport send() from the write callback instead of resolving immediately on buffer acceptance, so async write errors reject the promise instead of being lost. Refs #75438.
|
||||
- Process/exec: add stdin error listener in runCommandWithTimeout so EPIPE from a prematurely-exited child is swallowed instead of escaping to uncaughtException. Refs #75438.
|
||||
- Voice Call/realtime: add default-off fast memory/session context for `openclaw_agent_consult`, giving live calls a bounded answer-or-miss path before the full agent consult. Fixes #71849. Thanks @amzzzzzzz.
|
||||
- Google Meet: interrupt Realtime provider output when local barge-in clears playback, so command-pair audio stops model speech instead of only restarting Chrome playback. Fixes #73850. (#73834) Thanks @shhtheonlyperson.
|
||||
- Gateway/config: cap oversized plugin-owned schemas in the full `config.schema` response so large installed plugin sets cannot balloon Gateway RSS or crash schema clients. Thanks @vincentkoc.
|
||||
|
||||
@@ -108,6 +108,12 @@ export class IMessageRpcClient {
|
||||
this.closedResolve?.();
|
||||
});
|
||||
|
||||
// Without this listener, async EPIPE from a dead child crashes the
|
||||
// gateway via uncaughtException. (#75438)
|
||||
child.stdin.on("error", (err) => {
|
||||
this.failAll(err instanceof Error ? err : new Error(String(err)));
|
||||
});
|
||||
|
||||
child.on("close", (code, signal) => {
|
||||
if (code !== 0 && code !== null) {
|
||||
const reason = signal ? `signal ${signal}` : `code ${code}`;
|
||||
@@ -180,7 +186,21 @@ export class IMessageRpcClient {
|
||||
});
|
||||
});
|
||||
|
||||
this.child.stdin.write(line);
|
||||
// Reject the specific pending request on write error (e.g. EPIPE)
|
||||
// instead of letting it hang until timeout. (#75438)
|
||||
this.child.stdin.write(line, (err) => {
|
||||
if (err) {
|
||||
const key = String(id);
|
||||
const pending = this.pending.get(key);
|
||||
if (pending) {
|
||||
if (pending.timer) {
|
||||
clearTimeout(pending.timer);
|
||||
}
|
||||
this.pending.delete(key);
|
||||
pending.reject(err instanceof Error ? err : new Error(String(err)));
|
||||
}
|
||||
}
|
||||
});
|
||||
return await response;
|
||||
}
|
||||
|
||||
|
||||
@@ -137,4 +137,50 @@ describe("OpenClawStdioClientTransport", () => {
|
||||
result: { ok: true },
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects send() with EPIPE when child stdin is closed (#75438)", async () => {
|
||||
const child = new MockChildProcess();
|
||||
const brokenStdin = new PassThrough();
|
||||
brokenStdin.write = (_chunk: unknown, cbOrEncoding?: unknown, cb?: unknown) => {
|
||||
const callback =
|
||||
typeof cbOrEncoding === "function" ? cbOrEncoding : typeof cb === "function" ? cb : null;
|
||||
const err = Object.assign(new Error("write EPIPE"), { code: "EPIPE" });
|
||||
if (callback) {
|
||||
(callback as (err: Error) => void)(err);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
child.stdin = brokenStdin;
|
||||
spawnMock.mockReturnValue(child);
|
||||
const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js");
|
||||
|
||||
const transport = new OpenClawStdioClientTransport({ command: "npx" });
|
||||
const started = transport.start();
|
||||
child.emit("spawn");
|
||||
await started;
|
||||
|
||||
await expect(
|
||||
transport.send({ jsonrpc: "2.0", id: 2, method: "ping" }),
|
||||
).rejects.toThrow("EPIPE");
|
||||
});
|
||||
|
||||
it("rejects send() when stdin.write throws synchronously (#75438)", async () => {
|
||||
const child = new MockChildProcess();
|
||||
const brokenStdin = new PassThrough();
|
||||
brokenStdin.write = () => {
|
||||
throw Object.assign(new Error("write after end"), { code: "ERR_STREAM_DESTROYED" });
|
||||
};
|
||||
child.stdin = brokenStdin;
|
||||
spawnMock.mockReturnValue(child);
|
||||
const { OpenClawStdioClientTransport } = await import("./mcp-stdio-transport.js");
|
||||
|
||||
const transport = new OpenClawStdioClientTransport({ command: "npx" });
|
||||
const started = transport.start();
|
||||
child.emit("spawn");
|
||||
await started;
|
||||
|
||||
await expect(
|
||||
transport.send({ jsonrpc: "2.0", id: 3, method: "ping" }),
|
||||
).rejects.toThrow("write after end");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -131,16 +131,29 @@ export class OpenClawStdioClientTransport implements Transport {
|
||||
}
|
||||
|
||||
send(message: JSONRPCMessage): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const stdin = this.process?.stdin;
|
||||
if (!stdin) {
|
||||
throw new Error("Not connected");
|
||||
}
|
||||
const json = serializeMessage(message);
|
||||
if (stdin.write(json)) {
|
||||
resolve();
|
||||
} else {
|
||||
stdin.once("drain", resolve);
|
||||
// Settle from the write callback so async EPIPE rejects instead of
|
||||
// escaping to uncaughtException. (#75438)
|
||||
try {
|
||||
const flushed = stdin.write(json, (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
if (!flushed) {
|
||||
// Back-pressure: drain fires when the buffer empties, but the
|
||||
// write callback above still owns promise settlement.
|
||||
stdin.once("drain", () => {});
|
||||
}
|
||||
} catch (err) {
|
||||
reject(err instanceof Error ? err : new Error(String(err)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -198,6 +198,22 @@ describe("runCommandWithTimeout", () => {
|
||||
expect(result.code).not.toBe(0);
|
||||
},
|
||||
);
|
||||
|
||||
it.runIf(process.platform !== "win32")(
|
||||
"swallows stdin EPIPE when child exits before input is consumed (#75438)",
|
||||
{ timeout: 5_000 },
|
||||
async () => {
|
||||
await loadExecModules();
|
||||
const result = await runCommandWithTimeout(
|
||||
[process.execPath, "-e", "process.exit(0)"],
|
||||
{
|
||||
timeoutMs: 3_000,
|
||||
input: "this input will EPIPE because the child ignores stdin\n",
|
||||
},
|
||||
);
|
||||
expect(result.code).toBe(0);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe("attachChildProcessBridge", () => {
|
||||
|
||||
@@ -357,6 +357,9 @@ export async function runCommandWithTimeout(
|
||||
armNoOutputTimer();
|
||||
|
||||
if (hasInput && child.stdin) {
|
||||
// Swallow EPIPE from a prematurely-exited child; the exit handler
|
||||
// reports the real status. (#75438)
|
||||
child.stdin.on("error", () => {});
|
||||
child.stdin.write(input ?? "");
|
||||
child.stdin.end();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user