mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:10:45 +00:00
fix(imessage): retry watch.subscribe startup failures (#65482)
* fix(imessage): retry watch.subscribe startup failures * fix(imessage): sanitize watch error logging
This commit is contained in:
@@ -52,6 +52,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Memory/QMD: stop registering the legacy lowercase root memory file as a separate default collection, so QMD now prefers `MEMORY.md` and the `memory/` tree without duplicate collection-add warnings.
|
||||
- Memory/memory-core: watch the `memory` directory directly and ignore non-markdown churn so nested note changes still sync on macOS + Node 25 environments where recursive `memory/**/*.md` glob watching fails. (#64711) Thanks @jasonxargs-boop and @vincentkoc.
|
||||
- WhatsApp: centralize per-account connection ownership so reconnects, login recovery, and outbound readiness stay attached to the live socket instead of drifting across monitor and login paths. (#65290) Thanks @mcaxtr and @vincentkoc.
|
||||
- iMessage: retry transient `watch.subscribe` startup failures before tearing down the monitor, and sanitize startup error logging so brief local transport stalls do not immediately bounce the channel or leak raw imsg RPC payloads into logs. (#65393) Thanks @vincentkoc.
|
||||
|
||||
## 2026.4.11
|
||||
|
||||
@@ -227,6 +228,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/locks: unregister the session write-lock `exit` cleanup handler during teardown so repeated lock lifecycle resets stop stacking process listeners in long-running gateway processes. (#65391) Thanks @adminfedres and @vincentkoc.
|
||||
- CLI/Claude: rename the trusted inbound metadata schema to `openclaw.inbound_meta.v2` so Claude CLI no longer trips Anthropic's blocked `openclaw.inbound_meta.v1` filter on channel-originated turns. (#65399) Thanks @SzyMig and @vincentkoc.
|
||||
- Agents/inbound metadata: strip NUL bytes from serialized inbound context blocks before they reach backend spawn args, so malformed message metadata cannot crash agent spawn with `ERR_INVALID_ARG_VALUE`. (#65389) Thanks @adminfedres and @vincentkoc.
|
||||
- iMessage: retry transient `watch.subscribe` startup failures before tearing down the monitor, so brief local transport stalls do not immediately bounce the channel. (#65393) Thanks @vincentkoc.
|
||||
|
||||
## 2026.4.9
|
||||
|
||||
|
||||
117
extensions/imessage/src/monitor.watch-subscribe-retry.test.ts
Normal file
117
extensions/imessage/src/monitor.watch-subscribe-retry.test.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { monitorIMessageProvider } from "./monitor.js";
|
||||
|
||||
// Mock handles created through vi.hoisted; the vi.mock factories below forward to them,
// and the test suite resets/configures them in beforeEach.
// Default implementation: resolves immediately with no value.
const waitForTransportReadyMock = vi.hoisted(() => vi.fn(async () => {}));
|
||||
// No default implementation; each test decides which client(s) it returns.
const createIMessageRpcClientMock = vi.hoisted(() => vi.fn());
|
||||
// Default implementation: returns a no-op function.
const attachIMessageMonitorAbortHandlerMock = vi.hoisted(() => vi.fn(() => () => {}));
|
||||
|
||||
// Replace waitForTransportReady with a wrapper that delegates to the mock handle above.
vi.mock("openclaw/plugin-sdk/infra-runtime", () => ({
|
||||
waitForTransportReady: (...args: unknown[]) => waitForTransportReadyMock(...args),
|
||||
}));
|
||||
|
||||
// Replace the RPC client factory so no real client is created by the module under test.
vi.mock("./client.js", () => ({
|
||||
createIMessageRpcClient: (...args: unknown[]) => createIMessageRpcClientMock(...args),
|
||||
}));
|
||||
|
||||
// Replace the abort-handler attachment with a wrapper that delegates to the mock handle above.
vi.mock("./monitor/abort-handler.js", () => ({
|
||||
attachIMessageMonitorAbortHandler: (...args: unknown[]) =>
|
||||
attachIMessageMonitorAbortHandlerMock(...args),
|
||||
}));
|
||||
|
||||
/**
 * Builds a stub runtime whose `log` and `error` sinks are fresh vitest mocks,
 * so each test can assert on what the monitor reported.
 */
function createRuntime() {
  const log = vi.fn();
  const error = vi.fn();
  return { log, error };
}
|
||||
|
||||
/**
 * Builds a fake RPC client made of vitest mocks.
 *
 * @param overrides - Optional replacements for the default behaviors:
 *   `request` (default: resolves to `{ subscription: 1 }`) and
 *   `waitForClose` (default: resolves to `undefined`).
 * @returns An object with mocked `request`, `waitForClose`, and `stop`
 *   (`stop` always resolves without a value).
 */
function createRpcClient(overrides?: {
  request?: (method: string) => Promise<unknown>;
  waitForClose?: () => Promise<void>;
}) {
  const requestImpl = overrides?.request ?? (async () => ({ subscription: 1 }));
  const waitForCloseImpl = overrides?.waitForClose ?? (async () => undefined);

  return {
    request: vi.fn(requestImpl),
    waitForClose: vi.fn(waitForCloseImpl),
    stop: vi.fn(async () => {}),
  };
}
|
||||
|
||||
describe("monitorIMessageProvider watch.subscribe startup retry", () => {
  // Error text thrown by the failing fake clients.
  const subscribeTimeoutMessage = "imsg rpc timeout (watch.subscribe)";

  // Fake client whose every request rejects with the subscribe timeout error.
  const createTimingOutClient = () =>
    createRpcClient({
      request: async () => {
        throw new Error(subscribeTimeoutMessage);
      },
    });

  // Starts the monitor with an empty iMessage channel config and the given stub runtime.
  const startMonitor = (runtime: ReturnType<typeof createRuntime>) =>
    monitorIMessageProvider({
      config: { channels: { imessage: {} } } as never,
      runtime: runtime as never,
    });

  beforeEach(() => {
    // Fake timers let the tests drain timer-based waits without real waiting.
    vi.useFakeTimers();
    waitForTransportReadyMock.mockReset().mockResolvedValue(undefined);
    createIMessageRpcClientMock.mockReset();
    attachIMessageMonitorAbortHandlerMock.mockReset().mockReturnValue(() => {});
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("retries a transient watch.subscribe startup timeout without tearing down the monitor", async () => {
    const runtime = createRuntime();
    const failingClient = createTimingOutClient();
    const healthyClient = createRpcClient();

    // First startup attempt gets the failing client, the retry gets the healthy one.
    createIMessageRpcClientMock.mockResolvedValueOnce(failingClient);
    createIMessageRpcClientMock.mockResolvedValueOnce(healthyClient);

    const running = startMonitor(runtime);

    await vi.runAllTimersAsync();
    await running;

    expect(createIMessageRpcClientMock).toHaveBeenCalledTimes(2);
    // The failed attempt's client is stopped; the healthy one runs until close, then stops.
    expect(failingClient.stop).toHaveBeenCalledTimes(1);
    expect(healthyClient.waitForClose).toHaveBeenCalledTimes(1);
    expect(healthyClient.stop).toHaveBeenCalledTimes(1);

    const retryNotice = expect.stringContaining("watch.subscribe startup failed");
    expect(runtime.log).toHaveBeenCalledWith(retryNotice);

    const failureNotice = expect.stringContaining("imessage: monitor failed");
    expect(runtime.error).not.toHaveBeenCalledWith(failureNotice);
  });

  it("still fails after bounded startup retries are exhausted", async () => {
    const runtime = createRuntime();
    // Every attempt receives a brand-new client that times out on subscribe.
    createIMessageRpcClientMock.mockImplementation(async () => createTimingOutClient());

    // Capture the rejection as a value so it can be inspected after timers are drained.
    const settled = startMonitor(runtime).catch((caught) => caught);

    await vi.runAllTimersAsync();
    const monitorError = await settled;

    expect(monitorError).toBeInstanceOf(Error);
    expect((monitorError as Error).message).toContain("imsg rpc timeout (watch.subscribe)");
    expect(createIMessageRpcClientMock).toHaveBeenCalledTimes(3);
    expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("imessage: monitor failed"));
  });
});
|
||||
@@ -33,7 +33,7 @@ import { danger, logVerbose, shouldLogVerbose, warn } from "openclaw/plugin-sdk/
|
||||
import { resolvePinnedMainDmOwnerFromAllowlist } from "openclaw/plugin-sdk/security-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { resolveIMessageAccount } from "../accounts.js";
|
||||
import { createIMessageRpcClient } from "../client.js";
|
||||
import { createIMessageRpcClient, type IMessageRpcClient } from "../client.js";
|
||||
import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "../constants.js";
|
||||
import {
|
||||
resolveIMessageAttachmentRoots,
|
||||
@@ -54,6 +54,10 @@ import { parseIMessageNotification } from "./parse-notification.js";
|
||||
import { normalizeAllowList, resolveRuntime } from "./runtime.js";
|
||||
import { createSelfChatCache } from "./self-chat-cache.js";
|
||||
import type { IMessagePayload, MonitorIMessageOpts } from "./types.js";
|
||||
import { sanitizeIMessageWatchErrorPayload } from "./watch-error-log.js";
|
||||
|
||||
// Upper bound on watch.subscribe startup attempts (the first try counts as attempt 1).
const WATCH_SUBSCRIBE_MAX_ATTEMPTS = 3;
|
||||
// Pause between a failed startup attempt and the next one, in milliseconds.
const WATCH_SUBSCRIBE_RETRY_DELAY_MS = 1_000;
|
||||
|
||||
/**
|
||||
* Try to detect remote host from an SSH wrapper script like:
|
||||
@@ -83,6 +87,33 @@ async function detectRemoteHostFromCliPath(cliPath: string): Promise<string | un
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Decides whether a `watch.subscribe` startup failure should be retried.
 *
 * The error is stringified and matched case-insensitively against the known
 * transient failure texts: a `watch.subscribe` RPC timeout, or the imsg RPC
 * process being closed, exited, or not running.
 *
 * @param error - Whatever was thrown by the startup attempt.
 * @returns `true` when the failure text matches one of the retriable patterns.
 */
function isRetriableWatchSubscribeStartupError(error: unknown): boolean {
  const retriablePattern =
    /imsg rpc timeout \(watch\.subscribe\)|imsg rpc (closed|exited|not running)/i;
  const description = String(error);
  return retriablePattern.test(description);
}
|
||||
|
||||
/**
 * Waits out the pause between `watch.subscribe` startup attempts.
 *
 * The returned promise always resolves and never rejects: either after `ms`
 * milliseconds, or as soon as `abortSignal` aborts, whichever happens first.
 * Callers that need to tell the two outcomes apart should check
 * `abortSignal.aborted` after awaiting.
 *
 * @param params.ms - Delay in milliseconds; values `<= 0` resolve without waiting.
 * @param params.abortSignal - Optional signal that cuts the wait short.
 */
async function waitForWatchSubscribeRetryDelay(params: {
  ms: number;
  abortSignal?: AbortSignal;
}): Promise<void> {
  const { ms, abortSignal } = params;
  // An already-aborted signal does not dispatch another "abort" event, so a
  // listener registered now would never fire and the full delay would elapse.
  if (ms <= 0 || abortSignal?.aborted) {
    return;
  }
  await new Promise<void>((resolve) => {
    // Single completion path for both the timer and the abort event: clear the
    // timer (harmless if it already fired), drop the listener, then resolve.
    const finish = () => {
      clearTimeout(timer);
      abortSignal?.removeEventListener("abort", finish);
      resolve();
    };
    const timer = setTimeout(finish, ms);
    abortSignal?.addEventListener("abort", finish, { once: true });
  });
}
|
||||
|
||||
export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): Promise<void> {
|
||||
const runtime = resolveRuntime(opts);
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
@@ -489,35 +520,90 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
const client = await createIMessageRpcClient({
|
||||
cliPath,
|
||||
dbPath,
|
||||
runtime,
|
||||
onNotification: (msg) => {
|
||||
if (msg.method === "message") {
|
||||
void handleMessage(msg.params).catch((err) => {
|
||||
runtime.error?.(`imessage: handler failed: ${String(err)}`);
|
||||
});
|
||||
} else if (msg.method === "error") {
|
||||
runtime.error?.(`imessage: watch error ${JSON.stringify(msg.params)}`);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
let subscriptionId: number | null = null;
|
||||
const abort = opts.abortSignal;
|
||||
const detachAbortHandler = attachIMessageMonitorAbortHandler({
|
||||
abortSignal: abort,
|
||||
client,
|
||||
getSubscriptionId: () => subscriptionId,
|
||||
});
|
||||
const createWatchClient = async () =>
|
||||
await createIMessageRpcClient({
|
||||
cliPath,
|
||||
dbPath,
|
||||
runtime,
|
||||
onNotification: (msg) => {
|
||||
if (msg.method === "message") {
|
||||
void handleMessage(msg.params).catch((err) => {
|
||||
runtime.error?.(`imessage: handler failed: ${String(err)}`);
|
||||
});
|
||||
} else if (msg.method === "error") {
|
||||
runtime.error?.(
|
||||
`imessage: watch error ${JSON.stringify(sanitizeIMessageWatchErrorPayload(msg.params))}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
let client: IMessageRpcClient | null = null;
|
||||
let detachAbortHandler = () => {};
|
||||
|
||||
for (let attempt = 1; attempt <= WATCH_SUBSCRIBE_MAX_ATTEMPTS; attempt++) {
|
||||
if (abort?.aborted) {
|
||||
return;
|
||||
}
|
||||
let attemptClient: IMessageRpcClient | null = null;
|
||||
let attemptDetachAbortHandler = () => {};
|
||||
let keepAttemptClient = false;
|
||||
try {
|
||||
attemptClient = await createWatchClient();
|
||||
let attemptSubscriptionId: number | null = null;
|
||||
attemptDetachAbortHandler = attachIMessageMonitorAbortHandler({
|
||||
abortSignal: abort,
|
||||
client: attemptClient,
|
||||
getSubscriptionId: () => attemptSubscriptionId,
|
||||
});
|
||||
const result = await attemptClient.request<{ subscription?: number }>(
|
||||
"watch.subscribe",
|
||||
{
|
||||
attachments: includeAttachments,
|
||||
},
|
||||
{ timeoutMs: probeTimeoutMs },
|
||||
);
|
||||
attemptSubscriptionId = result?.subscription ?? null;
|
||||
client = attemptClient;
|
||||
detachAbortHandler = attemptDetachAbortHandler;
|
||||
keepAttemptClient = true;
|
||||
break;
|
||||
} catch (err) {
|
||||
if (abort?.aborted) {
|
||||
return;
|
||||
}
|
||||
const shouldRetry =
|
||||
attempt < WATCH_SUBSCRIBE_MAX_ATTEMPTS && isRetriableWatchSubscribeStartupError(err);
|
||||
if (!shouldRetry) {
|
||||
runtime.error?.(danger(`imessage: monitor failed: ${String(err)}`));
|
||||
throw err;
|
||||
}
|
||||
runtime.log?.(
|
||||
warn(
|
||||
`imessage: watch.subscribe startup failed (attempt ${attempt}/${WATCH_SUBSCRIBE_MAX_ATTEMPTS}): ${String(err)}; retrying`,
|
||||
),
|
||||
);
|
||||
await waitForWatchSubscribeRetryDelay({
|
||||
ms: WATCH_SUBSCRIBE_RETRY_DELAY_MS,
|
||||
abortSignal: abort,
|
||||
});
|
||||
if (abort?.aborted) {
|
||||
return;
|
||||
}
|
||||
} finally {
|
||||
if (!keepAttemptClient) {
|
||||
attemptDetachAbortHandler();
|
||||
await attemptClient?.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!client) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await client.request<{ subscription?: number }>("watch.subscribe", {
|
||||
attachments: includeAttachments,
|
||||
});
|
||||
subscriptionId = result?.subscription ?? null;
|
||||
await client.waitForClose();
|
||||
} catch (err) {
|
||||
if (abort?.aborted) {
|
||||
|
||||
30
extensions/imessage/src/monitor/watch-error-log.test.ts
Normal file
30
extensions/imessage/src/monitor/watch-error-log.test.ts
Normal file
@@ -0,0 +1,30 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { sanitizeIMessageWatchErrorPayload } from "./watch-error-log.js";
|
||||
|
||||
describe("sanitizeIMessageWatchErrorPayload", () => {
  it("keeps only code and a sanitized truncated message", () => {
    // Payload mixing the two supported fields with extra fields that must not survive.
    const rawPayload = {
      code: 500,
      message: `boom\n\t\u001b[2K${"x".repeat(250)}`,
      chatId: "chat-123",
      participants: ["+15555550123"],
      path: "/Users/me/Library/Messages/chat.db",
    };

    const sanitized = sanitizeIMessageWatchErrorPayload(rawPayload);

    // Control characters come back escaped, the escape sequence is gone, and the
    // message is cut to 199 characters plus an ellipsis.
    const expected = {
      code: 500,
      message: `boom\\n\\t${"x".repeat(191)}…`,
    };
    expect(sanitized).toEqual(expected);
  });

  it("drops non-object payloads and unsupported fields", () => {
    const fromString = sanitizeIMessageWatchErrorPayload("boom");
    expect(fromString).toEqual({});

    // Non-finite code, non-string message, and an unknown nested field are all discarded.
    const fromUnsupported = sanitizeIMessageWatchErrorPayload({
      code: Number.POSITIVE_INFINITY,
      message: 123,
      data: { sender: "+15555550123" },
    });
    expect(fromUnsupported).toEqual({});
  });
});
|
||||
38
extensions/imessage/src/monitor/watch-error-log.ts
Normal file
38
extensions/imessage/src/monitor/watch-error-log.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
import {
|
||||
isRecord,
|
||||
sanitizeTerminalText,
|
||||
truncateUtf16Safe,
|
||||
} from "openclaw/plugin-sdk/text-runtime";
|
||||
|
||||
/** Longest message (in UTF-16 code units, ellipsis included) kept in a sanitized payload. */
const MAX_WATCH_ERROR_MESSAGE_CHARS = 200;

/** The only fields of a watch error payload that are allowed to reach the logs. */
export type SanitizedIMessageWatchErrorPayload = {
  code?: number;
  message?: string;
};

/**
 * Reduces a watch error payload to a log-safe subset.
 *
 * Only two fields are ever copied over; everything else on the payload is dropped:
 * - `code`, when it is a finite number;
 * - `message`, when it is a string that is still non-empty after
 *   `sanitizeTerminalText`, shortened to at most
 *   `MAX_WATCH_ERROR_MESSAGE_CHARS` characters with a trailing `…` when cut.
 *
 * @param payload - Untrusted value received with the watch error notification.
 * @returns A new object holding the allowed fields; empty when `payload` is not a record.
 */
export function sanitizeIMessageWatchErrorPayload(
  payload: unknown,
): SanitizedIMessageWatchErrorPayload {
  const result: SanitizedIMessageWatchErrorPayload = {};
  if (!isRecord(payload)) {
    return result;
  }

  const { code, message } = payload;

  if (typeof code === "number" && Number.isFinite(code)) {
    result.code = code;
  }

  if (typeof message !== "string") {
    return result;
  }

  const cleaned = sanitizeTerminalText(message);
  if (!cleaned) {
    return result;
  }

  if (cleaned.length > MAX_WATCH_ERROR_MESSAGE_CHARS) {
    // Reserve one character of the budget for the ellipsis.
    result.message = `${truncateUtf16Safe(cleaned, MAX_WATCH_ERROR_MESSAGE_CHARS - 1)}…`;
  } else {
    result.message = cleaned;
  }

  return result;
}
|
||||
Reference in New Issue
Block a user