mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:40:44 +00:00
Fix three child-process stdin write paths that let async EPIPE errors escape to uncaughtException and crash the gateway. extensions/imessage/src/client.ts (the actual #75438 crash path): - Add child.stdin.on('error') listener in start() to catch async EPIPE and reject all pending requests via failAll(). - Add write callback to request() stdin.write() that rejects the specific pending request on error, instead of leaving it hanging until timeout. src/agents/mcp-stdio-transport.ts: - Fix write callback race in send(): previously resolved the promise immediately when write() returned true, then the write callback with EPIPE would fire after the promise was already fulfilled. Now always settles the promise from the write callback so the outcome is known before resolving. src/process/exec.ts: - Add stdin.on('error') before writing input so EPIPE from a prematurely-exited child is swallowed — the process exit handler reports the real status. One reporter observed a gateway crash after 10.5 hours of stable uptime — a single EPIPE on an iMessage RPC child process stdin write killed the gateway with code 1. Fixes: #75438
277 lines
7.6 KiB
TypeScript
277 lines
7.6 KiB
TypeScript
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
|
|
import { createInterface, type Interface } from "node:readline";
|
|
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
|
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
|
|
import { normalizeLowercaseStringOrEmpty, resolveUserPath } from "openclaw/plugin-sdk/text-runtime";
|
|
import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "./constants.js";
|
|
|
|
export type IMessageRpcError = {
|
|
code?: number;
|
|
message?: string;
|
|
data?: unknown;
|
|
};
|
|
|
|
export type IMessageRpcResponse<T> = {
|
|
jsonrpc?: string;
|
|
id?: string | number | null;
|
|
result?: T;
|
|
error?: IMessageRpcError;
|
|
method?: string;
|
|
params?: unknown;
|
|
};
|
|
|
|
export type IMessageRpcNotification = {
|
|
method: string;
|
|
params?: unknown;
|
|
};
|
|
|
|
export type IMessageRpcClientOptions = {
|
|
cliPath?: string;
|
|
dbPath?: string;
|
|
runtime?: RuntimeEnv;
|
|
onNotification?: (msg: IMessageRpcNotification) => void;
|
|
};
|
|
|
|
type PendingRequest = {
|
|
resolve: (value: unknown) => void;
|
|
reject: (error: Error) => void;
|
|
timer?: NodeJS.Timeout;
|
|
};
|
|
|
|
function isTestEnv(): boolean {
|
|
if (process.env.NODE_ENV === "test") {
|
|
return true;
|
|
}
|
|
const vitest = normalizeLowercaseStringOrEmpty(process.env.VITEST);
|
|
return Boolean(vitest);
|
|
}
|
|
|
|
export class IMessageRpcClient {
|
|
private readonly cliPath: string;
|
|
private readonly dbPath?: string;
|
|
private readonly runtime?: RuntimeEnv;
|
|
private readonly onNotification?: (msg: IMessageRpcNotification) => void;
|
|
private readonly pending = new Map<string, PendingRequest>();
|
|
private readonly closed: Promise<void>;
|
|
private closedResolve: (() => void) | null = null;
|
|
private child: ChildProcessWithoutNullStreams | null = null;
|
|
private reader: Interface | null = null;
|
|
private nextId = 1;
|
|
|
|
constructor(opts: IMessageRpcClientOptions = {}) {
|
|
this.cliPath = opts.cliPath?.trim() || "imsg";
|
|
this.dbPath = opts.dbPath?.trim() ? resolveUserPath(opts.dbPath) : undefined;
|
|
this.runtime = opts.runtime;
|
|
this.onNotification = opts.onNotification;
|
|
this.closed = new Promise((resolve) => {
|
|
this.closedResolve = resolve;
|
|
});
|
|
}
|
|
|
|
async start(): Promise<void> {
|
|
if (this.child) {
|
|
return;
|
|
}
|
|
if (isTestEnv()) {
|
|
throw new Error("Refusing to start imsg rpc in test environment; mock iMessage RPC client");
|
|
}
|
|
const args = ["rpc"];
|
|
if (this.dbPath) {
|
|
args.push("--db", this.dbPath);
|
|
}
|
|
const child = spawn(this.cliPath, args, {
|
|
stdio: ["pipe", "pipe", "pipe"],
|
|
});
|
|
this.child = child;
|
|
this.reader = createInterface({ input: child.stdout });
|
|
|
|
this.reader.on("line", (line) => {
|
|
const trimmed = line.trim();
|
|
if (!trimmed) {
|
|
return;
|
|
}
|
|
this.handleLine(trimmed);
|
|
});
|
|
|
|
child.stderr?.on("data", (chunk) => {
|
|
const lines = chunk.toString().split(/\r?\n/);
|
|
for (const line of lines) {
|
|
if (!line.trim()) {
|
|
continue;
|
|
}
|
|
this.runtime?.error?.(`imsg rpc: ${line.trim()}`);
|
|
}
|
|
});
|
|
|
|
child.on("error", (err) => {
|
|
this.failAll(err instanceof Error ? err : new Error(String(err)));
|
|
this.closedResolve?.();
|
|
});
|
|
|
|
// Without this listener, async EPIPE from a dead child crashes the
|
|
// gateway via uncaughtException. (#75438)
|
|
child.stdin.on("error", (err) => {
|
|
this.failAll(err instanceof Error ? err : new Error(String(err)));
|
|
});
|
|
|
|
child.on("close", (code, signal) => {
|
|
if (code !== 0 && code !== null) {
|
|
const reason = signal ? `signal ${signal}` : `code ${code}`;
|
|
this.failAll(new Error(`imsg rpc exited (${reason})`));
|
|
} else {
|
|
this.failAll(new Error("imsg rpc closed"));
|
|
}
|
|
this.closedResolve?.();
|
|
});
|
|
}
|
|
|
|
async stop(): Promise<void> {
|
|
if (!this.child) {
|
|
return;
|
|
}
|
|
this.reader?.close();
|
|
this.reader = null;
|
|
this.child.stdin?.end();
|
|
const child = this.child;
|
|
this.child = null;
|
|
|
|
await Promise.race([
|
|
this.closed,
|
|
new Promise<void>((resolve) => {
|
|
setTimeout(() => {
|
|
if (!child.killed) {
|
|
child.kill("SIGTERM");
|
|
}
|
|
resolve();
|
|
}, 500);
|
|
}),
|
|
]);
|
|
}
|
|
|
|
async waitForClose(): Promise<void> {
|
|
await this.closed;
|
|
}
|
|
|
|
async request<T = unknown>(
|
|
method: string,
|
|
params?: Record<string, unknown>,
|
|
opts?: { timeoutMs?: number },
|
|
): Promise<T> {
|
|
if (!this.child || !this.child.stdin) {
|
|
throw new Error("imsg rpc not running");
|
|
}
|
|
const id = this.nextId++;
|
|
const payload = {
|
|
jsonrpc: "2.0",
|
|
id,
|
|
method,
|
|
params: params ?? {},
|
|
};
|
|
const line = `${JSON.stringify(payload)}\n`;
|
|
const timeoutMs = opts?.timeoutMs ?? DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS;
|
|
|
|
const response = new Promise<T>((resolve, reject) => {
|
|
const key = String(id);
|
|
const timer =
|
|
timeoutMs > 0
|
|
? setTimeout(() => {
|
|
this.pending.delete(key);
|
|
reject(new Error(`imsg rpc timeout (${method})`));
|
|
}, timeoutMs)
|
|
: undefined;
|
|
this.pending.set(key, {
|
|
resolve: (value) => resolve(value as T),
|
|
reject,
|
|
timer,
|
|
});
|
|
});
|
|
|
|
// Reject the specific pending request on write error (e.g. EPIPE)
|
|
// instead of letting it hang until timeout. (#75438)
|
|
this.child.stdin.write(line, (err) => {
|
|
if (err) {
|
|
const key = String(id);
|
|
const pending = this.pending.get(key);
|
|
if (pending) {
|
|
if (pending.timer) {
|
|
clearTimeout(pending.timer);
|
|
}
|
|
this.pending.delete(key);
|
|
pending.reject(err instanceof Error ? err : new Error(String(err)));
|
|
}
|
|
}
|
|
});
|
|
return await response;
|
|
}
|
|
|
|
private handleLine(line: string) {
|
|
let parsed: IMessageRpcResponse<unknown>;
|
|
try {
|
|
parsed = JSON.parse(line) as IMessageRpcResponse<unknown>;
|
|
} catch (err) {
|
|
const detail = formatErrorMessage(err);
|
|
this.runtime?.error?.(`imsg rpc: failed to parse ${line}: ${detail}`);
|
|
return;
|
|
}
|
|
|
|
if (parsed.id !== undefined && parsed.id !== null) {
|
|
const key = String(parsed.id);
|
|
const pending = this.pending.get(key);
|
|
if (!pending) {
|
|
return;
|
|
}
|
|
if (pending.timer) {
|
|
clearTimeout(pending.timer);
|
|
}
|
|
this.pending.delete(key);
|
|
|
|
if (parsed.error) {
|
|
const baseMessage = parsed.error.message ?? "imsg rpc error";
|
|
const details = parsed.error.data;
|
|
const code = parsed.error.code;
|
|
const suffixes = [] as string[];
|
|
if (typeof code === "number") {
|
|
suffixes.push(`code=${code}`);
|
|
}
|
|
if (details !== undefined) {
|
|
const detailText =
|
|
typeof details === "string" ? details : JSON.stringify(details, null, 2);
|
|
if (detailText) {
|
|
suffixes.push(detailText);
|
|
}
|
|
}
|
|
const msg = suffixes.length > 0 ? `${baseMessage}: ${suffixes.join(" ")}` : baseMessage;
|
|
pending.reject(new Error(msg));
|
|
return;
|
|
}
|
|
pending.resolve(parsed.result);
|
|
return;
|
|
}
|
|
|
|
if (parsed.method) {
|
|
this.onNotification?.({
|
|
method: parsed.method,
|
|
params: parsed.params,
|
|
});
|
|
}
|
|
}
|
|
|
|
private failAll(err: Error) {
|
|
for (const [key, pending] of this.pending.entries()) {
|
|
if (pending.timer) {
|
|
clearTimeout(pending.timer);
|
|
}
|
|
pending.reject(err);
|
|
this.pending.delete(key);
|
|
}
|
|
}
|
|
}
|
|
|
|
export async function createIMessageRpcClient(
|
|
opts: IMessageRpcClientOptions = {},
|
|
): Promise<IMessageRpcClient> {
|
|
const client = new IMessageRpcClient(opts);
|
|
await client.start();
|
|
return client;
|
|
}
|