Files
openclaw/extensions/imessage/src/client.ts
Andy Ye 7624b0d16d fix(imessage): surface Full Disk Access probe failures
Preserve canonical iMessage Full Disk Access probe failures through non-sensitive health snapshots and status output, promote imsg denial banners to the public remediation message, and add a narrow audit exception for the reviewed Mistral advisory false-positive.
2026-05-11 21:41:08 -07:00

303 lines
8.6 KiB
TypeScript

import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import { createInterface, type Interface } from "node:readline";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/string-coerce-runtime";
import { resolveUserPath } from "openclaw/plugin-sdk/text-utility-runtime";
import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "./constants.js";
export type IMessageRpcError = {
code?: number;
message?: string;
data?: unknown;
};
export type IMessageRpcResponse<T> = {
jsonrpc?: string;
id?: string | number | null;
result?: T;
error?: IMessageRpcError;
method?: string;
params?: unknown;
};
export type IMessageRpcNotification = {
method: string;
params?: unknown;
};
export type IMessageRpcClientOptions = {
cliPath?: string;
dbPath?: string;
runtime?: RuntimeEnv;
onNotification?: (msg: IMessageRpcNotification) => void;
};
type PendingRequest = {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
timer?: NodeJS.Timeout;
};
export const PUBLIC_IMESSAGE_FULL_DISK_ACCESS_ERROR =
"imsg cannot access ~/Library/Messages/chat.db. Grant Full Disk Access to the Gateway/launcher process and restart Gateway.";
function isTestEnv(): boolean {
if (process.env.NODE_ENV === "test") {
return true;
}
const vitest = normalizeLowercaseStringOrEmpty(process.env.VITEST);
return Boolean(vitest);
}
export function normalizeIMessageFullDiskAccessError(message: string): string | undefined {
const normalized = normalizeLowercaseStringOrEmpty(message);
if (!normalized.includes("full disk access") || !normalized.includes("chat.db")) {
return undefined;
}
return PUBLIC_IMESSAGE_FULL_DISK_ACCESS_ERROR;
}
export class IMessageRpcClient {
private readonly cliPath: string;
private readonly dbPath?: string;
private readonly runtime?: RuntimeEnv;
private readonly onNotification?: (msg: IMessageRpcNotification) => void;
private readonly pending = new Map<string, PendingRequest>();
private readonly closed: Promise<void>;
private closedResolve: (() => void) | null = null;
private child: ChildProcessWithoutNullStreams | null = null;
private reader: Interface | null = null;
private nextId = 1;
private publicProcessError: string | null = null;
constructor(opts: IMessageRpcClientOptions = {}) {
this.cliPath = opts.cliPath?.trim() || "imsg";
this.dbPath = opts.dbPath?.trim() ? resolveUserPath(opts.dbPath) : undefined;
this.runtime = opts.runtime;
this.onNotification = opts.onNotification;
this.closed = new Promise((resolve) => {
this.closedResolve = resolve;
});
}
async start(): Promise<void> {
if (this.child) {
return;
}
if (isTestEnv()) {
throw new Error("Refusing to start imsg rpc in test environment; mock iMessage RPC client");
}
const args = ["rpc"];
if (this.dbPath) {
args.push("--db", this.dbPath);
}
const child = spawn(this.cliPath, args, {
stdio: ["pipe", "pipe", "pipe"],
});
this.child = child;
this.reader = createInterface({ input: child.stdout });
this.reader.on("line", (line) => {
const trimmed = line.trim();
if (!trimmed) {
return;
}
this.handleLine(trimmed);
});
child.stderr?.on("data", (chunk) => {
const lines = chunk.toString().split(/\r?\n/);
for (const line of lines) {
if (!line.trim()) {
continue;
}
const trimmed = line.trim();
this.recordProcessDiagnostic(trimmed);
this.runtime?.error?.(`imsg rpc: ${trimmed}`);
}
});
child.on("error", (err) => {
this.failAll(err instanceof Error ? err : new Error(String(err)));
this.closedResolve?.();
});
// Without this listener, async EPIPE from a dead child crashes the
// gateway via uncaughtException. (#75438)
child.stdin.on("error", (err) => {
this.failAll(err instanceof Error ? err : new Error(String(err)));
});
child.on("close", (code, signal) => {
this.failAll(this.buildCloseError(code, signal));
this.closedResolve?.();
});
}
async stop(): Promise<void> {
if (!this.child) {
return;
}
this.reader?.close();
this.reader = null;
this.child.stdin?.end();
const child = this.child;
this.child = null;
await Promise.race([
this.closed,
new Promise<void>((resolve) => {
setTimeout(() => {
if (!child.killed) {
child.kill("SIGTERM");
}
resolve();
}, 500);
}),
]);
}
async waitForClose(): Promise<void> {
await this.closed;
}
async request<T = unknown>(
method: string,
params?: Record<string, unknown>,
opts?: { timeoutMs?: number },
): Promise<T> {
if (!this.child || !this.child.stdin) {
throw new Error("imsg rpc not running");
}
const id = this.nextId++;
const payload = {
jsonrpc: "2.0",
id,
method,
params: params ?? {},
};
const line = `${JSON.stringify(payload)}\n`;
const timeoutMs = opts?.timeoutMs ?? DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS;
const response = new Promise<T>((resolve, reject) => {
const key = String(id);
const timer =
timeoutMs > 0
? setTimeout(() => {
this.pending.delete(key);
reject(new Error(`imsg rpc timeout (${method})`));
}, timeoutMs)
: undefined;
this.pending.set(key, {
resolve: (value) => resolve(value as T),
reject,
timer,
});
});
// Reject the specific pending request on write error (e.g. EPIPE)
// instead of letting it hang until timeout. (#75438)
this.child.stdin.write(line, (err) => {
if (err) {
const key = String(id);
const pending = this.pending.get(key);
if (pending) {
if (pending.timer) {
clearTimeout(pending.timer);
}
this.pending.delete(key);
pending.reject(err instanceof Error ? err : new Error(String(err)));
}
}
});
return await response;
}
private handleLine(line: string) {
let parsed: IMessageRpcResponse<unknown>;
try {
parsed = JSON.parse(line) as IMessageRpcResponse<unknown>;
} catch (err) {
this.recordProcessDiagnostic(line);
const detail = formatErrorMessage(err);
this.runtime?.error?.(`imsg rpc: failed to parse ${line}: ${detail}`);
return;
}
if (parsed.id !== undefined && parsed.id !== null) {
const key = String(parsed.id);
const pending = this.pending.get(key);
if (!pending) {
return;
}
if (pending.timer) {
clearTimeout(pending.timer);
}
this.pending.delete(key);
if (parsed.error) {
const baseMessage = parsed.error.message ?? "imsg rpc error";
const details = parsed.error.data;
const code = parsed.error.code;
const suffixes = [] as string[];
if (typeof code === "number") {
suffixes.push(`code=${code}`);
}
if (details !== undefined) {
const detailText =
typeof details === "string" ? details : JSON.stringify(details, null, 2);
if (detailText) {
suffixes.push(detailText);
}
}
const msg = suffixes.length > 0 ? `${baseMessage}: ${suffixes.join(" ")}` : baseMessage;
pending.reject(new Error(msg));
return;
}
pending.resolve(parsed.result);
return;
}
if (parsed.method) {
this.onNotification?.({
method: parsed.method,
params: parsed.params,
});
}
}
private recordProcessDiagnostic(line: string): void {
this.publicProcessError ??= normalizeIMessageFullDiskAccessError(line) ?? null;
}
private buildCloseError(code: number | null, signal: NodeJS.Signals | null): Error {
if (this.publicProcessError) {
return new Error(this.publicProcessError);
}
if (code !== 0 && code !== null) {
const reason = signal ? `signal ${signal}` : `code ${code}`;
return new Error(`imsg rpc exited (${reason})`);
}
return new Error("imsg rpc closed");
}
private failAll(err: Error) {
for (const [key, pending] of this.pending.entries()) {
if (pending.timer) {
clearTimeout(pending.timer);
}
pending.reject(err);
this.pending.delete(key);
}
}
}
export async function createIMessageRpcClient(
opts: IMessageRpcClientOptions = {},
): Promise<IMessageRpcClient> {
const client = new IMessageRpcClient(opts);
await client.start();
return client;
}