mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-14 18:51:04 +00:00
fix(matrix): abort startup maintenance promptly
This commit is contained in:
@@ -100,6 +100,9 @@ const hoisted = vi.hoisted(() => {
|
||||
const setActiveMatrixClient = vi.fn();
|
||||
const setMatrixRuntime = vi.fn();
|
||||
const backfillMatrixAuthDeviceIdAfterStartup = vi.fn(async () => undefined);
|
||||
const runMatrixStartupMaintenance = vi.fn<
|
||||
(params: { abortSignal?: AbortSignal }) => Promise<void>
|
||||
>(async () => undefined);
|
||||
const setStatus = vi.fn();
|
||||
return {
|
||||
backfillMatrixAuthDeviceIdAfterStartup,
|
||||
@@ -116,6 +119,7 @@ const hoisted = vi.hoisted(() => {
|
||||
releaseSharedClientInstance,
|
||||
resolveSharedMatrixClient,
|
||||
resolveTextChunkLimit,
|
||||
runMatrixStartupMaintenance,
|
||||
setActiveMatrixClient,
|
||||
setMatrixRuntime,
|
||||
setStatus,
|
||||
@@ -370,6 +374,10 @@ vi.mock("./startup-verification.js", () => ({
|
||||
ensureMatrixStartupVerification: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./startup.js", () => ({
|
||||
runMatrixStartupMaintenance: hoisted.runMatrixStartupMaintenance,
|
||||
}));
|
||||
|
||||
let monitorMatrixProvider: typeof import("./index.js").monitorMatrixProvider;
|
||||
|
||||
describe("monitorMatrixProvider", () => {
|
||||
@@ -431,6 +439,7 @@ describe("monitorMatrixProvider", () => {
|
||||
hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.backfillMatrixAuthDeviceIdAfterStartup.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.runMatrixStartupMaintenance.mockReset().mockResolvedValue(undefined);
|
||||
hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn());
|
||||
hoisted.setStatus.mockReset();
|
||||
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
|
||||
@@ -590,6 +599,36 @@ describe("monitorMatrixProvider", () => {
|
||||
expect(hoisted.client.drainPendingDecryptions).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("aborts during startup maintenance and releases the shared client without persist", async () => {
|
||||
const abortController = new AbortController();
|
||||
hoisted.runMatrixStartupMaintenance.mockImplementation(
|
||||
async (params: { abortSignal?: AbortSignal }) =>
|
||||
await new Promise<void>((_resolve, reject) => {
|
||||
params.abortSignal?.addEventListener(
|
||||
"abort",
|
||||
() => {
|
||||
const error = new Error("Matrix startup aborted");
|
||||
error.name = "AbortError";
|
||||
reject(error);
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
}),
|
||||
);
|
||||
|
||||
const monitorPromise = monitorMatrixProvider({ abortSignal: abortController.signal });
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(hoisted.runMatrixStartupMaintenance).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
abortController.abort();
|
||||
|
||||
await expect(monitorPromise).resolves.toBeUndefined();
|
||||
expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "stop");
|
||||
expect(hoisted.client.drainPendingDecryptions).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("registers Matrix thread bindings before starting the client", async () => {
|
||||
await startMonitorAndAbortAfterStartup();
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import {
|
||||
resolveSharedMatrixClient,
|
||||
} from "../client.js";
|
||||
import { releaseSharedClientInstance } from "../client/shared.js";
|
||||
import { isMatrixStartupAbortError } from "../startup-abort.js";
|
||||
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
|
||||
import { registerMatrixAutoJoin } from "./auto-join.js";
|
||||
import { resolveMatrixMonitorConfig } from "./config.js";
|
||||
@@ -395,6 +396,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
writeConfigFile: async (nextCfg) => await core.config.writeConfigFile(nextCfg),
|
||||
loadWebMedia: async (url, maxBytes) => await core.media.loadWebMedia(url, maxBytes),
|
||||
env: process.env,
|
||||
abortSignal: opts.abortSignal,
|
||||
});
|
||||
|
||||
await Promise.race([
|
||||
@@ -411,7 +413,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
syncLifecycle.waitForFatalStop(),
|
||||
]);
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted === true && err instanceof Error && err.name === "AbortError") {
|
||||
if (opts.abortSignal?.aborted === true && isMatrixStartupAbortError(err)) {
|
||||
await cleanup("stop");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -133,6 +133,7 @@ describe("runMatrixStartupMaintenance", () => {
|
||||
contentType: "image/png",
|
||||
fileName: "avatar.png",
|
||||
})),
|
||||
abortSignal: undefined,
|
||||
env: {},
|
||||
};
|
||||
}
|
||||
@@ -235,4 +236,22 @@ describe("runMatrixStartupMaintenance", () => {
|
||||
{ error: "boom" },
|
||||
);
|
||||
});
|
||||
|
||||
it("aborts maintenance before later startup steps continue", async () => {
|
||||
const params = createParams();
|
||||
params.auth.encryption = true;
|
||||
const abortController = new AbortController();
|
||||
params.abortSignal = abortController.signal;
|
||||
vi.mocked(deps.syncMatrixOwnProfile).mockImplementation(async () => {
|
||||
abortController.abort();
|
||||
return createProfileSyncResult();
|
||||
});
|
||||
|
||||
await expect(runMatrixStartupMaintenance(params, deps)).rejects.toMatchObject({
|
||||
message: "Matrix startup aborted",
|
||||
name: "AbortError",
|
||||
});
|
||||
expect(deps.ensureMatrixStartupVerification).not.toHaveBeenCalled();
|
||||
expect(deps.maybeRestoreLegacyMatrixBackup).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,6 +2,7 @@ import type { RuntimeLogger } from "../../runtime-api.js";
|
||||
import type { CoreConfig, MatrixConfig } from "../../types.js";
|
||||
import type { MatrixAuth } from "../client.js";
|
||||
import type { MatrixClient } from "../sdk.js";
|
||||
import { isMatrixStartupAbortError, throwIfMatrixStartupAborted } from "../startup-abort.js";
|
||||
|
||||
type MatrixStartupClient = Pick<
|
||||
MatrixClient,
|
||||
@@ -66,10 +67,12 @@ export async function runMatrixStartupMaintenance(
|
||||
maxBytes: number,
|
||||
) => Promise<{ buffer: Buffer; contentType?: string; fileName?: string }>;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
abortSignal?: AbortSignal;
|
||||
},
|
||||
deps?: MatrixStartupMaintenanceDeps,
|
||||
): Promise<void> {
|
||||
const runtimeDeps = deps ?? (await loadMatrixStartupMaintenanceDeps());
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
try {
|
||||
const profileSync = await runtimeDeps.syncMatrixOwnProfile({
|
||||
client: params.client,
|
||||
@@ -78,6 +81,7 @@ export async function runMatrixStartupMaintenance(
|
||||
avatarUrl: params.accountConfig.avatarUrl,
|
||||
loadAvatarFromUrl: async (url, maxBytes) => await params.loadWebMedia(url, maxBytes),
|
||||
});
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
if (profileSync.displayNameUpdated) {
|
||||
params.logger.info(`matrix: profile display name updated for ${params.auth.userId}`);
|
||||
}
|
||||
@@ -94,11 +98,15 @@ export async function runMatrixStartupMaintenance(
|
||||
avatarUrl: profileSync.resolvedAvatarUrl,
|
||||
});
|
||||
await params.writeConfigFile(updatedCfg as never);
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
params.logVerboseMessage(
|
||||
`matrix: persisted converted avatar URL for account ${params.accountId} (${profileSync.resolvedAvatarUrl})`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
if (isMatrixStartupAbortError(err)) {
|
||||
throw err;
|
||||
}
|
||||
params.logger.warn("matrix: failed to sync profile from config", { error: String(err) });
|
||||
}
|
||||
|
||||
@@ -107,6 +115,7 @@ export async function runMatrixStartupMaintenance(
|
||||
}
|
||||
|
||||
try {
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
const deviceHealth = runtimeDeps.summarizeMatrixDeviceHealth(
|
||||
await params.client.listOwnDevices(),
|
||||
);
|
||||
@@ -116,18 +125,23 @@ export async function runMatrixStartupMaintenance(
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
if (isMatrixStartupAbortError(err)) {
|
||||
throw err;
|
||||
}
|
||||
params.logger.debug?.("Failed to inspect matrix device hygiene (non-fatal)", {
|
||||
error: String(err),
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
const startupVerification = await runtimeDeps.ensureMatrixStartupVerification({
|
||||
client: params.client,
|
||||
auth: params.auth,
|
||||
accountConfig: params.accountConfig,
|
||||
env: params.env,
|
||||
});
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
if (startupVerification.kind === "verified") {
|
||||
params.logger.info("matrix: device is verified by its owner and ready for encrypted rooms");
|
||||
} else if (
|
||||
@@ -158,17 +172,22 @@ export async function runMatrixStartupMaintenance(
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
if (isMatrixStartupAbortError(err)) {
|
||||
throw err;
|
||||
}
|
||||
params.logger.debug?.("Failed to resolve matrix verification status (non-fatal)", {
|
||||
error: String(err),
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
const legacyCryptoRestore = await runtimeDeps.maybeRestoreLegacyMatrixBackup({
|
||||
client: params.client,
|
||||
auth: params.auth,
|
||||
env: params.env,
|
||||
});
|
||||
throwIfMatrixStartupAborted(params.abortSignal);
|
||||
if (legacyCryptoRestore.kind === "restored") {
|
||||
params.logger.info(
|
||||
`matrix: restored ${legacyCryptoRestore.imported}/${legacyCryptoRestore.total} room key(s) from legacy encrypted-state backup`,
|
||||
@@ -189,6 +208,9 @@ export async function runMatrixStartupMaintenance(
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
if (isMatrixStartupAbortError(err)) {
|
||||
throw err;
|
||||
}
|
||||
params.logger.warn("matrix: failed restoring legacy encrypted-state backup", {
|
||||
error: String(err),
|
||||
});
|
||||
|
||||
@@ -44,6 +44,7 @@ import type {
|
||||
MessageEventContent,
|
||||
} from "./sdk/types.js";
|
||||
import type { MatrixVerificationSummary } from "./sdk/verification-manager.js";
|
||||
import { createMatrixStartupAbortError, throwIfMatrixStartupAborted } from "./startup-abort.js";
|
||||
import {
|
||||
isMatrixReadySyncState,
|
||||
isMatrixTerminalSyncState,
|
||||
@@ -146,18 +147,6 @@ const MATRIX_AUTOMATIC_REPAIR_BOOTSTRAP_OPTIONS = {
|
||||
strict: true,
|
||||
} satisfies MatrixCryptoBootstrapOptions;
|
||||
|
||||
function createMatrixStartupAbortError(): Error {
|
||||
const error = new Error("Matrix startup aborted");
|
||||
error.name = "AbortError";
|
||||
return error;
|
||||
}
|
||||
|
||||
function throwIfMatrixStartupAborted(abortSignal?: AbortSignal): void {
|
||||
if (abortSignal?.aborted === true) {
|
||||
throw createMatrixStartupAbortError();
|
||||
}
|
||||
}
|
||||
|
||||
function createMatrixExplicitBootstrapOptions(params?: {
|
||||
forceResetCrossSigning?: boolean;
|
||||
}): MatrixCryptoBootstrapOptions {
|
||||
|
||||
15
extensions/matrix/src/matrix/startup-abort.ts
Normal file
15
extensions/matrix/src/matrix/startup-abort.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
export function createMatrixStartupAbortError(): Error {
|
||||
const error = new Error("Matrix startup aborted");
|
||||
error.name = "AbortError";
|
||||
return error;
|
||||
}
|
||||
|
||||
export function throwIfMatrixStartupAborted(abortSignal?: AbortSignal): void {
|
||||
if (abortSignal?.aborted === true) {
|
||||
throw createMatrixStartupAbortError();
|
||||
}
|
||||
}
|
||||
|
||||
export function isMatrixStartupAbortError(error: unknown): boolean {
|
||||
return error instanceof Error && error.name === "AbortError";
|
||||
}
|
||||
Reference in New Issue
Block a user