diff --git a/extensions/imessage/src/monitor.watch-subscribe-retry.test.ts b/extensions/imessage/src/monitor.watch-subscribe-retry.test.ts index 9812b16dc73..b90b0fb20b7 100644 --- a/extensions/imessage/src/monitor.watch-subscribe-retry.test.ts +++ b/extensions/imessage/src/monitor.watch-subscribe-retry.test.ts @@ -1,25 +1,27 @@ +import type { waitForTransportReady } from "openclaw/plugin-sdk/infra-runtime"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { createIMessageRpcClient } from "./client.js"; import { monitorIMessageProvider } from "./monitor.js"; +import type { attachIMessageMonitorAbortHandler } from "./monitor/abort-handler.js"; const waitForTransportReadyMock = vi.hoisted(() => - vi.fn<(...args: unknown[]) => Promise>(async () => {}), + vi.fn(async () => {}), ); -const createIMessageRpcClientMock = vi.hoisted(() => vi.fn<(...args: unknown[]) => unknown>()); +const createIMessageRpcClientMock = vi.hoisted(() => vi.fn()); const attachIMessageMonitorAbortHandlerMock = vi.hoisted(() => - vi.fn<(...args: unknown[]) => () => void>(() => () => {}), + vi.fn(() => () => {}), ); vi.mock("openclaw/plugin-sdk/infra-runtime", () => ({ - waitForTransportReady: (...args: unknown[]) => waitForTransportReadyMock(...args), + waitForTransportReady: waitForTransportReadyMock, })); vi.mock("./client.js", () => ({ - createIMessageRpcClient: (...args: unknown[]) => createIMessageRpcClientMock(...args), + createIMessageRpcClient: createIMessageRpcClientMock, })); vi.mock("./monitor/abort-handler.js", () => ({ - attachIMessageMonitorAbortHandler: (...args: unknown[]) => - attachIMessageMonitorAbortHandlerMock(...args), + attachIMessageMonitorAbortHandler: attachIMessageMonitorAbortHandlerMock, })); function createRuntime() { diff --git a/extensions/imessage/src/monitor/monitor-provider.ts b/extensions/imessage/src/monitor/monitor-provider.ts index 3467a65ea86..4669d42aec1 100644 --- a/extensions/imessage/src/monitor/monitor-provider.ts +++ b/extensions/imessage/src/monitor/monitor-provider.ts @@ -548,6 +548,15 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }); + const requireWatchClient = ( + watchClient: IMessageRpcClient | null | undefined, + ): IMessageRpcClient => { + if (!watchClient) { + throw new Error("imessage monitor client not initialized"); + } + return watchClient; + }; + for (let attempt = 1; attempt <= WATCH_SUBSCRIBE_MAX_ATTEMPTS; attempt++) { if (abort?.aborted) { return; @@ -556,7 +565,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P let attemptDetachAbortHandler = () => {}; let keepAttemptClient = false; try { - attemptClient = await createWatchClient(); + attemptClient = requireWatchClient(await createWatchClient()); let attemptSubscriptionId: number | null = null; attemptDetachAbortHandler = attachIMessageMonitorAbortHandler({ abortSignal: abort, @@ -590,6 +599,12 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P `imessage: watch.subscribe startup failed (attempt ${attempt}/${WATCH_SUBSCRIBE_MAX_ATTEMPTS}): ${String(err)}; retrying`, ), ); + // Tear down the failed client before waiting so a slow subscribe attempt + // cannot keep emitting notifications into the next retry window. + attemptDetachAbortHandler(); + attemptDetachAbortHandler = () => {}; + await attemptClient?.stop(); + attemptClient = undefined; await waitForWatchSubscribeRetryDelay({ ms: WATCH_SUBSCRIBE_RETRY_DELAY_MS, abortSignal: abort, @@ -605,12 +620,13 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P } } - if (!client) { + const activeClient = client; + if (!activeClient) { return; } try { - await client.waitForClose(); + await activeClient.waitForClose(); } catch (err) { if (abort?.aborted) { return; @@ -619,7 +635,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P throw err; } finally { detachAbortHandler(); - await client.stop(); + await activeClient.stop(); } }