fix(matrix): contain sync outage failures (#62779)

Merged via squash.

Prepared head SHA: 901bb767b5
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana
2026-04-08 14:41:28 -04:00
committed by GitHub
parent a3d21539ef
commit 0c00c3c230
18 changed files with 1307 additions and 81 deletions

View File

@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
- Auto-reply/NO_REPLY: strip glued leading `NO_REPLY` tokens before reply normalization and ACP-visible streaming so silent sentinel text no longer leaks into user-visible replies while preserving substantive `NO_REPLY ...` text. Thanks @frankekn.
- Gateway/sessions: clear auto-fallback-pinned model overrides on `/reset` and `/new` while still preserving explicit user model selections, including legacy sessions created before override-source tracking existed. (#63155) Thanks @frankekn.
- Codex CLI: pass OpenClaw's system prompt through Codex's `model_instructions_file` config override so fresh Codex CLI sessions receive the same prompt guidance as Claude CLI sessions.
- Matrix/gateway: wait for Matrix sync readiness before marking startup successful, keep Matrix background handler failures contained, and route fatal Matrix sync stops through channel-level restart handling instead of crashing the whole gateway. (#62779) Thanks @gumadeiras.
## 2026.4.8
@@ -40,6 +41,8 @@ Docs: https://docs.openclaw.ai
- Slack/actions: pass the already resolved read token into `downloadFile` so SecretRef-backed bot tokens no longer fail after a raw config re-read. (#62097) Thanks @martingarramon.
- Network/fetch guard: skip target DNS pinning when trusted env-proxy mode is active so proxy-only sandboxes can let the trusted proxy resolve outbound hosts. (#59007) Thanks @cluster2600.
## 2026.4.7-1
## 2026.4.7
### Changes

View File

@@ -496,6 +496,7 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount, MatrixProbe> =
initialSyncLimit: account.config.initialSyncLimit,
replyToMode: account.config.replyToMode,
accountId: account.accountId,
setStatus: ctx.setStatus,
});
},
},

View File

@@ -233,6 +233,86 @@ describe("resolveSharedMatrixClient", () => {
).rejects.toThrow("Matrix shared client account mismatch");
});
// Scenario: a first caller (the "owner") begins starting the shared client, then a
// second caller joins the same in-flight startup with an abort signal and aborts.
// Expectation: only the aborting waiter rejects with an AbortError; the owner still
// resolves with the shared client once the mocked start() settles.
it("lets a later waiter abort while shared startup continues for the owner", async () => {
const mainAuth = authFor("main");
// Captured resolver so the test controls exactly when client.start() completes.
let resolveStartup: (() => void) | undefined;
const mainClient = {
...createMockClient("main"),
start: vi.fn(
async () =>
await new Promise<void>((resolve) => {
resolveStartup = resolve;
}),
),
};
resolveMatrixAuthMock.mockResolvedValue(mainAuth);
createMatrixClientMock.mockResolvedValue(mainClient);
// Owner call: no abort signal, left pending until resolveStartup() is invoked below.
const ownerPromise = resolveSharedMatrixClient({ accountId: "main" });
// Wait until start() has actually been entered so the next call joins an in-flight startup.
await vi.waitFor(() => {
expect(mainClient.start).toHaveBeenCalledTimes(1);
expect(resolveStartup).toEqual(expect.any(Function));
});
const abortController = new AbortController();
const canceledWaiter = resolveSharedMatrixClient({
accountId: "main",
abortSignal: abortController.signal,
});
abortController.abort();
await expect(canceledWaiter).rejects.toMatchObject({
message: "Matrix startup aborted",
name: "AbortError",
});
// Let the underlying start finish; the owner must be unaffected by the other caller's abort.
resolveStartup?.();
await expect(ownerPromise).resolves.toBe(mainClient);
});
// Scenario: same setup as an aborted waiter joining an in-flight startup, but a third
// caller arrives after the abort while start() is still pending.
// Expectation: the third caller reuses the pending startup, so client.start() is
// invoked exactly once overall and both non-aborted callers get the same client.
it("keeps the shared startup lock while an aborted waiter exits early", async () => {
const mainAuth = authFor("main");
// Captured resolver so the test controls exactly when client.start() completes.
let resolveStartup: (() => void) | undefined;
const mainClient = {
...createMockClient("main"),
start: vi.fn(
async () =>
await new Promise<void>((resolve) => {
resolveStartup = resolve;
}),
),
};
resolveMatrixAuthMock.mockResolvedValue(mainAuth);
createMatrixClientMock.mockResolvedValue(mainClient);
const ownerPromise = resolveSharedMatrixClient({ accountId: "main" });
// Wait until start() has actually been entered before adding more callers.
await vi.waitFor(() => {
expect(mainClient.start).toHaveBeenCalledTimes(1);
expect(resolveStartup).toEqual(expect.any(Function));
});
const abortController = new AbortController();
const abortedWaiter = resolveSharedMatrixClient({
accountId: "main",
abortSignal: abortController.signal,
});
abortController.abort();
await expect(abortedWaiter).rejects.toMatchObject({
message: "Matrix startup aborted",
name: "AbortError",
});
// A caller arriving after the abort must not trigger a second start() call.
const followerPromise = resolveSharedMatrixClient({ accountId: "main" });
expect(mainClient.start).toHaveBeenCalledTimes(1);
resolveStartup?.();
await expect(ownerPromise).resolves.toBe(mainClient);
await expect(followerPromise).resolves.toBe(mainClient);
// Still a single start() after everything has settled.
expect(mainClient.start).toHaveBeenCalledTimes(1);
});
it("recreates the shared client when dispatcherPolicy changes", async () => {
const firstAuth = {
...authFor("main"),

View File

@@ -2,6 +2,7 @@ import { normalizeOptionalAccountId } from "openclaw/plugin-sdk/account-id";
import type { CoreConfig } from "../../types.js";
import type { MatrixClient } from "../sdk.js";
import { LogService } from "../sdk/logger.js";
import { awaitMatrixStartupWithAbort } from "../startup-abort.js";
import { resolveMatrixAuth, resolveMatrixAuthContext } from "./config.js";
import type { MatrixAuth } from "./types.js";
@@ -91,19 +92,22 @@ function deleteSharedClientState(state: SharedMatrixClientState): void {
async function ensureSharedClientStarted(params: {
state: SharedMatrixClientState;
timeoutMs?: number;
initialSyncLimit?: number;
encryption?: boolean;
abortSignal?: AbortSignal;
}): Promise<void> {
const waitForStart = async (startPromise: Promise<void>) => {
await awaitMatrixStartupWithAbort(startPromise, params.abortSignal);
};
if (params.state.started) {
return;
}
if (params.state.startPromise) {
await params.state.startPromise;
await waitForStart(params.state.startPromise);
return;
}
params.state.startPromise = (async () => {
const startPromise = (async () => {
const client = params.state.client;
// Initialize crypto if enabled
@@ -119,15 +123,19 @@ async function ensureSharedClientStarted(params: {
}
}
await client.start();
await client.start({ abortSignal: params.abortSignal });
params.state.started = true;
})();
// Keep the shared startup lock until the underlying start fully settles, even
// if one waiter aborts early while another caller still owns the startup.
const guardedStart = startPromise.finally(() => {
if (params.state.startPromise === guardedStart) {
params.state.startPromise = null;
}
});
params.state.startPromise = guardedStart;
try {
await params.state.startPromise;
} finally {
params.state.startPromise = null;
}
await waitForStart(guardedStart);
}
async function resolveSharedMatrixClientState(
@@ -138,6 +146,7 @@ async function resolveSharedMatrixClientState(
auth?: MatrixAuth;
startClient?: boolean;
accountId?: string | null;
abortSignal?: AbortSignal;
} = {},
): Promise<SharedMatrixClientState> {
const requestedAccountId = normalizeOptionalAccountId(params.accountId);
@@ -168,9 +177,8 @@ async function resolveSharedMatrixClientState(
if (shouldStart) {
await ensureSharedClientStarted({
state: existingState,
timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption,
abortSignal: params.abortSignal,
});
}
return existingState;
@@ -182,9 +190,8 @@ async function resolveSharedMatrixClientState(
if (shouldStart) {
await ensureSharedClientStarted({
state: pending,
timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption,
abortSignal: params.abortSignal,
});
}
return pending;
@@ -202,9 +209,8 @@ async function resolveSharedMatrixClientState(
if (shouldStart) {
await ensureSharedClientStarted({
state: created,
timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption,
abortSignal: params.abortSignal,
});
}
return created;
@@ -221,6 +227,7 @@ export async function resolveSharedMatrixClient(
auth?: MatrixAuth;
startClient?: boolean;
accountId?: string | null;
abortSignal?: AbortSignal;
} = {},
): Promise<MatrixClient> {
const state = await resolveSharedMatrixClientState(params);
@@ -235,6 +242,7 @@ export async function acquireSharedMatrixClient(
auth?: MatrixAuth;
startClient?: boolean;
accountId?: string | null;
abortSignal?: AbortSignal;
} = {},
): Promise<MatrixClient> {
const state = await resolveSharedMatrixClientState(params);

View File

@@ -49,6 +49,7 @@ export function registerMatrixMonitorEvents(params: {
logger: RuntimeLogger;
formatNativeDependencyHint: PluginRuntime["system"]["formatNativeDependencyHint"];
onRoomMessage: (roomId: string, event: MatrixRawEvent) => void | Promise<void>;
runDetachedTask?: (label: string, task: () => Promise<void>) => Promise<void>;
}): void {
const {
cfg,
@@ -65,6 +66,7 @@ export function registerMatrixMonitorEvents(params: {
logger,
formatNativeDependencyHint,
onRoomMessage,
runDetachedTask,
} = params;
const { routeVerificationEvent, routeVerificationSummary } = createMatrixVerificationEventRouter({
client,
@@ -75,11 +77,27 @@ export function registerMatrixMonitorEvents(params: {
logVerboseMessage,
});
/**
 * Runs a background handler so that its failure never escapes as a rejection.
 *
 * Delegates to the caller-supplied `runDetachedTask` when one was provided;
 * otherwise schedules the task on a resolved promise and logs any failure
 * through `logVerboseMessage`.
 */
const runMonitorTask = (label: string, task: () => Promise<void>) => {
  if (!runDetachedTask) {
    // No external runner: contain the failure locally and only log it.
    const reportFailure = (error: unknown) => {
      logVerboseMessage(`matrix: ${label} failed (${String(error)})`);
    };
    return Promise.resolve().then(task).catch(reportFailure);
  }
  return runDetachedTask(label, task);
};
client.on("room.message", (roomId: string, event: MatrixRawEvent) => {
if (routeVerificationEvent(roomId, event)) {
return;
}
void onRoomMessage(roomId, event);
void runMonitorTask(
`room message handler room=${roomId} id=${event.event_id ?? "unknown"}`,
async () => {
await onRoomMessage(roomId, event);
},
);
});
client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => {
@@ -121,7 +139,9 @@ export function registerMatrixMonitorEvents(params: {
);
client.on("verification.summary", (summary) => {
void routeVerificationSummary(summary);
void runMonitorTask("verification summary handler", async () => {
await routeVerificationSummary(summary);
});
});
client.on("room.invite", (roomId: string, event: MatrixRawEvent) => {
@@ -179,7 +199,12 @@ export function registerMatrixMonitorEvents(params: {
);
}
if (eventType === EventType.Reaction) {
void onRoomMessage(roomId, event);
void runMonitorTask(
`reaction handler room=${roomId} id=${event.event_id ?? "unknown"}`,
async () => {
await onRoomMessage(roomId, event);
},
);
return;
}

View File

@@ -12,6 +12,34 @@ type DirectRoomTrackerOptions = {
};
const hoisted = vi.hoisted(() => {
/**
 * Builds a minimal in-memory event emitter for the mocked Matrix client.
 *
 * Listeners are stored in one Set per event name, so registering the same
 * function twice for an event is a no-op. `on`, `off` and `removeAllListeners`
 * return the receiver for chaining; `emit` always returns `true`.
 */
const createEmitter = () => {
  type Listener = (...args: unknown[]) => void;
  const registry = new Map<string, Set<Listener>>();
  return {
    on(event: string, listener: Listener) {
      const existing = registry.get(event);
      if (existing) {
        existing.add(listener);
      } else {
        registry.set(event, new Set<Listener>([listener]));
      }
      return this;
    },
    off(event: string, listener: Listener) {
      const bucket = registry.get(event);
      if (bucket) {
        bucket.delete(listener);
      }
      return this;
    },
    emit(event: string, ...args: unknown[]) {
      // Listeners run synchronously in registration order.
      registry.get(event)?.forEach((listener) => {
        listener(...args);
      });
      return true;
    },
    removeAllListeners() {
      registry.clear();
      return this;
    },
  };
};
const callOrder: string[] = [];
const state = {
startClientError: null as Error | null,
@@ -26,12 +54,12 @@ const hoisted = vi.hoisted(() => {
flush: vi.fn(async () => undefined),
stop: vi.fn(async () => undefined),
};
const client = {
const client = Object.assign(createEmitter(), {
id: "matrix-client",
hasPersistedSyncState: vi.fn(() => false),
stopSyncWithoutPersist: vi.fn(),
drainPendingDecryptions: vi.fn(async () => undefined),
};
});
const createMatrixRoomMessageHandler = vi.fn(() => vi.fn());
const createDirectRoomTracker = vi.fn((_client: unknown, _opts?: DirectRoomTrackerOptions) => ({
isDirectMessage: vi.fn(async () => false),
@@ -55,9 +83,27 @@ const hoisted = vi.hoisted(() => {
};
const stopThreadBindingManager = vi.fn();
const releaseSharedClientInstance = vi.fn(async () => true);
// Mock for resolveSharedMatrixClient that records the call in `callOrder`.
// A prepare-only call (`startClient === false`) is always allowed. A starting call
// throws unless "create-manager" was already recorded, and rethrows the configured
// `state.startClientError` when one is set.
const resolveSharedMatrixClient = vi.fn(async (params: { startClient?: boolean }) => {
  const prepareOnly = params.startClient === false;
  if (!prepareOnly) {
    const managerRegistered = callOrder.includes("create-manager");
    if (!managerRegistered) {
      throw new Error("Matrix client started before thread bindings were registered");
    }
    if (state.startClientError) {
      throw state.startClientError;
    }
  }
  callOrder.push(prepareOnly ? "prepare-client" : "start-client");
  return client;
});
const setActiveMatrixClient = vi.fn();
const setMatrixRuntime = vi.fn();
const backfillMatrixAuthDeviceIdAfterStartup = vi.fn(async () => undefined);
const runMatrixStartupMaintenance = vi.fn<
(params: { abortSignal?: AbortSignal }) => Promise<void>
>(async () => undefined);
const setStatus = vi.fn();
return {
backfillMatrixAuthDeviceIdAfterStartup,
callOrder,
@@ -71,9 +117,12 @@ const hoisted = vi.hoisted(() => {
logger,
registeredOnRoomMessage: null as null | ((roomId: string, event: unknown) => Promise<void>),
releaseSharedClientInstance,
resolveSharedMatrixClient,
resolveTextChunkLimit,
runMatrixStartupMaintenance,
setActiveMatrixClient,
setMatrixRuntime,
setStatus,
state,
stopThreadBindingManager,
};
@@ -237,20 +286,7 @@ vi.mock("../client.js", () => ({
resolveMatrixAuthContext: vi.fn(() => ({
accountId: "default",
})),
resolveSharedMatrixClient: vi.fn(async (params: { startClient?: boolean }) => {
if (params.startClient === false) {
hoisted.callOrder.push("prepare-client");
return hoisted.client;
}
if (!hoisted.callOrder.includes("create-manager")) {
throw new Error("Matrix client started before thread bindings were registered");
}
if (hoisted.state.startClientError) {
throw hoisted.state.startClientError;
}
hoisted.callOrder.push("start-client");
return hoisted.client;
}),
resolveSharedMatrixClient: hoisted.resolveSharedMatrixClient,
}));
vi.mock("../client/shared.js", () => ({
@@ -300,9 +336,17 @@ vi.mock("./direct.js", () => ({
vi.mock("./events.js", () => ({
registerMatrixMonitorEvents: vi.fn(
(params: { onRoomMessage: (roomId: string, event: unknown) => Promise<void> }) => {
(params: {
onRoomMessage: (roomId: string, event: unknown) => Promise<void>;
runDetachedTask?: (label: string, task: () => Promise<void>) => Promise<void>;
}) => {
hoisted.callOrder.push("register-events");
hoisted.registeredOnRoomMessage = params.onRoomMessage;
hoisted.registeredOnRoomMessage = (roomId: string, event: unknown) =>
params.runDetachedTask
? params.runDetachedTask("test room message", async () => {
await params.onRoomMessage(roomId, event);
})
: params.onRoomMessage(roomId, event);
},
),
}));
@@ -330,6 +374,10 @@ vi.mock("./startup-verification.js", () => ({
ensureMatrixStartupVerification: vi.fn(),
}));
vi.mock("./startup.js", () => ({
runMatrixStartupMaintenance: hoisted.runMatrixStartupMaintenance,
}));
let monitorMatrixProvider: typeof import("./index.js").monitorMatrixProvider;
describe("monitorMatrixProvider", () => {
@@ -353,6 +401,22 @@ describe("monitorMatrixProvider", () => {
delete (hoisted.accountConfig as { rooms?: Record<string, unknown> }).rooms;
hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000);
hoisted.releaseSharedClientInstance.mockReset().mockResolvedValue(true);
hoisted.resolveSharedMatrixClient
.mockReset()
.mockImplementation(async (params: { startClient?: boolean }) => {
if (params.startClient === false) {
hoisted.callOrder.push("prepare-client");
return hoisted.client;
}
if (!hoisted.callOrder.includes("create-manager")) {
throw new Error("Matrix client started before thread bindings were registered");
}
if (hoisted.state.startClientError) {
throw hoisted.state.startClientError;
}
hoisted.callOrder.push("start-client");
return hoisted.client;
});
hoisted.createDirectRoomTracker.mockReset().mockReturnValue({
isDirectMessage: vi.fn(async () => false),
});
@@ -365,6 +429,7 @@ describe("monitorMatrixProvider", () => {
hoisted.registeredOnRoomMessage = null;
hoisted.setActiveMatrixClient.mockReset();
hoisted.stopThreadBindingManager.mockReset();
hoisted.client.removeAllListeners();
hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false);
hoisted.client.stopSyncWithoutPersist.mockReset();
hoisted.client.drainPendingDecryptions.mockReset().mockResolvedValue(undefined);
@@ -374,7 +439,9 @@ describe("monitorMatrixProvider", () => {
hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined);
hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined);
hoisted.backfillMatrixAuthDeviceIdAfterStartup.mockReset().mockResolvedValue(undefined);
hoisted.runMatrixStartupMaintenance.mockReset().mockResolvedValue(undefined);
hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn());
hoisted.setStatus.mockReset();
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
});
@@ -390,6 +457,178 @@ describe("monitorMatrixProvider", () => {
expect(hoisted.setActiveMatrixClient).not.toHaveBeenCalled();
});
// Verifies the status sink sees a "starting"/disconnected snapshot during startup and a
// "healthy"/connected snapshot after a sync.state event, and that aborting afterwards
// resolves the monitor promise instead of rejecting it.
it("publishes disconnected startup status and connected sync status without failing the monitor", async () => {
const abortController = new AbortController();
const monitorPromise = monitorMatrixProvider({
abortSignal: abortController.signal,
setStatus: hoisted.setStatus,
});
// Wait until the mocked shared client reports that the start path was taken.
await vi.waitFor(() => {
expect(hoisted.callOrder).toContain("start-client");
});
expect(hoisted.setStatus).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "default",
baseUrl: "https://matrix.example.org",
connected: false,
healthState: "starting",
}),
);
// NOTE(review): arguments are presumably (new state, previous state, error) — confirm
// against the sync lifecycle listener.
hoisted.client.emit("sync.state", "SYNCING", "RECONNECTING", undefined);
await vi.waitFor(() => {
expect(hoisted.setStatus).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "default",
connected: true,
healthState: "healthy",
lastError: null,
}),
);
});
// A normal abort must end the monitor cleanly.
abortController.abort();
await expect(monitorPromise).resolves.toBeUndefined();
});
// Verifies that a rejecting room-message handler is contained by the monitor's task
// runner: the failure is logged as a warning and no unhandledRejection reaches the process.
it("contains room-message handler rejections inside monitor task tracking", async () => {
  const abortController = new AbortController();
  // Collects anything Node reports as an unhandled rejection while the test runs.
  const unhandled: unknown[] = [];
  const onUnhandled = (reason: unknown) => {
    unhandled.push(reason);
  };
  hoisted.createMatrixRoomMessageHandler.mockReturnValue(
    vi.fn(async () => {
      throw new Error("room handler exploded");
    }),
  );
  process.on("unhandledRejection", onUnhandled);
  try {
    const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal });
    await vi.waitFor(() => {
      expect(hoisted.callOrder).toContain("start-client");
    });
    const onRoomMessage = hoisted.registeredOnRoomMessage;
    if (!onRoomMessage) {
      throw new Error("expected room message handler to be registered");
    }
    await onRoomMessage("!room:example.org", { event_id: "$event" });
    // Node emits "unhandledRejection" only after the microtask queue has drained, so a
    // bare `await Promise.resolve()` would run the assertion too early to ever observe
    // one. Yield a full macrotask turn before checking.
    // NOTE(review): assumes real timers are active in this suite — confirm.
    await new Promise<void>((resolve) => {
      setImmediate(resolve);
    });
    expect(unhandled).toHaveLength(0);
    expect(hoisted.logger.warn).toHaveBeenCalledWith(
      "matrix background task failed",
      expect.objectContaining({
        task: "test room message",
        error: "Error: room handler exploded",
      }),
    );
    abortController.abort();
    await monitorPromise;
  } finally {
    // Always detach the process-level listener so it cannot leak into other tests.
    process.off("unhandledRejection", onUnhandled);
  }
});
// Verifies that a "sync.unexpected_error" event from the client rejects the monitor
// promise with that error, releases the shared client in "persist" mode, and publishes
// an "error" status snapshot carrying the error message.
it("fails the channel task when Matrix sync emits an unexpected fatal error", async () => {
const abortController = new AbortController();
const monitorPromise = monitorMatrixProvider({
abortSignal: abortController.signal,
setStatus: hoisted.setStatus,
});
// Wait for startup to reach the client start step before injecting the failure.
await vi.waitFor(() => {
expect(hoisted.callOrder).toContain("start-client");
});
hoisted.client.emit("sync.unexpected_error", new Error("sync exploded"));
await expect(monitorPromise).rejects.toThrow("sync exploded");
expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "persist");
expect(hoisted.setStatus).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "default",
connected: false,
healthState: "error",
lastError: "sync exploded",
}),
);
});
// Verifies that aborting while the client start call is still pending resolves the
// monitor (no rejection), releases the shared client in "stop" mode, and skips draining
// pending decryptions.
it("aborts stalled startup promptly and releases the shared client without persist", async () => {
const abortController = new AbortController();
// Replace the default mock: the start call never resolves and only rejects with an
// AbortError once the abort signal passed by the monitor fires.
hoisted.resolveSharedMatrixClient.mockImplementation(
async (params: { startClient?: boolean; abortSignal?: AbortSignal }) => {
if (params.startClient === false) {
hoisted.callOrder.push("prepare-client");
return hoisted.client;
}
hoisted.callOrder.push("start-client");
return await new Promise<typeof hoisted.client>((_resolve, reject) => {
params.abortSignal?.addEventListener(
"abort",
() => {
const error = new Error("Matrix startup aborted");
error.name = "AbortError";
reject(error);
},
{ once: true },
);
});
},
);
const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal });
// Make sure the monitor is blocked inside the stalled start before aborting.
await vi.waitFor(() => {
expect(hoisted.callOrder).toContain("start-client");
});
abortController.abort();
await expect(monitorPromise).resolves.toBeUndefined();
expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "stop");
expect(hoisted.client.drainPendingDecryptions).not.toHaveBeenCalled();
});
// Verifies that aborting while startup maintenance is still pending resolves the
// monitor (no rejection), releases the shared client in "stop" mode, and skips draining
// pending decryptions.
it("aborts during startup maintenance and releases the shared client without persist", async () => {
const abortController = new AbortController();
// Maintenance never resolves; it only rejects with an AbortError once the abort signal
// it was given fires.
hoisted.runMatrixStartupMaintenance.mockImplementation(
async (params: { abortSignal?: AbortSignal }) =>
await new Promise<void>((_resolve, reject) => {
params.abortSignal?.addEventListener(
"abort",
() => {
const error = new Error("Matrix startup aborted");
error.name = "AbortError";
reject(error);
},
{ once: true },
);
}),
);
const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal });
// Make sure the monitor is blocked inside maintenance before aborting.
await vi.waitFor(() => {
expect(hoisted.runMatrixStartupMaintenance).toHaveBeenCalledTimes(1);
});
abortController.abort();
await expect(monitorPromise).resolves.toBeUndefined();
expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "stop");
expect(hoisted.client.drainPendingDecryptions).not.toHaveBeenCalled();
});
it("registers Matrix thread bindings before starting the client", async () => {
await startMonitorAndAbortAfterStartup();

View File

@@ -1,5 +1,6 @@
import { format } from "node:util";
import { CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY } from "openclaw/plugin-sdk/approval-handler-adapter-runtime";
import { waitUntilAbort } from "openclaw/plugin-sdk/channel-lifecycle";
import { registerChannelRuntimeContext } from "openclaw/plugin-sdk/channel-runtime-context";
import {
GROUP_POLICY_BLOCKED_LABEL,
@@ -23,6 +24,7 @@ import {
resolveSharedMatrixClient,
} from "../client.js";
import { releaseSharedClientInstance } from "../client/shared.js";
import { isMatrixStartupAbortError } from "../startup-abort.js";
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
import { registerMatrixAutoJoin } from "./auto-join.js";
import { resolveMatrixMonitorConfig } from "./config.js";
@@ -33,6 +35,9 @@ import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js";
import { shouldPromoteRecentInviteRoom } from "./recent-invite.js";
import { createMatrixRoomInfoResolver } from "./room-info.js";
import { runMatrixStartupMaintenance } from "./startup.js";
import { createMatrixMonitorStatusController } from "./status.js";
import { createMatrixMonitorSyncLifecycle } from "./sync-lifecycle.js";
import { createMatrixMonitorTaskRunner } from "./task-runner.js";
export type MonitorMatrixOpts = {
runtime?: RuntimeEnv;
@@ -42,6 +47,7 @@ export type MonitorMatrixOpts = {
initialSyncLimit?: number;
replyToMode?: ReplyToMode;
accountId?: string | null;
setStatus?: (next: import("openclaw/plugin-sdk/channel-contract").ChannelAccountSnapshot) => void;
};
const DEFAULT_MEDIA_MAX_MB = 20;
@@ -140,6 +146,11 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
resolvedInitialSyncLimit === auth.initialSyncLimit
? auth
: { ...auth, initialSyncLimit: resolvedInitialSyncLimit };
const statusController = createMatrixMonitorStatusController({
accountId: auth.accountId,
baseUrl: auth.homeserver,
statusSink: opts.setStatus,
});
const client = await resolveSharedMatrixClient({
cfg,
auth: authWithLimit,
@@ -153,25 +164,32 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
auth,
env: process.env,
});
const inFlightRoomMessages = new Set<Promise<void>>();
const waitForInFlightRoomMessages = async () => {
while (inFlightRoomMessages.size > 0) {
await Promise.allSettled(Array.from(inFlightRoomMessages));
}
};
const cleanup = async () => {
const monitorTaskRunner = createMatrixMonitorTaskRunner({
logger,
logVerboseMessage,
});
const syncLifecycle = createMatrixMonitorSyncLifecycle({
client,
statusController,
isStopping: () => cleanedUp || opts.abortSignal?.aborted === true,
});
const cleanup = async (mode: "persist" | "stop" = "persist") => {
if (cleanedUp) {
return;
}
cleanedUp = true;
try {
client.stopSyncWithoutPersist();
await client.drainPendingDecryptions("matrix monitor shutdown");
await waitForInFlightRoomMessages();
if (mode === "persist") {
await client.drainPendingDecryptions("matrix monitor shutdown");
await monitorTaskRunner.waitForIdle();
}
threadBindingManager?.stop();
await inboundDeduper.stop();
await releaseSharedClientInstance(client, "persist");
await releaseSharedClientInstance(client, mode);
} finally {
syncLifecycle.dispose();
statusController.markStopped();
setActiveMatrixClient(null, auth.accountId);
}
};
@@ -294,13 +312,6 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
getMemberDisplayName,
needsRoomAliasesForConfig,
});
const trackRoomMessage = (roomId: string, event: Parameters<typeof handleRoomMessage>[1]) => {
const task = Promise.resolve(handleRoomMessage(roomId, event)).finally(() => {
inFlightRoomMessages.delete(task);
});
inFlightRoomMessages.add(task);
return task;
};
try {
threadBindingManager = await createMatrixThreadBindingManager({
@@ -337,7 +348,8 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
warnedCryptoMissingRooms,
logger,
formatNativeDependencyHint: core.system.formatNativeDependencyHint,
onRoomMessage: trackRoomMessage,
onRoomMessage: handleRoomMessage,
runDetachedTask: monitorTaskRunner.runDetachedTask,
});
// Register Matrix thread bindings before the client starts syncing so threaded
@@ -347,6 +359,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
cfg,
auth: authWithLimit,
accountId: auth.accountId,
abortSignal: opts.abortSignal,
});
logVerboseMessage("matrix: client started");
@@ -383,10 +396,11 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
writeConfigFile: async (nextCfg) => await core.config.writeConfigFile(nextCfg),
loadWebMedia: async (url, maxBytes) => await core.media.loadWebMedia(url, maxBytes),
env: process.env,
abortSignal: opts.abortSignal,
});
await new Promise<void>((resolve) => {
const stopAndResolve = async () => {
await Promise.race([
waitUntilAbort(opts.abortSignal, async () => {
try {
logVerboseMessage("matrix: stopping client");
await cleanup();
@@ -394,23 +408,15 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
logger.warn("matrix: failed during monitor shutdown cleanup", {
error: String(err),
});
} finally {
resolve();
}
};
if (opts.abortSignal?.aborted) {
void stopAndResolve();
return;
}
opts.abortSignal?.addEventListener(
"abort",
() => {
void stopAndResolve();
},
{ once: true },
);
});
}),
syncLifecycle.waitForFatalStop(),
]);
} catch (err) {
if (opts.abortSignal?.aborted === true && isMatrixStartupAbortError(err)) {
await cleanup("stop");
return;
}
await cleanup();
throw err;
}

View File

@@ -133,6 +133,7 @@ describe("runMatrixStartupMaintenance", () => {
contentType: "image/png",
fileName: "avatar.png",
})),
abortSignal: undefined,
env: {},
};
}
@@ -235,4 +236,22 @@ describe("runMatrixStartupMaintenance", () => {
{ error: "boom" },
);
});
// Verifies that an abort raised while the profile sync step is running makes
// runMatrixStartupMaintenance reject with an AbortError and prevents the later
// verification and legacy-backup steps from being invoked.
it("aborts maintenance before later startup steps continue", async () => {
const params = createParams();
params.auth.encryption = true;
const abortController = new AbortController();
params.abortSignal = abortController.signal;
// Trigger the abort from inside the first maintenance step, then let that step
// return normally so the abort is detected by the check that follows it.
vi.mocked(deps.syncMatrixOwnProfile).mockImplementation(async () => {
abortController.abort();
return createProfileSyncResult();
});
await expect(runMatrixStartupMaintenance(params, deps)).rejects.toMatchObject({
message: "Matrix startup aborted",
name: "AbortError",
});
expect(deps.ensureMatrixStartupVerification).not.toHaveBeenCalled();
expect(deps.maybeRestoreLegacyMatrixBackup).not.toHaveBeenCalled();
});
});

View File

@@ -2,6 +2,7 @@ import type { RuntimeLogger } from "../../runtime-api.js";
import type { CoreConfig, MatrixConfig } from "../../types.js";
import type { MatrixAuth } from "../client.js";
import type { MatrixClient } from "../sdk.js";
import { isMatrixStartupAbortError, throwIfMatrixStartupAborted } from "../startup-abort.js";
type MatrixStartupClient = Pick<
MatrixClient,
@@ -66,10 +67,12 @@ export async function runMatrixStartupMaintenance(
maxBytes: number,
) => Promise<{ buffer: Buffer; contentType?: string; fileName?: string }>;
env?: NodeJS.ProcessEnv;
abortSignal?: AbortSignal;
},
deps?: MatrixStartupMaintenanceDeps,
): Promise<void> {
const runtimeDeps = deps ?? (await loadMatrixStartupMaintenanceDeps());
throwIfMatrixStartupAborted(params.abortSignal);
try {
const profileSync = await runtimeDeps.syncMatrixOwnProfile({
client: params.client,
@@ -78,6 +81,7 @@ export async function runMatrixStartupMaintenance(
avatarUrl: params.accountConfig.avatarUrl,
loadAvatarFromUrl: async (url, maxBytes) => await params.loadWebMedia(url, maxBytes),
});
throwIfMatrixStartupAborted(params.abortSignal);
if (profileSync.displayNameUpdated) {
params.logger.info(`matrix: profile display name updated for ${params.auth.userId}`);
}
@@ -94,11 +98,15 @@ export async function runMatrixStartupMaintenance(
avatarUrl: profileSync.resolvedAvatarUrl,
});
await params.writeConfigFile(updatedCfg as never);
throwIfMatrixStartupAborted(params.abortSignal);
params.logVerboseMessage(
`matrix: persisted converted avatar URL for account ${params.accountId} (${profileSync.resolvedAvatarUrl})`,
);
}
} catch (err) {
if (isMatrixStartupAbortError(err)) {
throw err;
}
params.logger.warn("matrix: failed to sync profile from config", { error: String(err) });
}
@@ -107,6 +115,7 @@ export async function runMatrixStartupMaintenance(
}
try {
throwIfMatrixStartupAborted(params.abortSignal);
const deviceHealth = runtimeDeps.summarizeMatrixDeviceHealth(
await params.client.listOwnDevices(),
);
@@ -116,18 +125,23 @@ export async function runMatrixStartupMaintenance(
);
}
} catch (err) {
if (isMatrixStartupAbortError(err)) {
throw err;
}
params.logger.debug?.("Failed to inspect matrix device hygiene (non-fatal)", {
error: String(err),
});
}
try {
throwIfMatrixStartupAborted(params.abortSignal);
const startupVerification = await runtimeDeps.ensureMatrixStartupVerification({
client: params.client,
auth: params.auth,
accountConfig: params.accountConfig,
env: params.env,
});
throwIfMatrixStartupAborted(params.abortSignal);
if (startupVerification.kind === "verified") {
params.logger.info("matrix: device is verified by its owner and ready for encrypted rooms");
} else if (
@@ -158,17 +172,22 @@ export async function runMatrixStartupMaintenance(
);
}
} catch (err) {
if (isMatrixStartupAbortError(err)) {
throw err;
}
params.logger.debug?.("Failed to resolve matrix verification status (non-fatal)", {
error: String(err),
});
}
try {
throwIfMatrixStartupAborted(params.abortSignal);
const legacyCryptoRestore = await runtimeDeps.maybeRestoreLegacyMatrixBackup({
client: params.client,
auth: params.auth,
env: params.env,
});
throwIfMatrixStartupAborted(params.abortSignal);
if (legacyCryptoRestore.kind === "restored") {
params.logger.info(
`matrix: restored ${legacyCryptoRestore.imported}/${legacyCryptoRestore.total} room key(s) from legacy encrypted-state backup`,
@@ -189,6 +208,9 @@ export async function runMatrixStartupMaintenance(
}
}
} catch (err) {
if (isMatrixStartupAbortError(err)) {
throw err;
}
params.logger.warn("matrix: failed restoring legacy encrypted-state backup", {
error: String(err),
});

View File

@@ -0,0 +1,111 @@
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import { formatMatrixErrorMessage } from "../errors.js";
import {
isMatrixDisconnectedSyncState,
isMatrixReadySyncState,
type MatrixSyncState,
} from "../sync-state.js";
type MatrixMonitorStatusSink = (patch: ChannelAccountSnapshot) => void;
/**
 * Copies a `lastDisconnect` value.
 *
 * Object values are shallow-copied, string values are returned unchanged, and
 * `null`/`undefined` are normalized to `null`.
 */
function cloneLastDisconnect(
  value: ChannelAccountSnapshot["lastDisconnect"],
): ChannelAccountSnapshot["lastDisconnect"] {
  if (value && typeof value !== "string") {
    return { ...value };
  }
  return value ?? null;
}
/**
 * Converts a sync error of unknown shape into a display string.
 *
 * @returns `null` for any falsy input; for `Error` instances the message,
 *   falling back to the error name and then to `"unknown"`; otherwise whatever
 *   `formatMatrixErrorMessage` produces for the value.
 */
function formatSyncError(error: unknown): string | null {
  if (error instanceof Error) {
    return error.message || error.name || "unknown";
  }
  return error ? formatMatrixErrorMessage(error) : null;
}
export type MatrixMonitorStatusController = ReturnType<typeof createMatrixMonitorStatusController>;
/**
 * Creates a controller that keeps one mutable status snapshot for a Matrix
 * account and pushes a copy of it to `statusSink` after every change.
 *
 * The initial snapshot (disconnected, `healthState: "starting"`) is published
 * immediately on creation.
 *
 * @param params.accountId Account the snapshot belongs to.
 * @param params.baseUrl Optional base URL; only included in the snapshot when truthy.
 * @param params.statusSink Optional receiver for snapshot copies.
 * @returns An object with `noteSyncState`, `noteUnexpectedError` and `markStopped`.
 */
export function createMatrixMonitorStatusController(params: {
  accountId: string;
  baseUrl?: string;
  statusSink?: MatrixMonitorStatusSink;
}) {
  const status: ChannelAccountSnapshot = {
    accountId: params.accountId,
    ...(params.baseUrl ? { baseUrl: params.baseUrl } : {}),
    connected: false,
    lastConnectedAt: null,
    lastDisconnect: null,
    lastError: null,
    healthState: "starting",
  };

  // Publishes a copy so the sink never holds a reference to the live snapshot
  // or to its `lastDisconnect` object.
  function publish(): void {
    params.statusSink?.({
      ...status,
      lastDisconnect: cloneLastDisconnect(status.lastDisconnect),
    });
  }

  // Records a ready sync state: the connected patch is applied only on the
  // transition into connected; later ready states just refresh the event time.
  function recordConnected(at: number = Date.now()): void {
    if (status.connected !== true) {
      Object.assign(status, createConnectedChannelStatusPatch(at));
    } else {
      status.lastEventAt = at;
    }
    status.lastError = null;
    status.lastDisconnect = null;
    status.healthState = "healthy";
    publish();
  }

  // Records a disconnected sync state together with its (optional) error text.
  function recordDisconnected(state: MatrixSyncState, at?: number, cause?: unknown): void {
    const when = at ?? Date.now();
    const message = formatSyncError(cause);
    status.connected = false;
    status.lastEventAt = when;
    status.lastDisconnect = message ? { at: when, error: message } : { at: when };
    status.lastError = message;
    status.healthState = state.toLowerCase();
    publish();
  }

  publish();

  return {
    /** Applies a sync state reported by the client to the snapshot. */
    noteSyncState(state: MatrixSyncState, error?: unknown, at = Date.now()) {
      if (isMatrixReadySyncState(state)) {
        recordConnected(at);
      } else if (isMatrixDisconnectedSyncState(state)) {
        recordDisconnected(state, at, error);
      } else {
        // Unknown future SDK states inherit the current connectivity bit until the
        // SDK classifies them as ready or disconnected. Avoid guessing here.
        status.lastEventAt = at;
        status.healthState = state.toLowerCase();
        publish();
      }
    },
    /** Records an unexpected failure as a disconnect with `healthState: "error"`. */
    noteUnexpectedError(error: unknown, at = Date.now()) {
      recordDisconnected("ERROR", at, error);
    },
    /** Marks the monitor as stopped, preserving an already recorded "error" health state. */
    markStopped(at = Date.now()) {
      status.connected = false;
      status.lastEventAt = at;
      const keepErrorState = status.healthState === "error";
      if (!keepErrorState) {
        status.healthState = "stopped";
      }
      publish();
    },
  };
}

View File

@@ -0,0 +1,224 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import { createMatrixMonitorStatusController } from "./status.js";
import { createMatrixMonitorSyncLifecycle } from "./sync-lifecycle.js";
/**
 * Builds a plain Node `EventEmitter` typed as the minimal `on`/`off`/`emit`
 * surface the lifecycle tests need from a Matrix client.
 */
function createClientEmitter() {
  type Listener = (...args: unknown[]) => void;
  const emitter: unknown = new EventEmitter();
  return emitter as {
    on: (event: string, listener: Listener) => unknown;
    off: (event: string, listener: Listener) => unknown;
    emit: (event: string, ...args: unknown[]) => boolean;
  };
}
// Behavioral tests for createMatrixMonitorSyncLifecycle. Each case drives a bare
// EventEmitter standing in for the Matrix client, then checks the snapshots
// pushed to the status sink and how waitForFatalStop() settles.
describe("createMatrixMonitorSyncLifecycle", () => {
// An unexpected sync error rejects the waiter and is published as healthState "error".
it("rejects the channel wait on unexpected sync errors", async () => {
const client = createClientEmitter();
const setStatus = vi.fn();
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController: createMatrixMonitorStatusController({
accountId: "default",
statusSink: setStatus,
}),
});
const waitPromise = lifecycle.waitForFatalStop();
client.emit("sync.unexpected_error", new Error("sync exploded"));
await expect(waitPromise).rejects.toThrow("sync exploded");
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "default",
healthState: "error",
lastError: "sync exploded",
}),
);
});
// STOPPED while isStopping() is true is a normal shutdown: the waiter resolves on dispose().
it("ignores STOPPED emitted during intentional shutdown", async () => {
const client = createClientEmitter();
const setStatus = vi.fn();
let stopping = false;
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController: createMatrixMonitorStatusController({
accountId: "default",
statusSink: setStatus,
}),
isStopping: () => stopping,
});
const waitPromise = lifecycle.waitForFatalStop();
stopping = true;
client.emit("sync.state", "STOPPED", "SYNCING", undefined);
lifecycle.dispose();
await expect(waitPromise).resolves.toBeUndefined();
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "default",
healthState: "stopped",
}),
);
});
// STOPPED without an isStopping callback counts as a fatal stop with a synthesized error message.
it("marks unexpected STOPPED sync as an error state", async () => {
const client = createClientEmitter();
const setStatus = vi.fn();
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController: createMatrixMonitorStatusController({
accountId: "default",
statusSink: setStatus,
}),
});
const waitPromise = lifecycle.waitForFatalStop();
client.emit("sync.state", "STOPPED", "SYNCING", undefined);
await expect(waitPromise).rejects.toThrow("Matrix sync stopped unexpectedly");
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "default",
healthState: "error",
lastError: "Matrix sync stopped unexpectedly",
}),
);
});
// Unexpected errors raised while stopping must never surface as an "error" snapshot.
it("ignores unexpected sync errors emitted during intentional shutdown", async () => {
const client = createClientEmitter();
const setStatus = vi.fn();
let stopping = false;
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController: createMatrixMonitorStatusController({
accountId: "default",
statusSink: setStatus,
}),
isStopping: () => stopping,
});
const waitPromise = lifecycle.waitForFatalStop();
stopping = true;
client.emit("sync.unexpected_error", new Error("shutdown noise"));
lifecycle.dispose();
await expect(waitPromise).resolves.toBeUndefined();
expect(setStatus).not.toHaveBeenCalledWith(
expect.objectContaining({
accountId: "default",
healthState: "error",
}),
);
});
// A non-terminal ERROR state during shutdown is dropped, so the final snapshot is a clean "stopped".
it("ignores non-terminal sync states emitted during intentional shutdown", async () => {
const client = createClientEmitter();
const setStatus = vi.fn();
let stopping = false;
const statusController = createMatrixMonitorStatusController({
accountId: "default",
statusSink: setStatus,
});
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController,
isStopping: () => stopping,
});
const waitPromise = lifecycle.waitForFatalStop();
stopping = true;
client.emit("sync.state", "ERROR", "RECONNECTING", new Error("shutdown noise"));
lifecycle.dispose();
statusController.markStopped();
await expect(waitPromise).resolves.toBeUndefined();
expect(setStatus).toHaveBeenLastCalledWith(
expect.objectContaining({
accountId: "default",
healthState: "stopped",
lastError: null,
}),
);
});
// Once a fatal error is recorded, the shutdown that follows (STOPPED + markStopped) keeps "error".
it("does not downgrade a fatal error to stopped during shutdown", async () => {
const client = createClientEmitter();
const setStatus = vi.fn();
let stopping = false;
const statusController = createMatrixMonitorStatusController({
accountId: "default",
statusSink: setStatus,
});
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController,
isStopping: () => stopping,
});
const waitPromise = lifecycle.waitForFatalStop();
client.emit("sync.unexpected_error", new Error("sync exploded"));
await expect(waitPromise).rejects.toThrow("sync exploded");
stopping = true;
client.emit("sync.state", "STOPPED", "SYNCING", undefined);
lifecycle.dispose();
statusController.markStopped();
expect(setStatus).toHaveBeenLastCalledWith(
expect.objectContaining({
accountId: "default",
healthState: "error",
lastError: "sync exploded",
}),
);
});
// Later non-terminal states (here RECONNECTING) must not overwrite the first fatal error.
it("ignores follow-up sync states after a fatal sync error", async () => {
const client = createClientEmitter();
const setStatus = vi.fn();
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController: createMatrixMonitorStatusController({
accountId: "default",
statusSink: setStatus,
}),
});
const waitPromise = lifecycle.waitForFatalStop();
client.emit("sync.unexpected_error", new Error("sync exploded"));
await expect(waitPromise).rejects.toThrow("sync exploded");
client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("late reconnect"));
lifecycle.dispose();
expect(setStatus).toHaveBeenLastCalledWith(
expect.objectContaining({
accountId: "default",
healthState: "error",
lastError: "sync exploded",
}),
);
});
// Only one waiter may be pending at a time; the first one still resolves on dispose().
it("rejects a second concurrent fatal-stop waiter", async () => {
const client = createClientEmitter();
const lifecycle = createMatrixMonitorSyncLifecycle({
client: client as never,
statusController: createMatrixMonitorStatusController({
accountId: "default",
}),
});
const firstWait = lifecycle.waitForFatalStop();
await expect(lifecycle.waitForFatalStop()).rejects.toThrow(
"Matrix fatal-stop wait already in progress",
);
lifecycle.dispose();
await expect(firstWait).resolves.toBeUndefined();
});
});

View File

@@ -0,0 +1,91 @@
import type { MatrixClient } from "../sdk.js";
import { isMatrixTerminalSyncState, type MatrixSyncState } from "../sync-state.js";
import type { MatrixMonitorStatusController } from "./status.js";
/**
 * Normalizes whatever accompanied a sync state change into an `Error`.
 *
 * @param state Sync state that triggered the failure; used for the fallback message.
 * @param error Optional payload. `Error` instances are returned as-is and
 *   non-blank strings become the (trimmed) message.
 * @returns The original `Error`, or a new one with a state-specific message.
 */
function formatSyncLifecycleError(state: MatrixSyncState, error?: unknown): Error {
  if (error instanceof Error) {
    return error;
  }
  const detail = typeof error === "string" ? error.trim() : "";
  if (detail) {
    return new Error(detail);
  }
  switch (state) {
    case "STOPPED":
      return new Error("Matrix sync stopped unexpectedly");
    case "ERROR":
      return new Error("Matrix sync entered ERROR unexpectedly");
    default:
      return new Error(`Matrix sync entered ${state} unexpectedly`);
  }
}
/**
 * Subscribes to the client's `sync.state` and `sync.unexpected_error` events,
 * mirrors them into the status controller, and exposes a single-waiter promise
 * that rejects when sync stops fatally.
 *
 * A fatal stop is either an unexpected-error event or a terminal sync state
 * observed while `isStopping()` is not truthy. The first fatal error is sticky:
 * once recorded, no later event (terminal or not) updates the status
 * controller, so the original failure is what telemetry keeps.
 *
 * @param params.client Client whose sync events are observed.
 * @param params.statusController Receiver for sync state and error updates.
 * @param params.isStopping Optional predicate; while it returns true, errors and
 *   non-terminal states are ignored and a terminal state is recorded as a
 *   normal sync state instead of a failure.
 * @returns `waitForFatalStop()` — rejects with the fatal error, resolves when
 *   `dispose()` is called first, and throws if another wait is already pending;
 *   `dispose()` — resolves a pending wait and removes both listeners.
 */
export function createMatrixMonitorSyncLifecycle(params: {
  client: MatrixClient;
  statusController: MatrixMonitorStatusController;
  isStopping?: () => boolean;
}) {
  let fatalError: Error | null = null;
  let resolveFatalWait: (() => void) | null = null;
  let rejectFatalWait: ((error: Error) => void) | null = null;

  // Records the first fatal error and rejects the pending waiter, if any.
  const settleFatal = (error: Error) => {
    if (fatalError) {
      return;
    }
    fatalError = error;
    rejectFatalWait?.(error);
    resolveFatalWait = null;
    rejectFatalWait = null;
  };

  const onSyncState = (state: MatrixSyncState, _prevState: string | null, error?: unknown) => {
    // Fatal sync failures are sticky for telemetry; later SDK state churn during
    // cleanup or reconnect (including a follow-up terminal state) must not
    // overwrite the first recorded error.
    if (fatalError) {
      return;
    }
    const stopping = Boolean(params.isStopping?.());
    const terminal = isMatrixTerminalSyncState(state);
    if (terminal && !stopping) {
      const syncError = formatSyncLifecycleError(state, error);
      params.statusController.noteUnexpectedError(syncError);
      settleFatal(syncError);
      return;
    }
    // Operator-initiated shutdown can still emit transient sync states before
    // the final STOPPED. Ignore that churn so intentional stops do not look
    // like runtime failures.
    if (stopping && !terminal) {
      return;
    }
    params.statusController.noteSyncState(state, error);
  };

  const onUnexpectedError = (error: Error) => {
    // Same stickiness rule as above: only the first fatal error is recorded.
    if (fatalError || params.isStopping?.()) {
      return;
    }
    params.statusController.noteUnexpectedError(error);
    settleFatal(error);
  };

  params.client.on("sync.state", onSyncState);
  params.client.on("sync.unexpected_error", onUnexpectedError);

  return {
    async waitForFatalStop(): Promise<void> {
      if (fatalError) {
        throw fatalError;
      }
      if (resolveFatalWait || rejectFatalWait) {
        throw new Error("Matrix fatal-stop wait already in progress");
      }
      await new Promise<void>((resolve, reject) => {
        resolveFatalWait = resolve;
        rejectFatalWait = reject;
      });
    },
    dispose() {
      resolveFatalWait?.();
      resolveFatalWait = null;
      rejectFatalWait = null;
      params.client.off("sync.state", onSyncState);
      params.client.off("sync.unexpected_error", onUnexpectedError);
    },
  };
}

View File

@@ -0,0 +1,38 @@
import type { RuntimeLogger } from "../../runtime-api.js";
/**
 * Creates a runner for fire-and-forget Matrix background tasks.
 *
 * Tasks are started on a later microtask, their failures are logged instead of
 * propagated, and every task is tracked until it settles so callers can wait
 * for the runner to drain.
 *
 * @param params.logger Receives a `warn` entry for each failed task.
 * @param params.logVerboseMessage Receives a one-line verbose message for each failed task.
 * @returns `runDetachedTask(label, task)` — starts and tracks a task; the
 *   returned promise settles once the task has finished and any failure has
 *   been logged. `waitForIdle()` — resolves when no tracked task remains.
 */
export function createMatrixMonitorTaskRunner(params: {
  logger: RuntimeLogger;
  logVerboseMessage: (message: string) => void;
}) {
  const pending = new Set<Promise<void>>();

  const reportFailure = (label: string, error: unknown): void => {
    const message = String(error);
    params.logVerboseMessage(`matrix: ${label} failed (${message})`);
    params.logger.warn("matrix background task failed", {
      task: label,
      error: message,
    });
  };

  function runDetachedTask(label: string, task: () => Promise<void>): Promise<void> {
    // `tracked` is only read inside the finally callback, which runs after the
    // assignment below has completed.
    const tracked: Promise<void> = Promise.resolve()
      .then(task)
      .catch((error) => reportFailure(label, error))
      .finally(() => {
        pending.delete(tracked);
      });
    pending.add(tracked);
    return tracked;
  }

  async function waitForIdle(): Promise<void> {
    // Loop because tasks that are still running may schedule further tasks.
    while (pending.size !== 0) {
      await Promise.allSettled([...pending]);
    }
  }

  return {
    runDetachedTask,
    waitForIdle,
  };
}

View File

@@ -114,7 +114,11 @@ type MatrixJsClientStub = EventEmitter & {
function createMatrixJsClientStub(): MatrixJsClientStub {
const client = new EventEmitter() as MatrixJsClientStub;
client.startClient = vi.fn(async () => {});
client.startClient = vi.fn(async () => {
queueMicrotask(() => {
client.emit("sync", "PREPARED", null, undefined);
});
});
client.stopClient = vi.fn();
client.initRustCrypto = vi.fn(async () => {});
client.getUserId = vi.fn(() => "@bot:example.org");
@@ -182,7 +186,12 @@ vi.mock("matrix-js-sdk/lib/matrix.js", async () => {
);
return {
...actual,
ClientEvent: { Event: "event", Room: "Room" },
ClientEvent: {
Event: "event",
Room: "Room",
Sync: "sync",
SyncUnexpectedError: "sync.unexpectedError",
},
MatrixEventEvent: { Decrypted: "decrypted" },
createClient: vi.fn((opts: Record<string, unknown>) => {
lastCreateClientOpts = opts;
@@ -947,6 +956,150 @@ describe("MatrixClient event bridge", () => {
expect(invites).toEqual(["!invite:example.org"]);
});
// startClient() is held open until the test releases it; start() must remain
// pending until the PREPARED sync event has been emitted.
it("waits for a ready sync state before resolving startup", async () => {
let releaseSyncReady: (() => void) | undefined;
matrixJsClient.startClient = vi.fn(async () => {
await new Promise<void>((resolve) => {
releaseSyncReady = () => {
matrixJsClient.emit("sync", "PREPARED", null, undefined);
resolve();
};
});
});
const client = new MatrixClient("https://matrix.example.org", "token");
let resolved = false;
const startPromise = client.start().then(() => {
resolved = true;
});
// Wait until start() has actually reached the stubbed startClient().
await vi.waitFor(() => {
expect(releaseSyncReady).toEqual(expect.any(Function));
});
expect(resolved).toBe(false);
releaseSyncReady?.();
await startPromise;
expect(resolved).toBe(true);
});
// An SDK-level "sync.unexpectedError" emitted before any ready state must
// reject start() with the same message.
it("rejects startup when sync reports an unexpected error before ready", async () => {
matrixJsClient.startClient = vi.fn(async () => {
// Emit from a timer so the error arrives after startClient() has resolved.
const timer = setTimeout(() => {
matrixJsClient.emit("sync.unexpectedError", new Error("sync exploded"));
}, 0);
timer.unref?.();
});
const client = new MatrixClient("https://matrix.example.org", "token");
await expect(client.start()).rejects.toThrow("sync exploded");
});
// A sync ERROR state during startup is not fatal: start() still resolves once
// a PREPARED state follows it.
it("allows transient startup ERROR to recover into PREPARED", async () => {
matrixJsClient.startClient = vi.fn(async () => {
queueMicrotask(() => {
matrixJsClient.emit("sync", "ERROR", null, new Error("temporary outage"));
queueMicrotask(() => {
matrixJsClient.emit("sync", "PREPARED", "ERROR", undefined);
});
});
});
const client = new MatrixClient("https://matrix.example.org", "token");
await expect(client.start()).resolves.toBeUndefined();
});
// startClient() emits no sync state here, so aborting the signal is what
// settles start(); it must reject with the AbortError-shaped startup error.
it("aborts startup when the readiness wait is canceled", async () => {
matrixJsClient.startClient = vi.fn(async () => {});
const abortController = new AbortController();
const client = new MatrixClient("https://matrix.example.org", "token");
const startPromise = client.start({ abortSignal: abortController.signal });
abortController.abort();
await expect(startPromise).rejects.toMatchObject({
message: "Matrix startup aborted",
name: "AbortError",
});
});
// The abort fires from inside a "sync.state" listener at the moment PREPARED is
// reported; start() must reject and never reach bootstrapCryptoIfNeeded().
it("aborts before post-ready startup work when shutdown races ready sync", async () => {
matrixJsClient.startClient = vi.fn(async () => {
queueMicrotask(() => {
matrixJsClient.emit("sync", "PREPARED", null, undefined);
});
});
const abortController = new AbortController();
const client = new MatrixClient("https://matrix.example.org", "token");
// Spy on the private method through a structural cast so the call can be observed.
const bootstrapCryptoSpy = vi.spyOn(
client as unknown as { bootstrapCryptoIfNeeded: () => Promise<void> },
"bootstrapCryptoIfNeeded",
);
bootstrapCryptoSpy.mockImplementation(async () => {});
client.on("sync.state", (state) => {
if (state === "PREPARED") {
abortController.abort();
}
});
await expect(client.start({ abortSignal: abortController.signal })).rejects.toMatchObject({
message: "Matrix startup aborted",
name: "AbortError",
});
expect(bootstrapCryptoSpy).not.toHaveBeenCalled();
});
// With fake timers and no sync event at all, start() must reject once the
// default 30 000 ms readiness timeout elapses.
// NOTE(review): fake timers are not restored inside this test — confirm that
// suite-level teardown calls vi.useRealTimers().
it("times out startup when no ready sync state arrives", async () => {
vi.useFakeTimers();
matrixJsClient.startClient = vi.fn(async () => {});
const client = new MatrixClient("https://matrix.example.org", "token");
const startPromise = client.start();
// Attach the rejection expectation before advancing time so the rejection is
// never left without a handler.
const startExpectation = expect(startPromise).rejects.toThrow(
"Matrix client did not reach a ready sync state within 30000ms",
);
await vi.advanceTimersByTimeAsync(30_000);
await startExpectation;
});
// The first start() succeeds via PREPARED. After stopSyncWithoutPersist(), the
// second startClient() emits nothing, so the restart must time out instead of
// reusing the ready state remembered from the first session.
// NOTE(review): fake timers are not restored inside this test — confirm that
// suite-level teardown calls vi.useRealTimers().
it("clears stale sync state before a restarted sync session waits for fresh readiness", async () => {
matrixJsClient.startClient = vi
.fn(async () => {
queueMicrotask(() => {
matrixJsClient.emit("sync", "PREPARED", null, undefined);
});
})
// First call: report PREPARED.
.mockImplementationOnce(async () => {
queueMicrotask(() => {
matrixJsClient.emit("sync", "PREPARED", null, undefined);
});
})
// Second call: stay silent so readiness can only come from stale state.
.mockImplementationOnce(async () => {});
const client = new MatrixClient("https://matrix.example.org", "token");
await client.start();
client.stopSyncWithoutPersist();
vi.useFakeTimers();
const restartPromise = client.start();
const restartExpectation = expect(restartPromise).rejects.toThrow(
"Matrix client did not reach a ready sync state within 30000ms",
);
await vi.advanceTimersByTimeAsync(30_000);
await restartExpectation;
});
it("replays outstanding invite rooms at startup", async () => {
matrixJsClient.getRooms = vi.fn(() => [
{

View File

@@ -44,6 +44,12 @@ import type {
MessageEventContent,
} from "./sdk/types.js";
import type { MatrixVerificationSummary } from "./sdk/verification-manager.js";
import { createMatrixStartupAbortError, throwIfMatrixStartupAborted } from "./startup-abort.js";
import {
isMatrixReadySyncState,
isMatrixTerminalSyncState,
type MatrixSyncState,
} from "./sync-state.js";
export { ConsoleLogger, LogService };
export type {
@@ -221,6 +227,7 @@ export class MatrixClient {
private readonly autoBootstrapCrypto: boolean;
private stopPersistPromise: Promise<void> | null = null;
private verificationSummaryListenerBound = false;
private currentSyncState: MatrixSyncState | null = null;
readonly dms = {
update: async (): Promise<boolean> => {
@@ -367,25 +374,128 @@ export class MatrixClient {
}
}
async start(): Promise<void> {
await this.startSyncSession({ bootstrapCrypto: true });
async start(opts: { abortSignal?: AbortSignal; readyTimeoutMs?: number } = {}): Promise<void> {
await this.startSyncSession({
bootstrapCrypto: true,
abortSignal: opts.abortSignal,
readyTimeoutMs: opts.readyTimeoutMs,
});
}
private async startSyncSession(opts: { bootstrapCrypto: boolean }): Promise<void> {
/**
 * Waits until the client has reported a ready sync state.
 *
 * Resolves immediately when `currentSyncState` is already ready. Otherwise it
 * listens for "sync.state" and "sync.unexpected_error" and settles exactly once:
 * - resolves on the first ready state;
 * - rejects on a terminal state, on an unexpected sync error, when
 *   `abortSignal` aborts, or when `timeoutMs` elapses.
 * States that are neither ready nor terminal leave the wait pending.
 *
 * @param params.timeoutMs Maximum wait in milliseconds (default 30 000).
 * @param params.abortSignal Optional signal that cancels the wait.
 * @throws Error when the current state is already terminal, or for any of the
 *   rejection causes listed above.
 */
private async waitForInitialSyncReady(
params: {
timeoutMs?: number;
abortSignal?: AbortSignal;
} = {},
): Promise<void> {
const timeoutMs = params.timeoutMs ?? 30_000;
// Fast paths: the sync state may already have been recorded before this call.
if (isMatrixReadySyncState(this.currentSyncState)) {
return;
}
if (isMatrixTerminalSyncState(this.currentSyncState)) {
throw new Error(`Matrix sync entered ${this.currentSyncState} during startup`);
}
await new Promise<void>((resolve, reject) => {
// `settled` makes resolve/reject idempotent across the competing sources.
let settled = false;
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const abortSignal = params.abortSignal;
// Detaches every listener and the timer; called exactly once, on settlement.
const cleanup = () => {
this.off("sync.state", onSyncState);
this.off("sync.unexpected_error", onUnexpectedError);
abortSignal?.removeEventListener("abort", onAbort);
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = undefined;
}
};
const settleResolve = () => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve();
};
const settleReject = (error: Error) => {
if (settled) {
return;
}
settled = true;
cleanup();
reject(error);
};
const onSyncState = (state: MatrixSyncState, _prevState: string | null, error?: unknown) => {
if (isMatrixReadySyncState(state)) {
settleResolve();
return;
}
if (isMatrixTerminalSyncState(state)) {
// Prefer the message of the reported error; otherwise describe the state.
settleReject(
new Error(
error instanceof Error && error.message
? error.message
: `Matrix sync entered ${state} during startup`,
),
);
}
};
const onUnexpectedError = (error: Error) => {
settleReject(error);
};
const onAbort = () => {
settleReject(createMatrixStartupAbortError());
};
this.on("sync.state", onSyncState);
this.on("sync.unexpected_error", onUnexpectedError);
// Already aborted: reject now (cleanup removes the listeners just added)
// and skip arming the abort listener and the timeout.
if (abortSignal?.aborted) {
onAbort();
return;
}
abortSignal?.addEventListener("abort", onAbort, { once: true });
timeoutId = setTimeout(() => {
settleReject(
new Error(`Matrix client did not reach a ready sync state within ${timeoutMs}ms`),
);
}, timeoutMs);
// Where supported, do not let this timer alone keep the process alive.
timeoutId.unref?.();
});
}
private async startSyncSession(opts: {
bootstrapCrypto: boolean;
abortSignal?: AbortSignal;
readyTimeoutMs?: number;
}): Promise<void> {
if (this.started) {
return;
}
throwIfMatrixStartupAborted(opts.abortSignal);
await this.ensureCryptoSupportInitialized();
throwIfMatrixStartupAborted(opts.abortSignal);
this.registerBridge();
await this.initializeCryptoIfNeeded();
await this.initializeCryptoIfNeeded(opts.abortSignal);
await this.client.startClient({
initialSyncLimit: this.initialSyncLimit,
});
await this.waitForInitialSyncReady({
abortSignal: opts.abortSignal,
timeoutMs: opts.readyTimeoutMs,
});
throwIfMatrixStartupAborted(opts.abortSignal);
if (opts.bootstrapCrypto && this.autoBootstrapCrypto) {
await this.bootstrapCryptoIfNeeded();
await this.bootstrapCryptoIfNeeded(opts.abortSignal);
}
throwIfMatrixStartupAborted(opts.abortSignal);
this.started = true;
this.emitOutstandingInviteEvents();
await this.refreshDmCache().catch(noop);
@@ -426,6 +536,7 @@ export class MatrixClient {
clearInterval(this.idbPersistTimer);
this.idbPersistTimer = null;
}
this.currentSyncState = null;
this.client.stopClient();
this.started = false;
}
@@ -469,10 +580,11 @@ export class MatrixClient {
await this.stopPersistPromise;
}
private async bootstrapCryptoIfNeeded(): Promise<void> {
private async bootstrapCryptoIfNeeded(abortSignal?: AbortSignal): Promise<void> {
if (!this.encryptionEnabled || !this.cryptoInitialized || this.cryptoBootstrapped) {
return;
}
throwIfMatrixStartupAborted(abortSignal);
await this.ensureCryptoSupportInitialized();
const crypto = this.client.getCrypto() as MatrixCryptoBootstrapApi | undefined;
if (!crypto) {
@@ -486,6 +598,7 @@ export class MatrixClient {
crypto,
MATRIX_INITIAL_CRYPTO_BOOTSTRAP_OPTIONS,
);
throwIfMatrixStartupAborted(abortSignal);
if (!initial.crossSigningPublished || initial.ownDeviceVerified === false) {
const status = await this.getOwnDeviceVerificationStatus();
if (status.signedByOwner) {
@@ -503,6 +616,7 @@ export class MatrixClient {
crypto,
MATRIX_AUTOMATIC_REPAIR_BOOTSTRAP_OPTIONS,
);
throwIfMatrixStartupAborted(abortSignal);
if (repaired.crossSigningPublished && repaired.ownDeviceVerified !== false) {
LogService.info(
"MatrixClientLite",
@@ -526,26 +640,30 @@ export class MatrixClient {
this.cryptoBootstrapped = true;
}
private async initializeCryptoIfNeeded(): Promise<void> {
private async initializeCryptoIfNeeded(abortSignal?: AbortSignal): Promise<void> {
if (!this.encryptionEnabled || this.cryptoInitialized) {
return;
}
throwIfMatrixStartupAborted(abortSignal);
const { persistIdbToDisk, restoreIdbFromDisk } = await loadMatrixCryptoRuntime();
// Restore persisted IndexedDB crypto store before initializing WASM crypto.
await restoreIdbFromDisk(this.idbSnapshotPath);
throwIfMatrixStartupAborted(abortSignal);
try {
await this.client.initRustCrypto({
cryptoDatabasePrefix: this.cryptoDatabasePrefix,
});
this.cryptoInitialized = true;
throwIfMatrixStartupAborted(abortSignal);
// Persist the crypto store after successful init (captures fresh keys on first run).
await persistIdbToDisk({
snapshotPath: this.idbSnapshotPath,
databasePrefix: this.cryptoDatabasePrefix,
});
throwIfMatrixStartupAborted(abortSignal);
// Periodically persist to capture new Olm sessions and room keys.
this.idbPersistTimer = setInterval(() => {
@@ -1587,6 +1705,20 @@ export class MatrixClient {
this.client.on(ClientEvent.Room, (room) => {
this.emitMembershipForRoom(room);
});
this.client.on(
ClientEvent.Sync,
(state: MatrixSyncState, prevState: string | null, data?: unknown) => {
this.currentSyncState = state;
const error =
data && typeof data === "object" && "error" in data
? (data as { error?: unknown }).error
: undefined;
this.emitter.emit("sync.state", state, prevState, error);
},
);
this.client.on(ClientEvent.SyncUnexpectedError, (error: Error) => {
this.emitter.emit("sync.unexpected_error", error);
});
}
private emitMembershipForRoom(room: unknown): void {

View File

@@ -1,3 +1,4 @@
import type { MatrixSyncState } from "../sync-state.js";
import type {
MatrixVerificationRequestLike,
MatrixVerificationSummary,
@@ -31,6 +32,8 @@ export type MatrixClientEventMap = {
"room.failed_decryption": [roomId: string, event: MatrixRawEvent, error: Error];
"room.invite": [roomId: string, event: MatrixRawEvent];
"room.join": [roomId: string, event: MatrixRawEvent];
"sync.state": [state: MatrixSyncState, prevState: string | null, error?: unknown];
"sync.unexpected_error": [error: Error];
"verification.summary": [summary: MatrixVerificationSummary];
};

View File

@@ -0,0 +1,44 @@
/**
 * Builds the error used to signal that Matrix startup was cancelled.
 *
 * @returns A new `Error` with message "Matrix startup aborted" and `name` set
 *   to "AbortError".
 */
export function createMatrixStartupAbortError(): Error {
  return Object.assign(new Error("Matrix startup aborted"), { name: "AbortError" });
}
/**
 * Checkpoint for cooperative cancellation during Matrix startup.
 *
 * @param abortSignal Optional signal; a missing signal never aborts.
 * @throws The startup abort error when the signal is already aborted.
 */
export function throwIfMatrixStartupAborted(abortSignal?: AbortSignal): void {
  if (abortSignal?.aborted !== true) {
    return;
  }
  throw createMatrixStartupAbortError();
}
/**
 * Tells whether a caught value is an abort error.
 *
 * @returns `true` only for `Error` instances whose `name` is "AbortError".
 */
export function isMatrixStartupAbortError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  return error.name === "AbortError";
}
/**
 * Awaits `promise`, but gives up early when `abortSignal` aborts.
 *
 * The underlying work is not cancelled; only the wait is. Whenever the wait is
 * abandoned, the input promise keeps a rejection handler, so a later failure
 * of the abandoned work cannot surface as an unhandled rejection.
 *
 * @param promise Work that has already been started.
 * @param abortSignal Optional signal; without it this is a plain `await`.
 * @returns The value `promise` resolves with.
 * @throws The startup abort error when the signal is aborted before or during
 *   the wait; otherwise whatever `promise` rejects with.
 */
export async function awaitMatrixStartupWithAbort<T>(
  promise: Promise<T>,
  abortSignal?: AbortSignal,
): Promise<T> {
  if (!abortSignal) {
    return await promise;
  }
  if (abortSignal.aborted) {
    // The caller is walking away from `promise`. Attach a no-op handler first
    // so a later rejection is not reported as unhandled.
    void promise.catch(() => {});
    throw createMatrixStartupAbortError();
  }
  return await new Promise<T>((resolve, reject) => {
    // `once: true` removes this listener automatically after it fires.
    const onAbort = () => {
      reject(createMatrixStartupAbortError());
    };
    abortSignal.addEventListener("abort", onAbort, { once: true });
    promise.then(
      (value) => {
        abortSignal.removeEventListener("abort", onAbort);
        resolve(value);
      },
      (error) => {
        abortSignal.removeEventListener("abort", onAbort);
        // After an abort this call is a no-op, which also swallows the late failure.
        reject(error);
      },
    );
  });
}

View File

@@ -0,0 +1,27 @@
/**
 * Sync state names handled by the Matrix integration.
 *
 * The trailing `(string & {})` member accepts any other string while keeping
 * the listed literals visible to editors, so states not listed here still
 * type-check.
 */
export type MatrixSyncState =
| "PREPARED"
| "SYNCING"
| "CATCHUP"
| "RECONNECTING"
| "ERROR"
| "STOPPED"
| (string & {});
/**
 * Type guard for sync states that count as ready.
 *
 * @returns `true` for "PREPARED", "SYNCING" and "CATCHUP"; `false` for every
 *   other string and for `null`/`undefined`.
 */
export function isMatrixReadySyncState(
  state: MatrixSyncState | null | undefined,
): state is "PREPARED" | "SYNCING" | "CATCHUP" {
  switch (state) {
    case "PREPARED":
    case "SYNCING":
    case "CATCHUP":
      return true;
    default:
      return false;
  }
}
/**
 * Type guard for sync states that count as disconnected.
 *
 * @returns `true` for "RECONNECTING", "ERROR" and "STOPPED"; `false` for every
 *   other string and for `null`/`undefined`.
 */
export function isMatrixDisconnectedSyncState(
  state: MatrixSyncState | null | undefined,
): state is "RECONNECTING" | "ERROR" | "STOPPED" {
  switch (state) {
    case "RECONNECTING":
    case "ERROR":
    case "STOPPED":
      return true;
    default:
      return false;
  }
}
/**
 * Type guard for the sync state that ends a sync session.
 *
 * @returns `true` only for "STOPPED".
 */
export function isMatrixTerminalSyncState(
  state: MatrixSyncState | null | undefined,
): state is "STOPPED" {
  // ERROR is deliberately not terminal: matrix-js-sdk can recover from ERROR
  // to PREPARED during initial sync.
  if (state !== "STOPPED") {
    return false;
  }
  return true;
}