fix(imessage): repair monitor retry type checks

This commit is contained in:
Vincent Koc
2026-04-12 19:57:37 +01:00
parent e4841d767d
commit ea71a59127
2 changed files with 29 additions and 11 deletions

View File

@@ -1,25 +1,27 @@
import type { waitForTransportReady } from "openclaw/plugin-sdk/infra-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { createIMessageRpcClient } from "./client.js";
import { monitorIMessageProvider } from "./monitor.js";
import type { attachIMessageMonitorAbortHandler } from "./monitor/abort-handler.js";
const waitForTransportReadyMock = vi.hoisted(() =>
vi.fn<(...args: unknown[]) => Promise<void>>(async () => {}),
vi.fn<typeof waitForTransportReady>(async () => {}),
);
const createIMessageRpcClientMock = vi.hoisted(() => vi.fn<(...args: unknown[]) => unknown>());
const createIMessageRpcClientMock = vi.hoisted(() => vi.fn<typeof createIMessageRpcClient>());
const attachIMessageMonitorAbortHandlerMock = vi.hoisted(() =>
vi.fn<(...args: unknown[]) => () => void>(() => () => {}),
vi.fn<typeof attachIMessageMonitorAbortHandler>(() => () => {}),
);
vi.mock("openclaw/plugin-sdk/infra-runtime", () => ({
waitForTransportReady: (...args: unknown[]) => waitForTransportReadyMock(...args),
waitForTransportReady: waitForTransportReadyMock,
}));
vi.mock("./client.js", () => ({
createIMessageRpcClient: (...args: unknown[]) => createIMessageRpcClientMock(...args),
createIMessageRpcClient: createIMessageRpcClientMock,
}));
vi.mock("./monitor/abort-handler.js", () => ({
attachIMessageMonitorAbortHandler: (...args: unknown[]) =>
attachIMessageMonitorAbortHandlerMock(...args),
attachIMessageMonitorAbortHandler: attachIMessageMonitorAbortHandlerMock,
}));
function createRuntime() {

View File

@@ -548,6 +548,15 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
},
});
const requireWatchClient = (
watchClient: IMessageRpcClient | null | undefined,
): IMessageRpcClient => {
if (!watchClient) {
throw new Error("imessage monitor client not initialized");
}
return watchClient;
};
for (let attempt = 1; attempt <= WATCH_SUBSCRIBE_MAX_ATTEMPTS; attempt++) {
if (abort?.aborted) {
return;
@@ -556,7 +565,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
let attemptDetachAbortHandler = () => {};
let keepAttemptClient = false;
try {
attemptClient = await createWatchClient();
attemptClient = requireWatchClient(await createWatchClient());
let attemptSubscriptionId: number | null = null;
attemptDetachAbortHandler = attachIMessageMonitorAbortHandler({
abortSignal: abort,
@@ -590,6 +599,12 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
`imessage: watch.subscribe startup failed (attempt ${attempt}/${WATCH_SUBSCRIBE_MAX_ATTEMPTS}): ${String(err)}; retrying`,
),
);
// Tear down the failed client before waiting so a slow subscribe attempt
// cannot keep emitting notifications into the next retry window.
attemptDetachAbortHandler();
attemptDetachAbortHandler = () => {};
await attemptClient?.stop();
attemptClient = undefined;
await waitForWatchSubscribeRetryDelay({
ms: WATCH_SUBSCRIBE_RETRY_DELAY_MS,
abortSignal: abort,
@@ -605,12 +620,13 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
}
}
if (!client) {
const activeClient = client;
if (!activeClient) {
return;
}
try {
await client.waitForClose();
await activeClient.waitForClose();
} catch (err) {
if (abort?.aborted) {
return;
@@ -619,7 +635,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
throw err;
} finally {
detachAbortHandler();
await client.stop();
await activeClient.stop();
}
}