mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 01:50:19 +00:00
Matrix: report startup failures as errors
This commit is contained in:
@@ -54,6 +54,7 @@ const hoisted = vi.hoisted(() => {
|
||||
flush: vi.fn(async () => undefined),
|
||||
stop: vi.fn(async () => undefined),
|
||||
};
|
||||
const createMatrixInboundEventDeduper = vi.fn(async () => inboundDeduper);
|
||||
const client = Object.assign(createEmitter(), {
|
||||
id: "matrix-client",
|
||||
hasPersistedSyncState: vi.fn(() => false),
|
||||
@@ -110,6 +111,7 @@ const hoisted = vi.hoisted(() => {
|
||||
accountConfig,
|
||||
client,
|
||||
createDirectRoomTracker,
|
||||
createMatrixInboundEventDeduper,
|
||||
createMatrixRoomMessageHandler,
|
||||
getMemberDisplayName,
|
||||
getRoomInfo,
|
||||
@@ -356,7 +358,7 @@ vi.mock("./handler.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./inbound-dedupe.js", () => ({
|
||||
createMatrixInboundEventDeduper: vi.fn(async () => hoisted.inboundDeduper),
|
||||
createMatrixInboundEventDeduper: hoisted.createMatrixInboundEventDeduper,
|
||||
}));
|
||||
|
||||
vi.mock("./legacy-crypto-restore.js", () => ({
|
||||
@@ -438,6 +440,7 @@ describe("monitorMatrixProvider", () => {
|
||||
hoisted.inboundDeduper.releaseEvent.mockReset();
|
||||
hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.createMatrixInboundEventDeduper.mockReset().mockResolvedValue(hoisted.inboundDeduper);
|
||||
hoisted.backfillMatrixAuthDeviceIdAfterStartup.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.runMatrixStartupMaintenance.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn());
|
||||
@@ -563,6 +566,55 @@ describe("monitorMatrixProvider", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("marks early startup failures as error before the monitor loop starts", async () => {
|
||||
hoisted.resolveSharedMatrixClient.mockImplementation(
|
||||
async (params: { startClient?: boolean }) => {
|
||||
if (params.startClient === false) {
|
||||
throw new Error("prepare failed");
|
||||
}
|
||||
hoisted.callOrder.push("start-client");
|
||||
return hoisted.client;
|
||||
},
|
||||
);
|
||||
|
||||
await expect(
|
||||
monitorMatrixProvider({
|
||||
setStatus: hoisted.setStatus,
|
||||
}),
|
||||
).rejects.toThrow("prepare failed");
|
||||
|
||||
expect(hoisted.releaseSharedClientInstance).not.toHaveBeenCalled();
|
||||
expect(hoisted.setStatus).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
accountId: "default",
|
||||
connected: false,
|
||||
healthState: "error",
|
||||
lastError: "prepare failed",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("releases the prepared client when startup fails before later resources exist", async () => {
|
||||
hoisted.createMatrixInboundEventDeduper.mockRejectedValue(new Error("deduper failed"));
|
||||
|
||||
await expect(
|
||||
monitorMatrixProvider({
|
||||
setStatus: hoisted.setStatus,
|
||||
}),
|
||||
).rejects.toThrow("deduper failed");
|
||||
|
||||
expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "persist");
|
||||
expect(hoisted.inboundDeduper.stop).not.toHaveBeenCalled();
|
||||
expect(hoisted.setStatus).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
accountId: "default",
|
||||
connected: false,
|
||||
healthState: "error",
|
||||
lastError: "deduper failed",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("aborts stalled startup promptly and releases the shared client without persist", async () => {
|
||||
const abortController = new AbortController();
|
||||
hoisted.resolveSharedMatrixClient.mockImplementation(
|
||||
|
||||
@@ -24,6 +24,7 @@ import {
|
||||
resolveSharedMatrixClient,
|
||||
} from "../client.js";
|
||||
import { releaseSharedClientInstance } from "../client/shared.js";
|
||||
import type { MatrixClient } from "../sdk.js";
|
||||
import { isMatrixStartupAbortError } from "../startup-abort.js";
|
||||
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
|
||||
import { registerMatrixAutoJoin } from "./auto-join.js";
|
||||
@@ -31,7 +32,10 @@ import { resolveMatrixMonitorConfig } from "./config.js";
|
||||
import { createDirectRoomTracker } from "./direct.js";
|
||||
import { registerMatrixMonitorEvents } from "./events.js";
|
||||
import { createMatrixRoomMessageHandler } from "./handler.js";
|
||||
import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js";
|
||||
import {
|
||||
createMatrixInboundEventDeduper,
|
||||
type MatrixInboundEventDeduper,
|
||||
} from "./inbound-dedupe.js";
|
||||
import { shouldPromoteRecentInviteRoom } from "./recent-invite.js";
|
||||
import { createMatrixRoomInfoResolver } from "./room-info.js";
|
||||
import { runMatrixStartupMaintenance } from "./startup.js";
|
||||
@@ -151,44 +155,35 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
baseUrl: auth.homeserver,
|
||||
statusSink: opts.setStatus,
|
||||
});
|
||||
const client = await resolveSharedMatrixClient({
|
||||
cfg,
|
||||
auth: authWithLimit,
|
||||
startClient: false,
|
||||
accountId: auth.accountId,
|
||||
});
|
||||
setActiveMatrixClient(client, auth.accountId);
|
||||
let cleanedUp = false;
|
||||
let client: MatrixClient | null = null;
|
||||
let threadBindingManager: { accountId: string; stop: () => void } | null = null;
|
||||
const inboundDeduper = await createMatrixInboundEventDeduper({
|
||||
auth,
|
||||
env: process.env,
|
||||
});
|
||||
let inboundDeduper: MatrixInboundEventDeduper | null = null;
|
||||
const monitorTaskRunner = createMatrixMonitorTaskRunner({
|
||||
logger,
|
||||
logVerboseMessage,
|
||||
});
|
||||
const syncLifecycle = createMatrixMonitorSyncLifecycle({
|
||||
client,
|
||||
statusController,
|
||||
isStopping: () => cleanedUp || opts.abortSignal?.aborted === true,
|
||||
});
|
||||
let syncLifecycle: ReturnType<typeof createMatrixMonitorSyncLifecycle> | null = null;
|
||||
const cleanup = async (mode: "persist" | "stop" = "persist") => {
|
||||
if (cleanedUp) {
|
||||
return;
|
||||
}
|
||||
cleanedUp = true;
|
||||
try {
|
||||
client.stopSyncWithoutPersist();
|
||||
if (mode === "persist") {
|
||||
client?.stopSyncWithoutPersist();
|
||||
if (client && mode === "persist") {
|
||||
await client.drainPendingDecryptions("matrix monitor shutdown");
|
||||
}
|
||||
if (mode === "persist") {
|
||||
await monitorTaskRunner.waitForIdle();
|
||||
}
|
||||
threadBindingManager?.stop();
|
||||
await inboundDeduper.stop();
|
||||
await releaseSharedClientInstance(client, mode);
|
||||
await inboundDeduper?.stop();
|
||||
if (client) {
|
||||
await releaseSharedClientInstance(client, mode);
|
||||
}
|
||||
} finally {
|
||||
syncLifecycle.dispose();
|
||||
syncLifecycle?.dispose();
|
||||
statusController.markStopped();
|
||||
setActiveMatrixClient(null, auth.accountId);
|
||||
}
|
||||
@@ -243,77 +238,92 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
const blockStreamingEnabled = accountConfig.blockStreaming === true;
|
||||
const startupMs = Date.now();
|
||||
const startupGraceMs = 0;
|
||||
// Cold starts should ignore old room history, but once we have a persisted
|
||||
// /sync cursor we want restart backlogs to replay just like other channels.
|
||||
const dropPreStartupMessages = !client.hasPersistedSyncState();
|
||||
const { getRoomInfo, getMemberDisplayName } = createMatrixRoomInfoResolver(client);
|
||||
const directTracker = createDirectRoomTracker(client, {
|
||||
log: logVerboseMessage,
|
||||
canPromoteRecentInvite: async (roomId) =>
|
||||
shouldPromoteRecentInviteRoom({
|
||||
roomId,
|
||||
roomInfo: await getRoomInfo(roomId, { includeAliases: true }),
|
||||
rooms: roomsConfig,
|
||||
}),
|
||||
shouldKeepLocallyPromotedDirectRoom: async (roomId) => {
|
||||
try {
|
||||
const roomInfo = await getRoomInfo(roomId, { includeAliases: true });
|
||||
if (!roomInfo.nameResolved || !roomInfo.aliasesResolved) {
|
||||
return undefined;
|
||||
}
|
||||
return shouldPromoteRecentInviteRoom({
|
||||
roomId,
|
||||
roomInfo,
|
||||
rooms: roomsConfig,
|
||||
});
|
||||
} catch (err) {
|
||||
logVerboseMessage(
|
||||
`matrix: local promotion revalidation failed room=${roomId} (${String(err)})`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
},
|
||||
});
|
||||
registerMatrixAutoJoin({ client, accountConfig, runtime });
|
||||
const warnedEncryptedRooms = new Set<string>();
|
||||
const warnedCryptoMissingRooms = new Set<string>();
|
||||
|
||||
const handleRoomMessage = createMatrixRoomMessageHandler({
|
||||
client,
|
||||
core,
|
||||
cfg,
|
||||
accountId: effectiveAccountId,
|
||||
runtime,
|
||||
logger,
|
||||
logVerboseMessage,
|
||||
allowFrom,
|
||||
groupAllowFrom,
|
||||
roomsConfig,
|
||||
accountAllowBots,
|
||||
configuredBotUserIds,
|
||||
groupPolicy,
|
||||
replyToMode,
|
||||
threadReplies,
|
||||
dmThreadReplies,
|
||||
dmSessionScope,
|
||||
streaming,
|
||||
blockStreamingEnabled,
|
||||
dmEnabled,
|
||||
dmPolicy,
|
||||
textLimit,
|
||||
mediaMaxBytes,
|
||||
historyLimit,
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
dropPreStartupMessages,
|
||||
inboundDeduper,
|
||||
directTracker,
|
||||
getRoomInfo,
|
||||
getMemberDisplayName,
|
||||
needsRoomAliasesForConfig,
|
||||
});
|
||||
|
||||
try {
|
||||
client = await resolveSharedMatrixClient({
|
||||
cfg,
|
||||
auth: authWithLimit,
|
||||
startClient: false,
|
||||
accountId: auth.accountId,
|
||||
});
|
||||
setActiveMatrixClient(client, auth.accountId);
|
||||
inboundDeduper = await createMatrixInboundEventDeduper({
|
||||
auth,
|
||||
env: process.env,
|
||||
});
|
||||
syncLifecycle = createMatrixMonitorSyncLifecycle({
|
||||
client,
|
||||
statusController,
|
||||
isStopping: () => cleanedUp || opts.abortSignal?.aborted === true,
|
||||
});
|
||||
// Cold starts should ignore old room history, but once we have a persisted
|
||||
// /sync cursor we want restart backlogs to replay just like other channels.
|
||||
const dropPreStartupMessages = !client.hasPersistedSyncState();
|
||||
const { getRoomInfo, getMemberDisplayName } = createMatrixRoomInfoResolver(client);
|
||||
const directTracker = createDirectRoomTracker(client, {
|
||||
log: logVerboseMessage,
|
||||
canPromoteRecentInvite: async (roomId) =>
|
||||
shouldPromoteRecentInviteRoom({
|
||||
roomId,
|
||||
roomInfo: await getRoomInfo(roomId, { includeAliases: true }),
|
||||
rooms: roomsConfig,
|
||||
}),
|
||||
shouldKeepLocallyPromotedDirectRoom: async (roomId) => {
|
||||
try {
|
||||
const roomInfo = await getRoomInfo(roomId, { includeAliases: true });
|
||||
if (!roomInfo.nameResolved || !roomInfo.aliasesResolved) {
|
||||
return undefined;
|
||||
}
|
||||
return shouldPromoteRecentInviteRoom({
|
||||
roomId,
|
||||
roomInfo,
|
||||
rooms: roomsConfig,
|
||||
});
|
||||
} catch (err) {
|
||||
logVerboseMessage(
|
||||
`matrix: local promotion revalidation failed room=${roomId} (${String(err)})`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
},
|
||||
});
|
||||
registerMatrixAutoJoin({ client, accountConfig, runtime });
|
||||
const handleRoomMessage = createMatrixRoomMessageHandler({
|
||||
client,
|
||||
core,
|
||||
cfg,
|
||||
accountId: effectiveAccountId,
|
||||
runtime,
|
||||
logger,
|
||||
logVerboseMessage,
|
||||
allowFrom,
|
||||
groupAllowFrom,
|
||||
roomsConfig,
|
||||
accountAllowBots,
|
||||
configuredBotUserIds,
|
||||
groupPolicy,
|
||||
replyToMode,
|
||||
threadReplies,
|
||||
dmThreadReplies,
|
||||
dmSessionScope,
|
||||
streaming,
|
||||
blockStreamingEnabled,
|
||||
dmEnabled,
|
||||
dmPolicy,
|
||||
textLimit,
|
||||
mediaMaxBytes,
|
||||
historyLimit,
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
dropPreStartupMessages,
|
||||
inboundDeduper,
|
||||
directTracker,
|
||||
getRoomInfo,
|
||||
getMemberDisplayName,
|
||||
needsRoomAliasesForConfig,
|
||||
});
|
||||
threadBindingManager = await createMatrixThreadBindingManager({
|
||||
accountId: effectiveAccountId,
|
||||
auth,
|
||||
@@ -417,6 +427,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
await cleanup("stop");
|
||||
return;
|
||||
}
|
||||
statusController.noteUnexpectedError(err);
|
||||
await cleanup();
|
||||
throw err;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user