diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cfa308854f..9998a9b5d0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Auto-reply/NO_REPLY: strip glued leading `NO_REPLY` tokens before reply normalization and ACP-visible streaming so silent sentinel text no longer leaks into user-visible replies while preserving substantive `NO_REPLY ...` text. Thanks @frankekn. - Gateway/sessions: clear auto-fallback-pinned model overrides on `/reset` and `/new` while still preserving explicit user model selections, including legacy sessions created before override-source tracking existed. (#63155) Thanks @frankekn. - Codex CLI: pass OpenClaw's system prompt through Codex's `model_instructions_file` config override so fresh Codex CLI sessions receive the same prompt guidance as Claude CLI sessions. +- Matrix/gateway: wait for Matrix sync readiness before marking startup successful, keep Matrix background handler failures contained, and route fatal Matrix sync stops through channel-level restart handling instead of crashing the whole gateway. (#62779) Thanks @gumadeiras. ## 2026.4.8 @@ -40,6 +41,8 @@ Docs: https://docs.openclaw.ai - Slack/actions: pass the already resolved read token into `downloadFile` so SecretRef-backed bot tokens no longer fail after a raw config re-read. (#62097) Thanks @martingarramon. - Network/fetch guard: skip target DNS pinning when trusted env-proxy mode is active so proxy-only sandboxes can let the trusted proxy resolve outbound hosts. (#59007) Thanks @cluster2600. +## 2026.4.7-1 + ## 2026.4.7 ### Changes diff --git a/extensions/matrix/src/channel.ts b/extensions/matrix/src/channel.ts index 0b03aa2547e..2e28612e0c7 100644 --- a/extensions/matrix/src/channel.ts +++ b/extensions/matrix/src/channel.ts @@ -496,6 +496,7 @@ export const matrixPlugin: ChannelPlugin = initialSyncLimit: account.config.initialSyncLimit, replyToMode: account.config.replyToMode, accountId: account.accountId, + setStatus: ctx.setStatus, }); }, }, diff --git a/extensions/matrix/src/matrix/client/shared.test.ts b/extensions/matrix/src/matrix/client/shared.test.ts index 0df17821f6a..158cba4a8e5 100644 --- a/extensions/matrix/src/matrix/client/shared.test.ts +++ b/extensions/matrix/src/matrix/client/shared.test.ts @@ -233,6 +233,86 @@ describe("resolveSharedMatrixClient", () => { ).rejects.toThrow("Matrix shared client account mismatch"); }); + it("lets a later waiter abort while shared startup continues for the owner", async () => { + const mainAuth = authFor("main"); + let resolveStartup: (() => void) | undefined; + const mainClient = { + ...createMockClient("main"), + start: vi.fn( + async () => + await new Promise((resolve) => { + resolveStartup = resolve; + }), + ), + }; + + resolveMatrixAuthMock.mockResolvedValue(mainAuth); + createMatrixClientMock.mockResolvedValue(mainClient); + + const ownerPromise = resolveSharedMatrixClient({ accountId: "main" }); + await vi.waitFor(() => { + expect(mainClient.start).toHaveBeenCalledTimes(1); + expect(resolveStartup).toEqual(expect.any(Function)); + }); + + const abortController = new AbortController(); + const canceledWaiter = resolveSharedMatrixClient({ + accountId: "main", + abortSignal: abortController.signal, + }); + abortController.abort(); + + await expect(canceledWaiter).rejects.toMatchObject({ + message: "Matrix startup aborted", + name: "AbortError", + }); + + resolveStartup?.(); + await expect(ownerPromise).resolves.toBe(mainClient); + }); + + it("keeps the shared startup lock while an aborted waiter exits early", async () => { + const mainAuth = authFor("main"); + let resolveStartup: (() => void) | undefined; + const mainClient = { + ...createMockClient("main"), + start: vi.fn( + async () => + await new Promise((resolve) => { + resolveStartup = resolve; + }), + ), + }; + + resolveMatrixAuthMock.mockResolvedValue(mainAuth); + createMatrixClientMock.mockResolvedValue(mainClient); + + const ownerPromise = resolveSharedMatrixClient({ accountId: "main" }); + await vi.waitFor(() => { + expect(mainClient.start).toHaveBeenCalledTimes(1); + expect(resolveStartup).toEqual(expect.any(Function)); + }); + + const abortController = new AbortController(); + const abortedWaiter = resolveSharedMatrixClient({ + accountId: "main", + abortSignal: abortController.signal, + }); + abortController.abort(); + await expect(abortedWaiter).rejects.toMatchObject({ + message: "Matrix startup aborted", + name: "AbortError", + }); + + const followerPromise = resolveSharedMatrixClient({ accountId: "main" }); + expect(mainClient.start).toHaveBeenCalledTimes(1); + + resolveStartup?.(); + await expect(ownerPromise).resolves.toBe(mainClient); + await expect(followerPromise).resolves.toBe(mainClient); + expect(mainClient.start).toHaveBeenCalledTimes(1); + }); + it("recreates the shared client when dispatcherPolicy changes", async () => { const firstAuth = { ...authFor("main"), diff --git a/extensions/matrix/src/matrix/client/shared.ts b/extensions/matrix/src/matrix/client/shared.ts index 5573a60888d..ff13bb0e6c9 100644 --- a/extensions/matrix/src/matrix/client/shared.ts +++ b/extensions/matrix/src/matrix/client/shared.ts @@ -2,6 +2,7 @@ import { normalizeOptionalAccountId } from "openclaw/plugin-sdk/account-id"; import type { CoreConfig } from "../../types.js"; import type { MatrixClient } from "../sdk.js"; import { LogService } from "../sdk/logger.js"; +import { awaitMatrixStartupWithAbort } from "../startup-abort.js"; import { resolveMatrixAuth, resolveMatrixAuthContext } from "./config.js"; import type { MatrixAuth } from "./types.js"; @@ -91,19 +92,22 @@ function deleteSharedClientState(state: SharedMatrixClientState): void { async function ensureSharedClientStarted(params: { state: SharedMatrixClientState; - timeoutMs?: number; - initialSyncLimit?: number; encryption?: boolean; + abortSignal?: AbortSignal; }): Promise { + const waitForStart = async (startPromise: Promise) => { + await awaitMatrixStartupWithAbort(startPromise, params.abortSignal); + }; + if (params.state.started) { return; } if (params.state.startPromise) { - await params.state.startPromise; + await waitForStart(params.state.startPromise); return; } - params.state.startPromise = (async () => { + const startPromise = (async () => { const client = params.state.client; // Initialize crypto if enabled @@ -119,15 +123,19 @@ async function ensureSharedClientStarted(params: { } } - await client.start(); + await client.start({ abortSignal: params.abortSignal }); params.state.started = true; })(); + // Keep the shared startup lock until the underlying start fully settles, even + // if one waiter aborts early while another caller still owns the startup. + const guardedStart = startPromise.finally(() => { + if (params.state.startPromise === guardedStart) { + params.state.startPromise = null; + } + }); + params.state.startPromise = guardedStart; - try { - await params.state.startPromise; - } finally { - params.state.startPromise = null; - } + await waitForStart(guardedStart); } async function resolveSharedMatrixClientState( @@ -138,6 +146,7 @@ async function resolveSharedMatrixClientState( auth?: MatrixAuth; startClient?: boolean; accountId?: string | null; + abortSignal?: AbortSignal; } = {}, ): Promise { const requestedAccountId = normalizeOptionalAccountId(params.accountId); @@ -168,9 +177,8 @@ async function resolveSharedMatrixClientState( if (shouldStart) { await ensureSharedClientStarted({ state: existingState, - timeoutMs: params.timeoutMs, - initialSyncLimit: auth.initialSyncLimit, encryption: auth.encryption, + abortSignal: params.abortSignal, }); } return existingState; @@ -182,9 +190,8 @@ async function resolveSharedMatrixClientState( if (shouldStart) { await ensureSharedClientStarted({ state: pending, - timeoutMs: params.timeoutMs, - initialSyncLimit: auth.initialSyncLimit, encryption: auth.encryption, + abortSignal: params.abortSignal, }); } return pending; @@ -202,9 +209,8 @@ async function resolveSharedMatrixClientState( if (shouldStart) { await ensureSharedClientStarted({ state: created, - timeoutMs: params.timeoutMs, - initialSyncLimit: auth.initialSyncLimit, encryption: auth.encryption, + abortSignal: params.abortSignal, }); } return created; @@ -221,6 +227,7 @@ export async function resolveSharedMatrixClient( auth?: MatrixAuth; startClient?: boolean; accountId?: string | null; + abortSignal?: AbortSignal; } = {}, ): Promise { const state = await resolveSharedMatrixClientState(params); @@ -235,6 +242,7 @@ export async function acquireSharedMatrixClient( auth?: MatrixAuth; startClient?: boolean; accountId?: string | null; + abortSignal?: AbortSignal; } = {}, ): Promise { const state = await resolveSharedMatrixClientState(params); diff --git a/extensions/matrix/src/matrix/monitor/events.ts b/extensions/matrix/src/matrix/monitor/events.ts index 258c4ebcfb0..9e5b531f5ba 100644 --- a/extensions/matrix/src/matrix/monitor/events.ts +++ b/extensions/matrix/src/matrix/monitor/events.ts @@ -49,6 +49,7 @@ export function registerMatrixMonitorEvents(params: { logger: RuntimeLogger; formatNativeDependencyHint: PluginRuntime["system"]["formatNativeDependencyHint"]; onRoomMessage: (roomId: string, event: MatrixRawEvent) => void | Promise; + runDetachedTask?: (label: string, task: () => Promise) => Promise; }): void { const { cfg, @@ -65,6 +66,7 @@ export function registerMatrixMonitorEvents(params: { logger, formatNativeDependencyHint, onRoomMessage, + runDetachedTask, } = params; const { routeVerificationEvent, routeVerificationSummary } = createMatrixVerificationEventRouter({ client, @@ -75,11 +77,27 @@ export function registerMatrixMonitorEvents(params: { logVerboseMessage, }); + const runMonitorTask = (label: string, task: () => Promise) => { + if (runDetachedTask) { + return runDetachedTask(label, task); + } + return Promise.resolve() + .then(task) + .catch((error) => { + logVerboseMessage(`matrix: ${label} failed (${String(error)})`); + }); + }; + client.on("room.message", (roomId: string, event: MatrixRawEvent) => { if (routeVerificationEvent(roomId, event)) { return; } - void onRoomMessage(roomId, event); + void runMonitorTask( + `room message handler room=${roomId} id=${event.event_id ?? "unknown"}`, + async () => { + await onRoomMessage(roomId, event); + }, + ); }); client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => { @@ -121,7 +139,9 @@ export function registerMatrixMonitorEvents(params: { ); client.on("verification.summary", (summary) => { - void routeVerificationSummary(summary); + void runMonitorTask("verification summary handler", async () => { + await routeVerificationSummary(summary); + }); }); client.on("room.invite", (roomId: string, event: MatrixRawEvent) => { @@ -179,7 +199,12 @@ export function registerMatrixMonitorEvents(params: { ); } if (eventType === EventType.Reaction) { - void onRoomMessage(roomId, event); + void runMonitorTask( + `reaction handler room=${roomId} id=${event.event_id ?? "unknown"}`, + async () => { + await onRoomMessage(roomId, event); + }, + ); return; } diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index d4ac4c8153e..887f021a2c0 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -12,6 +12,34 @@ type DirectRoomTrackerOptions = { }; const hoisted = vi.hoisted(() => { + const createEmitter = () => { + const listeners = new Map void>>(); + return { + on(event: string, listener: (...args: unknown[]) => void) { + let bucket = listeners.get(event); + if (!bucket) { + bucket = new Set(); + listeners.set(event, bucket); + } + bucket.add(listener); + return this; + }, + off(event: string, listener: (...args: unknown[]) => void) { + listeners.get(event)?.delete(listener); + return this; + }, + emit(event: string, ...args: unknown[]) { + for (const listener of listeners.get(event) ?? []) { + listener(...args); + } + return true; + }, + removeAllListeners() { + listeners.clear(); + return this; + }, + }; + }; const callOrder: string[] = []; const state = { startClientError: null as Error | null, @@ -26,12 +54,12 @@ const hoisted = vi.hoisted(() => { flush: vi.fn(async () => undefined), stop: vi.fn(async () => undefined), }; - const client = { + const client = Object.assign(createEmitter(), { id: "matrix-client", hasPersistedSyncState: vi.fn(() => false), stopSyncWithoutPersist: vi.fn(), drainPendingDecryptions: vi.fn(async () => undefined), - }; + }); const createMatrixRoomMessageHandler = vi.fn(() => vi.fn()); const createDirectRoomTracker = vi.fn((_client: unknown, _opts?: DirectRoomTrackerOptions) => ({ isDirectMessage: vi.fn(async () => false), @@ -55,9 +83,27 @@ const hoisted = vi.hoisted(() => { }; const stopThreadBindingManager = vi.fn(); const releaseSharedClientInstance = vi.fn(async () => true); + const resolveSharedMatrixClient = vi.fn(async (params: { startClient?: boolean }) => { + if (params.startClient === false) { + callOrder.push("prepare-client"); + return client; + } + if (!callOrder.includes("create-manager")) { + throw new Error("Matrix client started before thread bindings were registered"); + } + if (state.startClientError) { + throw state.startClientError; + } + callOrder.push("start-client"); + return client; + }); const setActiveMatrixClient = vi.fn(); const setMatrixRuntime = vi.fn(); const backfillMatrixAuthDeviceIdAfterStartup = vi.fn(async () => undefined); + const runMatrixStartupMaintenance = vi.fn< + (params: { abortSignal?: AbortSignal }) => Promise + >(async () => undefined); + const setStatus = vi.fn(); return { backfillMatrixAuthDeviceIdAfterStartup, callOrder, @@ -71,9 +117,12 @@ const hoisted = vi.hoisted(() => { logger, registeredOnRoomMessage: null as null | ((roomId: string, event: unknown) => Promise), releaseSharedClientInstance, + resolveSharedMatrixClient, resolveTextChunkLimit, + runMatrixStartupMaintenance, setActiveMatrixClient, setMatrixRuntime, + setStatus, state, stopThreadBindingManager, }; @@ -237,20 +286,7 @@ vi.mock("../client.js", () => ({ resolveMatrixAuthContext: vi.fn(() => ({ accountId: "default", })), - resolveSharedMatrixClient: vi.fn(async (params: { startClient?: boolean }) => { - if (params.startClient === false) { - hoisted.callOrder.push("prepare-client"); - return hoisted.client; - } - if (!hoisted.callOrder.includes("create-manager")) { - throw new Error("Matrix client started before thread bindings were registered"); - } - if (hoisted.state.startClientError) { - throw hoisted.state.startClientError; - } - hoisted.callOrder.push("start-client"); - return hoisted.client; - }), + resolveSharedMatrixClient: hoisted.resolveSharedMatrixClient, })); vi.mock("../client/shared.js", () => ({ @@ -300,9 +336,17 @@ vi.mock("./direct.js", () => ({ vi.mock("./events.js", () => ({ registerMatrixMonitorEvents: vi.fn( - (params: { onRoomMessage: (roomId: string, event: unknown) => Promise }) => { + (params: { + onRoomMessage: (roomId: string, event: unknown) => Promise; + runDetachedTask?: (label: string, task: () => Promise) => Promise; + }) => { hoisted.callOrder.push("register-events"); - hoisted.registeredOnRoomMessage = params.onRoomMessage; + hoisted.registeredOnRoomMessage = (roomId: string, event: unknown) => + params.runDetachedTask + ? params.runDetachedTask("test room message", async () => { + await params.onRoomMessage(roomId, event); + }) + : params.onRoomMessage(roomId, event); }, ), })); @@ -330,6 +374,10 @@ vi.mock("./startup-verification.js", () => ({ ensureMatrixStartupVerification: vi.fn(), })); +vi.mock("./startup.js", () => ({ + runMatrixStartupMaintenance: hoisted.runMatrixStartupMaintenance, +})); + let monitorMatrixProvider: typeof import("./index.js").monitorMatrixProvider; describe("monitorMatrixProvider", () => { @@ -353,6 +401,22 @@ describe("monitorMatrixProvider", () => { delete (hoisted.accountConfig as { rooms?: Record }).rooms; hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000); hoisted.releaseSharedClientInstance.mockReset().mockResolvedValue(true); + hoisted.resolveSharedMatrixClient + .mockReset() + .mockImplementation(async (params: { startClient?: boolean }) => { + if (params.startClient === false) { + hoisted.callOrder.push("prepare-client"); + return hoisted.client; + } + if (!hoisted.callOrder.includes("create-manager")) { + throw new Error("Matrix client started before thread bindings were registered"); + } + if (hoisted.state.startClientError) { + throw hoisted.state.startClientError; + } + hoisted.callOrder.push("start-client"); + return hoisted.client; + }); hoisted.createDirectRoomTracker.mockReset().mockReturnValue({ isDirectMessage: vi.fn(async () => false), }); @@ -365,6 +429,7 @@ describe("monitorMatrixProvider", () => { hoisted.registeredOnRoomMessage = null; hoisted.setActiveMatrixClient.mockReset(); hoisted.stopThreadBindingManager.mockReset(); + hoisted.client.removeAllListeners(); hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false); hoisted.client.stopSyncWithoutPersist.mockReset(); hoisted.client.drainPendingDecryptions.mockReset().mockResolvedValue(undefined); @@ -374,7 +439,9 @@ describe("monitorMatrixProvider", () => { hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined); hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined); hoisted.backfillMatrixAuthDeviceIdAfterStartup.mockReset().mockResolvedValue(undefined); + hoisted.runMatrixStartupMaintenance.mockReset().mockResolvedValue(undefined); hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn()); + hoisted.setStatus.mockReset(); Object.values(hoisted.logger).forEach((mock) => mock.mockReset()); }); @@ -390,6 +457,178 @@ describe("monitorMatrixProvider", () => { expect(hoisted.setActiveMatrixClient).not.toHaveBeenCalled(); }); + it("publishes disconnected startup status and connected sync status without failing the monitor", async () => { + const abortController = new AbortController(); + const monitorPromise = monitorMatrixProvider({ + abortSignal: abortController.signal, + setStatus: hoisted.setStatus, + }); + + await vi.waitFor(() => { + expect(hoisted.callOrder).toContain("start-client"); + }); + + expect(hoisted.setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + baseUrl: "https://matrix.example.org", + connected: false, + healthState: "starting", + }), + ); + + hoisted.client.emit("sync.state", "SYNCING", "RECONNECTING", undefined); + + await vi.waitFor(() => { + expect(hoisted.setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + connected: true, + healthState: "healthy", + lastError: null, + }), + ); + }); + + abortController.abort(); + await expect(monitorPromise).resolves.toBeUndefined(); + }); + + it("contains room-message handler rejections inside monitor task tracking", async () => { + const abortController = new AbortController(); + const unhandled: unknown[] = []; + const onUnhandled = (reason: unknown) => { + unhandled.push(reason); + }; + + hoisted.createMatrixRoomMessageHandler.mockReturnValue( + vi.fn(async () => { + throw new Error("room handler exploded"); + }), + ); + + process.on("unhandledRejection", onUnhandled); + try { + const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal }); + await vi.waitFor(() => { + expect(hoisted.callOrder).toContain("start-client"); + }); + + const onRoomMessage = hoisted.registeredOnRoomMessage; + if (!onRoomMessage) { + throw new Error("expected room message handler to be registered"); + } + + await onRoomMessage("!room:example.org", { event_id: "$event" }); + await Promise.resolve(); + + expect(unhandled).toHaveLength(0); + expect(hoisted.logger.warn).toHaveBeenCalledWith( + "matrix background task failed", + expect.objectContaining({ + task: "test room message", + error: "Error: room handler exploded", + }), + ); + + abortController.abort(); + await monitorPromise; + } finally { + process.off("unhandledRejection", onUnhandled); + } + }); + + it("fails the channel task when Matrix sync emits an unexpected fatal error", async () => { + const abortController = new AbortController(); + const monitorPromise = monitorMatrixProvider({ + abortSignal: abortController.signal, + setStatus: hoisted.setStatus, + }); + + await vi.waitFor(() => { + expect(hoisted.callOrder).toContain("start-client"); + }); + + hoisted.client.emit("sync.unexpected_error", new Error("sync exploded")); + + await expect(monitorPromise).rejects.toThrow("sync exploded"); + expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "persist"); + expect(hoisted.setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + connected: false, + healthState: "error", + lastError: "sync exploded", + }), + ); + }); + + it("aborts stalled startup promptly and releases the shared client without persist", async () => { + const abortController = new AbortController(); + hoisted.resolveSharedMatrixClient.mockImplementation( + async (params: { startClient?: boolean; abortSignal?: AbortSignal }) => { + if (params.startClient === false) { + hoisted.callOrder.push("prepare-client"); + return hoisted.client; + } + hoisted.callOrder.push("start-client"); + return await new Promise((_resolve, reject) => { + params.abortSignal?.addEventListener( + "abort", + () => { + const error = new Error("Matrix startup aborted"); + error.name = "AbortError"; + reject(error); + }, + { once: true }, + ); + }); + }, + ); + + const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal }); + + await vi.waitFor(() => { + expect(hoisted.callOrder).toContain("start-client"); + }); + + abortController.abort(); + + await expect(monitorPromise).resolves.toBeUndefined(); + expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "stop"); + expect(hoisted.client.drainPendingDecryptions).not.toHaveBeenCalled(); + }); + + it("aborts during startup maintenance and releases the shared client without persist", async () => { + const abortController = new AbortController(); + hoisted.runMatrixStartupMaintenance.mockImplementation( + async (params: { abortSignal?: AbortSignal }) => + await new Promise((_resolve, reject) => { + params.abortSignal?.addEventListener( + "abort", + () => { + const error = new Error("Matrix startup aborted"); + error.name = "AbortError"; + reject(error); + }, + { once: true }, + ); + }), + ); + + const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal }); + + await vi.waitFor(() => { + expect(hoisted.runMatrixStartupMaintenance).toHaveBeenCalledTimes(1); + }); + + abortController.abort(); + + await expect(monitorPromise).resolves.toBeUndefined(); + expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "stop"); + expect(hoisted.client.drainPendingDecryptions).not.toHaveBeenCalled(); + }); + it("registers Matrix thread bindings before starting the client", async () => { await startMonitorAndAbortAfterStartup(); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 13b3918ed11..e15174e10a6 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -1,5 +1,6 @@ import { format } from "node:util"; import { CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY } from "openclaw/plugin-sdk/approval-handler-adapter-runtime"; +import { waitUntilAbort } from "openclaw/plugin-sdk/channel-lifecycle"; import { registerChannelRuntimeContext } from "openclaw/plugin-sdk/channel-runtime-context"; import { GROUP_POLICY_BLOCKED_LABEL, @@ -23,6 +24,7 @@ import { resolveSharedMatrixClient, } from "../client.js"; import { releaseSharedClientInstance } from "../client/shared.js"; +import { isMatrixStartupAbortError } from "../startup-abort.js"; import { createMatrixThreadBindingManager } from "../thread-bindings.js"; import { registerMatrixAutoJoin } from "./auto-join.js"; import { resolveMatrixMonitorConfig } from "./config.js"; @@ -33,6 +35,9 @@ import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js"; import { shouldPromoteRecentInviteRoom } from "./recent-invite.js"; import { createMatrixRoomInfoResolver } from "./room-info.js"; import { runMatrixStartupMaintenance } from "./startup.js"; +import { createMatrixMonitorStatusController } from "./status.js"; +import { createMatrixMonitorSyncLifecycle } from "./sync-lifecycle.js"; +import { createMatrixMonitorTaskRunner } from "./task-runner.js"; export type MonitorMatrixOpts = { runtime?: RuntimeEnv; @@ -42,6 +47,7 @@ export type MonitorMatrixOpts = { initialSyncLimit?: number; replyToMode?: ReplyToMode; accountId?: string | null; + setStatus?: (next: import("openclaw/plugin-sdk/channel-contract").ChannelAccountSnapshot) => void; }; const DEFAULT_MEDIA_MAX_MB = 20; @@ -140,6 +146,11 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi resolvedInitialSyncLimit === auth.initialSyncLimit ? auth : { ...auth, initialSyncLimit: resolvedInitialSyncLimit }; + const statusController = createMatrixMonitorStatusController({ + accountId: auth.accountId, + baseUrl: auth.homeserver, + statusSink: opts.setStatus, + }); const client = await resolveSharedMatrixClient({ cfg, auth: authWithLimit, @@ -153,25 +164,32 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi auth, env: process.env, }); - const inFlightRoomMessages = new Set>(); - const waitForInFlightRoomMessages = async () => { - while (inFlightRoomMessages.size > 0) { - await Promise.allSettled(Array.from(inFlightRoomMessages)); - } - }; - const cleanup = async () => { + const monitorTaskRunner = createMatrixMonitorTaskRunner({ + logger, + logVerboseMessage, + }); + const syncLifecycle = createMatrixMonitorSyncLifecycle({ + client, + statusController, + isStopping: () => cleanedUp || opts.abortSignal?.aborted === true, + }); + const cleanup = async (mode: "persist" | "stop" = "persist") => { if (cleanedUp) { return; } cleanedUp = true; try { client.stopSyncWithoutPersist(); - await client.drainPendingDecryptions("matrix monitor shutdown"); - await waitForInFlightRoomMessages(); + if (mode === "persist") { + await client.drainPendingDecryptions("matrix monitor shutdown"); + await monitorTaskRunner.waitForIdle(); + } threadBindingManager?.stop(); await inboundDeduper.stop(); - await releaseSharedClientInstance(client, "persist"); + await releaseSharedClientInstance(client, mode); } finally { + syncLifecycle.dispose(); + statusController.markStopped(); setActiveMatrixClient(null, auth.accountId); } }; @@ -294,13 +312,6 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi getMemberDisplayName, needsRoomAliasesForConfig, }); - const trackRoomMessage = (roomId: string, event: Parameters[1]) => { - const task = Promise.resolve(handleRoomMessage(roomId, event)).finally(() => { - inFlightRoomMessages.delete(task); - }); - inFlightRoomMessages.add(task); - return task; - }; try { threadBindingManager = await createMatrixThreadBindingManager({ @@ -337,7 +348,8 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi warnedCryptoMissingRooms, logger, formatNativeDependencyHint: core.system.formatNativeDependencyHint, - onRoomMessage: trackRoomMessage, + onRoomMessage: handleRoomMessage, + runDetachedTask: monitorTaskRunner.runDetachedTask, }); // Register Matrix thread bindings before the client starts syncing so threaded @@ -347,6 +359,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi cfg, auth: authWithLimit, accountId: auth.accountId, + abortSignal: opts.abortSignal, }); logVerboseMessage("matrix: client started"); @@ -383,10 +396,11 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi writeConfigFile: async (nextCfg) => await core.config.writeConfigFile(nextCfg), loadWebMedia: async (url, maxBytes) => await core.media.loadWebMedia(url, maxBytes), env: process.env, + abortSignal: opts.abortSignal, }); - await new Promise((resolve) => { - const stopAndResolve = async () => { + await Promise.race([ + waitUntilAbort(opts.abortSignal, async () => { try { logVerboseMessage("matrix: stopping client"); await cleanup(); @@ -394,23 +408,15 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi logger.warn("matrix: failed during monitor shutdown cleanup", { error: String(err), }); - } finally { - resolve(); } - }; - if (opts.abortSignal?.aborted) { - void stopAndResolve(); - return; - } - opts.abortSignal?.addEventListener( - "abort", - () => { - void stopAndResolve(); - }, - { once: true }, - ); - }); + }), + syncLifecycle.waitForFatalStop(), + ]); } catch (err) { + if (opts.abortSignal?.aborted === true && isMatrixStartupAbortError(err)) { + await cleanup("stop"); + return; + } await cleanup(); throw err; } diff --git a/extensions/matrix/src/matrix/monitor/startup.test.ts b/extensions/matrix/src/matrix/monitor/startup.test.ts index e73d771da1c..6f97912eefc 100644 --- a/extensions/matrix/src/matrix/monitor/startup.test.ts +++ b/extensions/matrix/src/matrix/monitor/startup.test.ts @@ -133,6 +133,7 @@ describe("runMatrixStartupMaintenance", () => { contentType: "image/png", fileName: "avatar.png", })), + abortSignal: undefined, env: {}, }; } @@ -235,4 +236,22 @@ describe("runMatrixStartupMaintenance", () => { { error: "boom" }, ); }); + + it("aborts maintenance before later startup steps continue", async () => { + const params = createParams(); + params.auth.encryption = true; + const abortController = new AbortController(); + params.abortSignal = abortController.signal; + vi.mocked(deps.syncMatrixOwnProfile).mockImplementation(async () => { + abortController.abort(); + return createProfileSyncResult(); + }); + + await expect(runMatrixStartupMaintenance(params, deps)).rejects.toMatchObject({ + message: "Matrix startup aborted", + name: "AbortError", + }); + expect(deps.ensureMatrixStartupVerification).not.toHaveBeenCalled(); + expect(deps.maybeRestoreLegacyMatrixBackup).not.toHaveBeenCalled(); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/startup.ts b/extensions/matrix/src/matrix/monitor/startup.ts index cb888aa7f7e..78a1a1e8ecd 100644 --- a/extensions/matrix/src/matrix/monitor/startup.ts +++ b/extensions/matrix/src/matrix/monitor/startup.ts @@ -2,6 +2,7 @@ import type { RuntimeLogger } from "../../runtime-api.js"; import type { CoreConfig, MatrixConfig } from "../../types.js"; import type { MatrixAuth } from "../client.js"; import type { MatrixClient } from "../sdk.js"; +import { isMatrixStartupAbortError, throwIfMatrixStartupAborted } from "../startup-abort.js"; type MatrixStartupClient = Pick< MatrixClient, @@ -66,10 +67,12 @@ export async function runMatrixStartupMaintenance( maxBytes: number, ) => Promise<{ buffer: Buffer; contentType?: string; fileName?: string }>; env?: NodeJS.ProcessEnv; + abortSignal?: AbortSignal; }, deps?: MatrixStartupMaintenanceDeps, ): Promise { const runtimeDeps = deps ?? (await loadMatrixStartupMaintenanceDeps()); + throwIfMatrixStartupAborted(params.abortSignal); try { const profileSync = await runtimeDeps.syncMatrixOwnProfile({ client: params.client, @@ -78,6 +81,7 @@ export async function runMatrixStartupMaintenance( avatarUrl: params.accountConfig.avatarUrl, loadAvatarFromUrl: async (url, maxBytes) => await params.loadWebMedia(url, maxBytes), }); + throwIfMatrixStartupAborted(params.abortSignal); if (profileSync.displayNameUpdated) { params.logger.info(`matrix: profile display name updated for ${params.auth.userId}`); } @@ -94,11 +98,15 @@ export async function runMatrixStartupMaintenance( avatarUrl: profileSync.resolvedAvatarUrl, }); await params.writeConfigFile(updatedCfg as never); + throwIfMatrixStartupAborted(params.abortSignal); params.logVerboseMessage( `matrix: persisted converted avatar URL for account ${params.accountId} (${profileSync.resolvedAvatarUrl})`, ); } } catch (err) { + if (isMatrixStartupAbortError(err)) { + throw err; + } params.logger.warn("matrix: failed to sync profile from config", { error: String(err) }); } @@ -107,6 +115,7 @@ export async function runMatrixStartupMaintenance( } try { + throwIfMatrixStartupAborted(params.abortSignal); const deviceHealth = runtimeDeps.summarizeMatrixDeviceHealth( await params.client.listOwnDevices(), ); @@ -116,18 +125,23 @@ export async function runMatrixStartupMaintenance( ); } } catch (err) { + if (isMatrixStartupAbortError(err)) { + throw err; + } params.logger.debug?.("Failed to inspect matrix device hygiene (non-fatal)", { error: String(err), }); } try { + throwIfMatrixStartupAborted(params.abortSignal); const startupVerification = await runtimeDeps.ensureMatrixStartupVerification({ client: params.client, auth: params.auth, accountConfig: params.accountConfig, env: params.env, }); + throwIfMatrixStartupAborted(params.abortSignal); if (startupVerification.kind === "verified") { params.logger.info("matrix: device is verified by its owner and ready for encrypted rooms"); } else if ( @@ -158,17 +172,22 @@ export async function runMatrixStartupMaintenance( ); } } catch (err) { + if (isMatrixStartupAbortError(err)) { + throw err; + } params.logger.debug?.("Failed to resolve matrix verification status (non-fatal)", { error: String(err), }); } try { + throwIfMatrixStartupAborted(params.abortSignal); const legacyCryptoRestore = await runtimeDeps.maybeRestoreLegacyMatrixBackup({ client: params.client, auth: params.auth, env: params.env, }); + throwIfMatrixStartupAborted(params.abortSignal); if (legacyCryptoRestore.kind === "restored") { params.logger.info( `matrix: restored ${legacyCryptoRestore.imported}/${legacyCryptoRestore.total} room key(s) from legacy encrypted-state backup`, @@ -189,6 +208,9 @@ export async function runMatrixStartupMaintenance( } } } catch (err) { + if (isMatrixStartupAbortError(err)) { + throw err; + } params.logger.warn("matrix: failed restoring legacy encrypted-state backup", { error: String(err), }); diff --git a/extensions/matrix/src/matrix/monitor/status.ts b/extensions/matrix/src/matrix/monitor/status.ts new file mode 100644 index 00000000000..6e5630a845e --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/status.ts @@ -0,0 +1,111 @@ +import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; +import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; +import { formatMatrixErrorMessage } from "../errors.js"; +import { + isMatrixDisconnectedSyncState, + isMatrixReadySyncState, + type MatrixSyncState, +} from "../sync-state.js"; + +type MatrixMonitorStatusSink = (patch: ChannelAccountSnapshot) => void; + +function cloneLastDisconnect( + value: ChannelAccountSnapshot["lastDisconnect"], +): ChannelAccountSnapshot["lastDisconnect"] { + if (!value || typeof value === "string") { + return value ?? null; + } + return { ...value }; +} + +function formatSyncError(error: unknown): string | null { + if (!error) { + return null; + } + if (error instanceof Error) { + return error.message || error.name || "unknown"; + } + return formatMatrixErrorMessage(error); +} + +export type MatrixMonitorStatusController = ReturnType; + +export function createMatrixMonitorStatusController(params: { + accountId: string; + baseUrl?: string; + statusSink?: MatrixMonitorStatusSink; +}) { + const status: ChannelAccountSnapshot = { + accountId: params.accountId, + ...(params.baseUrl ? { baseUrl: params.baseUrl } : {}), + connected: false, + lastConnectedAt: null, + lastDisconnect: null, + lastError: null, + healthState: "starting", + }; + + const emit = () => { + params.statusSink?.({ + ...status, + lastDisconnect: cloneLastDisconnect(status.lastDisconnect), + }); + }; + + const noteConnected = (at = Date.now()) => { + if (status.connected === true) { + status.lastEventAt = at; + } else { + Object.assign(status, createConnectedChannelStatusPatch(at)); + } + status.lastError = null; + status.lastDisconnect = null; + status.healthState = "healthy"; + emit(); + }; + + const noteDisconnected = (params: { state: MatrixSyncState; at?: number; error?: unknown }) => { + const at = params.at ?? Date.now(); + const error = formatSyncError(params.error); + status.connected = false; + status.lastEventAt = at; + status.lastDisconnect = { + at, + ...(error ? { error } : {}), + }; + status.lastError = error; + status.healthState = params.state.toLowerCase(); + emit(); + }; + + emit(); + + return { + noteSyncState(state: MatrixSyncState, error?: unknown, at = Date.now()) { + if (isMatrixReadySyncState(state)) { + noteConnected(at); + return; + } + if (isMatrixDisconnectedSyncState(state)) { + noteDisconnected({ state, at, error }); + return; + } + // Unknown future SDK states inherit the current connectivity bit until the + // SDK classifies them as ready or disconnected. Avoid guessing here. + status.lastEventAt = at; + status.healthState = state.toLowerCase(); + emit(); + }, + noteUnexpectedError(error: unknown, at = Date.now()) { + noteDisconnected({ state: "ERROR", at, error }); + }, + markStopped(at = Date.now()) { + status.connected = false; + status.lastEventAt = at; + if (status.healthState !== "error") { + status.healthState = "stopped"; + } + emit(); + }, + }; +} diff --git a/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts b/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts new file mode 100644 index 00000000000..76818528374 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts @@ -0,0 +1,224 @@ +import { EventEmitter } from "node:events"; +import { describe, expect, it, vi } from "vitest"; +import { createMatrixMonitorStatusController } from "./status.js"; +import { createMatrixMonitorSyncLifecycle } from "./sync-lifecycle.js"; + +function createClientEmitter() { + return new EventEmitter() as unknown as { + on: (event: string, listener: (...args: unknown[]) => void) => unknown; + off: (event: string, listener: (...args: unknown[]) => void) => unknown; + emit: (event: string, ...args: unknown[]) => boolean; + }; +} + +describe("createMatrixMonitorSyncLifecycle", () => { + it("rejects the channel wait on unexpected sync errors", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController: createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }), + }); + + const waitPromise = lifecycle.waitForFatalStop(); + client.emit("sync.unexpected_error", new Error("sync exploded")); + + await expect(waitPromise).rejects.toThrow("sync exploded"); + expect(setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "error", + lastError: "sync exploded", + }), + ); + }); + + it("ignores STOPPED emitted during intentional shutdown", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + let stopping = false; + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController: createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }), + isStopping: () => stopping, + }); + + const waitPromise = lifecycle.waitForFatalStop(); + stopping = true; + client.emit("sync.state", "STOPPED", "SYNCING", undefined); + lifecycle.dispose(); + + await expect(waitPromise).resolves.toBeUndefined(); + expect(setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "stopped", + }), + ); + }); + + it("marks unexpected STOPPED sync as an error state", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController: createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }), + }); + + const waitPromise = lifecycle.waitForFatalStop(); + client.emit("sync.state", "STOPPED", "SYNCING", undefined); + + await expect(waitPromise).rejects.toThrow("Matrix sync stopped unexpectedly"); + expect(setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "error", + lastError: "Matrix sync stopped unexpectedly", + }), + ); + }); + + it("ignores unexpected sync errors emitted during intentional shutdown", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + let stopping = false; + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController: createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }), + isStopping: () => stopping, + }); + + const waitPromise = lifecycle.waitForFatalStop(); + stopping = true; + client.emit("sync.unexpected_error", new Error("shutdown noise")); + lifecycle.dispose(); + + await expect(waitPromise).resolves.toBeUndefined(); + expect(setStatus).not.toHaveBeenCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "error", + }), + ); + }); + + it("ignores non-terminal sync states emitted during intentional shutdown", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + let stopping = false; + const statusController = createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }); + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController, + isStopping: () => stopping, + }); + + const waitPromise = lifecycle.waitForFatalStop(); + stopping = true; + client.emit("sync.state", "ERROR", "RECONNECTING", new Error("shutdown noise")); + lifecycle.dispose(); + statusController.markStopped(); + + await expect(waitPromise).resolves.toBeUndefined(); + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "stopped", + lastError: null, + }), + ); + }); + + it("does not downgrade a fatal error to stopped during shutdown", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + let stopping = false; + const statusController = createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }); + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController, + isStopping: () => stopping, + }); + + const waitPromise = lifecycle.waitForFatalStop(); + client.emit("sync.unexpected_error", new Error("sync exploded")); + await expect(waitPromise).rejects.toThrow("sync exploded"); + + stopping = true; + client.emit("sync.state", "STOPPED", "SYNCING", undefined); + lifecycle.dispose(); + statusController.markStopped(); + + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "error", + lastError: "sync exploded", + }), + ); + }); + + it("ignores follow-up sync states after a fatal sync error", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController: createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }), + }); + + const waitPromise = lifecycle.waitForFatalStop(); + client.emit("sync.unexpected_error", new Error("sync exploded")); + await expect(waitPromise).rejects.toThrow("sync exploded"); + + client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("late reconnect")); + lifecycle.dispose(); + + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "error", + lastError: "sync exploded", + }), + ); + }); + + it("rejects a second concurrent fatal-stop waiter", async () => { + const client = createClientEmitter(); + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController: createMatrixMonitorStatusController({ + accountId: "default", + }), + }); + + const firstWait = lifecycle.waitForFatalStop(); + + await expect(lifecycle.waitForFatalStop()).rejects.toThrow( + "Matrix fatal-stop wait already in progress", + ); + + lifecycle.dispose(); + await expect(firstWait).resolves.toBeUndefined(); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/sync-lifecycle.ts b/extensions/matrix/src/matrix/monitor/sync-lifecycle.ts new file mode 100644 index 00000000000..db58300fe33 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/sync-lifecycle.ts @@ -0,0 +1,91 @@ +import type { MatrixClient } from "../sdk.js"; +import { isMatrixTerminalSyncState, type MatrixSyncState } from "../sync-state.js"; +import type { MatrixMonitorStatusController } from "./status.js"; + +function formatSyncLifecycleError(state: MatrixSyncState, error?: unknown): Error { + if (error instanceof Error) { + return error; + } + const message = typeof error === "string" && error.trim() ? error.trim() : undefined; + if (state === "STOPPED") { + return new Error(message ?? "Matrix sync stopped unexpectedly"); + } + if (state === "ERROR") { + return new Error(message ?? "Matrix sync entered ERROR unexpectedly"); + } + return new Error(message ?? `Matrix sync entered ${state} unexpectedly`); +} + +export function createMatrixMonitorSyncLifecycle(params: { + client: MatrixClient; + statusController: MatrixMonitorStatusController; + isStopping?: () => boolean; +}) { + let fatalError: Error | null = null; + let resolveFatalWait: (() => void) | null = null; + let rejectFatalWait: ((error: Error) => void) | null = null; + + const settleFatal = (error: Error) => { + if (fatalError) { + return; + } + fatalError = error; + rejectFatalWait?.(error); + resolveFatalWait = null; + rejectFatalWait = null; + }; + + const onSyncState = (state: MatrixSyncState, _prevState: string | null, error?: unknown) => { + if (isMatrixTerminalSyncState(state) && !params.isStopping?.()) { + const fatalError = formatSyncLifecycleError(state, error); + params.statusController.noteUnexpectedError(fatalError); + settleFatal(fatalError); + return; + } + // Fatal sync failures are sticky for telemetry; later SDK state churn during + // cleanup or reconnect should not overwrite the first recorded error. + if (fatalError) { + return; + } + // Operator-initiated shutdown can still emit transient sync states before + // the final STOPPED. Ignore that churn so intentional stops do not look + // like runtime failures. + if (params.isStopping?.() && !isMatrixTerminalSyncState(state)) { + return; + } + params.statusController.noteSyncState(state, error); + }; + + const onUnexpectedError = (error: Error) => { + if (params.isStopping?.()) { + return; + } + params.statusController.noteUnexpectedError(error); + settleFatal(error); + }; + + params.client.on("sync.state", onSyncState); + params.client.on("sync.unexpected_error", onUnexpectedError); + + return { + async waitForFatalStop(): Promise { + if (fatalError) { + throw fatalError; + } + if (resolveFatalWait || rejectFatalWait) { + throw new Error("Matrix fatal-stop wait already in progress"); + } + await new Promise((resolve, reject) => { + resolveFatalWait = resolve; + rejectFatalWait = (error) => reject(error); + }); + }, + dispose() { + resolveFatalWait?.(); + resolveFatalWait = null; + rejectFatalWait = null; + params.client.off("sync.state", onSyncState); + params.client.off("sync.unexpected_error", onUnexpectedError); + }, + }; +} diff --git a/extensions/matrix/src/matrix/monitor/task-runner.ts b/extensions/matrix/src/matrix/monitor/task-runner.ts new file mode 100644 index 00000000000..eee678f0cf0 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/task-runner.ts @@ -0,0 +1,38 @@ +import type { RuntimeLogger } from "../../runtime-api.js"; + +export function createMatrixMonitorTaskRunner(params: { + logger: RuntimeLogger; + logVerboseMessage: (message: string) => void; +}) { + const inFlight = new Set>(); + + const runDetachedTask = (label: string, task: () => Promise): Promise => { + let trackedTask!: Promise; + trackedTask = Promise.resolve() + .then(task) + .catch((error) => { + const message = String(error); + params.logVerboseMessage(`matrix: ${label} failed (${message})`); + params.logger.warn("matrix background task failed", { + task: label, + error: message, + }); + }) + .finally(() => { + inFlight.delete(trackedTask); + }); + inFlight.add(trackedTask); + return trackedTask; + }; + + const waitForIdle = async (): Promise => { + while (inFlight.size > 0) { + await Promise.allSettled(Array.from(inFlight)); + } + }; + + return { + runDetachedTask, + waitForIdle, + }; +} diff --git a/extensions/matrix/src/matrix/sdk.test.ts b/extensions/matrix/src/matrix/sdk.test.ts index 41a60f9ef24..8dca0eec27e 100644 --- a/extensions/matrix/src/matrix/sdk.test.ts +++ b/extensions/matrix/src/matrix/sdk.test.ts @@ -114,7 +114,11 @@ type MatrixJsClientStub = EventEmitter & { function createMatrixJsClientStub(): MatrixJsClientStub { const client = new EventEmitter() as MatrixJsClientStub; - client.startClient = vi.fn(async () => {}); + client.startClient = vi.fn(async () => { + queueMicrotask(() => { + client.emit("sync", "PREPARED", null, undefined); + }); + }); client.stopClient = vi.fn(); client.initRustCrypto = vi.fn(async () => {}); client.getUserId = vi.fn(() => "@bot:example.org"); @@ -182,7 +186,12 @@ vi.mock("matrix-js-sdk/lib/matrix.js", async () => { ); return { ...actual, - ClientEvent: { Event: "event", Room: "Room" }, + ClientEvent: { + Event: "event", + Room: "Room", + Sync: "sync", + SyncUnexpectedError: "sync.unexpectedError", + }, MatrixEventEvent: { Decrypted: "decrypted" }, createClient: vi.fn((opts: Record) => { lastCreateClientOpts = opts; @@ -947,6 +956,150 @@ describe("MatrixClient event bridge", () => { expect(invites).toEqual(["!invite:example.org"]); }); + it("waits for a ready sync state before resolving startup", async () => { + let releaseSyncReady: (() => void) | undefined; + matrixJsClient.startClient = vi.fn(async () => { + await new Promise((resolve) => { + releaseSyncReady = () => { + matrixJsClient.emit("sync", "PREPARED", null, undefined); + resolve(); + }; + }); + }); + + const client = new MatrixClient("https://matrix.example.org", "token"); + let resolved = false; + const startPromise = client.start().then(() => { + resolved = true; + }); + + await vi.waitFor(() => { + expect(releaseSyncReady).toEqual(expect.any(Function)); + }); + expect(resolved).toBe(false); + + releaseSyncReady?.(); + await startPromise; + + expect(resolved).toBe(true); + }); + + it("rejects startup when sync reports an unexpected error before ready", async () => { + matrixJsClient.startClient = vi.fn(async () => { + const timer = setTimeout(() => { + matrixJsClient.emit("sync.unexpectedError", new Error("sync exploded")); + }, 0); + timer.unref?.(); + }); + + const client = new MatrixClient("https://matrix.example.org", "token"); + + await expect(client.start()).rejects.toThrow("sync exploded"); + }); + + it("allows transient startup ERROR to recover into PREPARED", async () => { + matrixJsClient.startClient = vi.fn(async () => { + queueMicrotask(() => { + matrixJsClient.emit("sync", "ERROR", null, new Error("temporary outage")); + queueMicrotask(() => { + matrixJsClient.emit("sync", "PREPARED", "ERROR", undefined); + }); + }); + }); + + const client = new MatrixClient("https://matrix.example.org", "token"); + + await expect(client.start()).resolves.toBeUndefined(); + }); + + it("aborts startup when the readiness wait is canceled", async () => { + matrixJsClient.startClient = vi.fn(async () => {}); + + const abortController = new AbortController(); + const client = new MatrixClient("https://matrix.example.org", "token"); + const startPromise = client.start({ abortSignal: abortController.signal }); + + abortController.abort(); + + await expect(startPromise).rejects.toMatchObject({ + message: "Matrix startup aborted", + name: "AbortError", + }); + }); + + it("aborts before post-ready startup work when shutdown races ready sync", async () => { + matrixJsClient.startClient = vi.fn(async () => { + queueMicrotask(() => { + matrixJsClient.emit("sync", "PREPARED", null, undefined); + }); + }); + + const abortController = new AbortController(); + const client = new MatrixClient("https://matrix.example.org", "token"); + const bootstrapCryptoSpy = vi.spyOn( + client as unknown as { bootstrapCryptoIfNeeded: () => Promise }, + "bootstrapCryptoIfNeeded", + ); + bootstrapCryptoSpy.mockImplementation(async () => {}); + + client.on("sync.state", (state) => { + if (state === "PREPARED") { + abortController.abort(); + } + }); + + await expect(client.start({ abortSignal: abortController.signal })).rejects.toMatchObject({ + message: "Matrix startup aborted", + name: "AbortError", + }); + expect(bootstrapCryptoSpy).not.toHaveBeenCalled(); + }); + + it("times out startup when no ready sync state arrives", async () => { + vi.useFakeTimers(); + matrixJsClient.startClient = vi.fn(async () => {}); + + const client = new MatrixClient("https://matrix.example.org", "token"); + const startPromise = client.start(); + const startExpectation = expect(startPromise).rejects.toThrow( + "Matrix client did not reach a ready sync state within 30000ms", + ); + + await vi.advanceTimersByTimeAsync(30_000); + + await startExpectation; + }); + + it("clears stale sync state before a restarted sync session waits for fresh readiness", async () => { + matrixJsClient.startClient = vi + .fn(async () => { + queueMicrotask(() => { + matrixJsClient.emit("sync", "PREPARED", null, undefined); + }); + }) + .mockImplementationOnce(async () => { + queueMicrotask(() => { + matrixJsClient.emit("sync", "PREPARED", null, undefined); + }); + }) + .mockImplementationOnce(async () => {}); + + const client = new MatrixClient("https://matrix.example.org", "token"); + + await client.start(); + client.stopSyncWithoutPersist(); + + vi.useFakeTimers(); + const restartPromise = client.start(); + const restartExpectation = expect(restartPromise).rejects.toThrow( + "Matrix client did not reach a ready sync state within 30000ms", + ); + + await vi.advanceTimersByTimeAsync(30_000); + + await restartExpectation; + }); + it("replays outstanding invite rooms at startup", async () => { matrixJsClient.getRooms = vi.fn(() => [ { diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index 19de1badebf..eaceecef873 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -44,6 +44,12 @@ import type { MessageEventContent, } from "./sdk/types.js"; import type { MatrixVerificationSummary } from "./sdk/verification-manager.js"; +import { createMatrixStartupAbortError, throwIfMatrixStartupAborted } from "./startup-abort.js"; +import { + isMatrixReadySyncState, + isMatrixTerminalSyncState, + type MatrixSyncState, +} from "./sync-state.js"; export { ConsoleLogger, LogService }; export type { @@ -221,6 +227,7 @@ export class MatrixClient { private readonly autoBootstrapCrypto: boolean; private stopPersistPromise: Promise | null = null; private verificationSummaryListenerBound = false; + private currentSyncState: MatrixSyncState | null = null; readonly dms = { update: async (): Promise => { @@ -367,25 +374,128 @@ export class MatrixClient { } } - async start(): Promise { - await this.startSyncSession({ bootstrapCrypto: true }); + async start(opts: { abortSignal?: AbortSignal; readyTimeoutMs?: number } = {}): Promise { + await this.startSyncSession({ + bootstrapCrypto: true, + abortSignal: opts.abortSignal, + readyTimeoutMs: opts.readyTimeoutMs, + }); } - private async startSyncSession(opts: { bootstrapCrypto: boolean }): Promise { + private async waitForInitialSyncReady( + params: { + timeoutMs?: number; + abortSignal?: AbortSignal; + } = {}, + ): Promise { + const timeoutMs = params.timeoutMs ?? 30_000; + if (isMatrixReadySyncState(this.currentSyncState)) { + return; + } + if (isMatrixTerminalSyncState(this.currentSyncState)) { + throw new Error(`Matrix sync entered ${this.currentSyncState} during startup`); + } + + await new Promise((resolve, reject) => { + let settled = false; + let timeoutId: ReturnType | undefined; + const abortSignal = params.abortSignal; + + const cleanup = () => { + this.off("sync.state", onSyncState); + this.off("sync.unexpected_error", onUnexpectedError); + abortSignal?.removeEventListener("abort", onAbort); + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = undefined; + } + }; + + const settleResolve = () => { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve(); + }; + + const settleReject = (error: Error) => { + if (settled) { + return; + } + settled = true; + cleanup(); + reject(error); + }; + + const onSyncState = (state: MatrixSyncState, _prevState: string | null, error?: unknown) => { + if (isMatrixReadySyncState(state)) { + settleResolve(); + return; + } + if (isMatrixTerminalSyncState(state)) { + settleReject( + new Error( + error instanceof Error && error.message + ? error.message + : `Matrix sync entered ${state} during startup`, + ), + ); + } + }; + + const onUnexpectedError = (error: Error) => { + settleReject(error); + }; + + const onAbort = () => { + settleReject(createMatrixStartupAbortError()); + }; + + this.on("sync.state", onSyncState); + this.on("sync.unexpected_error", onUnexpectedError); + if (abortSignal?.aborted) { + onAbort(); + return; + } + abortSignal?.addEventListener("abort", onAbort, { once: true }); + timeoutId = setTimeout(() => { + settleReject( + new Error(`Matrix client did not reach a ready sync state within ${timeoutMs}ms`), + ); + }, timeoutMs); + timeoutId.unref?.(); + }); + } + + private async startSyncSession(opts: { + bootstrapCrypto: boolean; + abortSignal?: AbortSignal; + readyTimeoutMs?: number; + }): Promise { if (this.started) { return; } + throwIfMatrixStartupAborted(opts.abortSignal); await this.ensureCryptoSupportInitialized(); + throwIfMatrixStartupAborted(opts.abortSignal); this.registerBridge(); - await this.initializeCryptoIfNeeded(); + await this.initializeCryptoIfNeeded(opts.abortSignal); await this.client.startClient({ initialSyncLimit: this.initialSyncLimit, }); + await this.waitForInitialSyncReady({ + abortSignal: opts.abortSignal, + timeoutMs: opts.readyTimeoutMs, + }); + throwIfMatrixStartupAborted(opts.abortSignal); if (opts.bootstrapCrypto && this.autoBootstrapCrypto) { - await this.bootstrapCryptoIfNeeded(); + await this.bootstrapCryptoIfNeeded(opts.abortSignal); } + throwIfMatrixStartupAborted(opts.abortSignal); this.started = true; this.emitOutstandingInviteEvents(); await this.refreshDmCache().catch(noop); @@ -426,6 +536,7 @@ export class MatrixClient { clearInterval(this.idbPersistTimer); this.idbPersistTimer = null; } + this.currentSyncState = null; this.client.stopClient(); this.started = false; } @@ -469,10 +580,11 @@ export class MatrixClient { await this.stopPersistPromise; } - private async bootstrapCryptoIfNeeded(): Promise { + private async bootstrapCryptoIfNeeded(abortSignal?: AbortSignal): Promise { if (!this.encryptionEnabled || !this.cryptoInitialized || this.cryptoBootstrapped) { return; } + throwIfMatrixStartupAborted(abortSignal); await this.ensureCryptoSupportInitialized(); const crypto = this.client.getCrypto() as MatrixCryptoBootstrapApi | undefined; if (!crypto) { @@ -486,6 +598,7 @@ export class MatrixClient { crypto, MATRIX_INITIAL_CRYPTO_BOOTSTRAP_OPTIONS, ); + throwIfMatrixStartupAborted(abortSignal); if (!initial.crossSigningPublished || initial.ownDeviceVerified === false) { const status = await this.getOwnDeviceVerificationStatus(); if (status.signedByOwner) { @@ -503,6 +616,7 @@ export class MatrixClient { crypto, MATRIX_AUTOMATIC_REPAIR_BOOTSTRAP_OPTIONS, ); + throwIfMatrixStartupAborted(abortSignal); if (repaired.crossSigningPublished && repaired.ownDeviceVerified !== false) { LogService.info( "MatrixClientLite", @@ -526,26 +640,30 @@ export class MatrixClient { this.cryptoBootstrapped = true; } - private async initializeCryptoIfNeeded(): Promise { + private async initializeCryptoIfNeeded(abortSignal?: AbortSignal): Promise { if (!this.encryptionEnabled || this.cryptoInitialized) { return; } + throwIfMatrixStartupAborted(abortSignal); const { persistIdbToDisk, restoreIdbFromDisk } = await loadMatrixCryptoRuntime(); // Restore persisted IndexedDB crypto store before initializing WASM crypto. await restoreIdbFromDisk(this.idbSnapshotPath); + throwIfMatrixStartupAborted(abortSignal); try { await this.client.initRustCrypto({ cryptoDatabasePrefix: this.cryptoDatabasePrefix, }); this.cryptoInitialized = true; + throwIfMatrixStartupAborted(abortSignal); // Persist the crypto store after successful init (captures fresh keys on first run). await persistIdbToDisk({ snapshotPath: this.idbSnapshotPath, databasePrefix: this.cryptoDatabasePrefix, }); + throwIfMatrixStartupAborted(abortSignal); // Periodically persist to capture new Olm sessions and room keys. this.idbPersistTimer = setInterval(() => { @@ -1587,6 +1705,20 @@ export class MatrixClient { this.client.on(ClientEvent.Room, (room) => { this.emitMembershipForRoom(room); }); + this.client.on( + ClientEvent.Sync, + (state: MatrixSyncState, prevState: string | null, data?: unknown) => { + this.currentSyncState = state; + const error = + data && typeof data === "object" && "error" in data + ? (data as { error?: unknown }).error + : undefined; + this.emitter.emit("sync.state", state, prevState, error); + }, + ); + this.client.on(ClientEvent.SyncUnexpectedError, (error: Error) => { + this.emitter.emit("sync.unexpected_error", error); + }); } private emitMembershipForRoom(room: unknown): void { diff --git a/extensions/matrix/src/matrix/sdk/types.ts b/extensions/matrix/src/matrix/sdk/types.ts index 5b555478e3e..a43987117c8 100644 --- a/extensions/matrix/src/matrix/sdk/types.ts +++ b/extensions/matrix/src/matrix/sdk/types.ts @@ -1,3 +1,4 @@ +import type { MatrixSyncState } from "../sync-state.js"; import type { MatrixVerificationRequestLike, MatrixVerificationSummary, @@ -31,6 +32,8 @@ export type MatrixClientEventMap = { "room.failed_decryption": [roomId: string, event: MatrixRawEvent, error: Error]; "room.invite": [roomId: string, event: MatrixRawEvent]; "room.join": [roomId: string, event: MatrixRawEvent]; + "sync.state": [state: MatrixSyncState, prevState: string | null, error?: unknown]; + "sync.unexpected_error": [error: Error]; "verification.summary": [summary: MatrixVerificationSummary]; }; diff --git a/extensions/matrix/src/matrix/startup-abort.ts b/extensions/matrix/src/matrix/startup-abort.ts new file mode 100644 index 00000000000..71b530627da --- /dev/null +++ b/extensions/matrix/src/matrix/startup-abort.ts @@ -0,0 +1,44 @@ +export function createMatrixStartupAbortError(): Error { + const error = new Error("Matrix startup aborted"); + error.name = "AbortError"; + return error; +} + +export function throwIfMatrixStartupAborted(abortSignal?: AbortSignal): void { + if (abortSignal?.aborted === true) { + throw createMatrixStartupAbortError(); + } +} + +export function isMatrixStartupAbortError(error: unknown): boolean { + return error instanceof Error && error.name === "AbortError"; +} + +export async function awaitMatrixStartupWithAbort( + promise: Promise, + abortSignal?: AbortSignal, +): Promise { + if (!abortSignal) { + return await promise; + } + if (abortSignal.aborted) { + throw createMatrixStartupAbortError(); + } + return await new Promise((resolve, reject) => { + const onAbort = () => { + abortSignal.removeEventListener("abort", onAbort); + reject(createMatrixStartupAbortError()); + }; + abortSignal.addEventListener("abort", onAbort, { once: true }); + promise.then( + (value) => { + abortSignal.removeEventListener("abort", onAbort); + resolve(value); + }, + (error) => { + abortSignal.removeEventListener("abort", onAbort); + reject(error); + }, + ); + }); +} diff --git a/extensions/matrix/src/matrix/sync-state.ts b/extensions/matrix/src/matrix/sync-state.ts new file mode 100644 index 00000000000..3edeefe978c --- /dev/null +++ b/extensions/matrix/src/matrix/sync-state.ts @@ -0,0 +1,27 @@ +export type MatrixSyncState = + | "PREPARED" + | "SYNCING" + | "CATCHUP" + | "RECONNECTING" + | "ERROR" + | "STOPPED" + | (string & {}); + +export function isMatrixReadySyncState( + state: MatrixSyncState | null | undefined, +): state is "PREPARED" | "SYNCING" | "CATCHUP" { + return state === "PREPARED" || state === "SYNCING" || state === "CATCHUP"; +} + +export function isMatrixDisconnectedSyncState( + state: MatrixSyncState | null | undefined, +): state is "RECONNECTING" | "ERROR" | "STOPPED" { + return state === "RECONNECTING" || state === "ERROR" || state === "STOPPED"; +} + +export function isMatrixTerminalSyncState( + state: MatrixSyncState | null | undefined, +): state is "STOPPED" { + // matrix-js-sdk can recover from ERROR to PREPARED during initial sync. + return state === "STOPPED"; +}