mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 13:50:49 +00:00
matrix: clean up sync health listener lifecycle
(cherry picked from commit 31ef7b86cdb3db20dac16ab235f9d6b38f25e771) (cherry picked from commit 2499a76e3f3fe55bf434952d7228c4f643891bb1)
This commit is contained in:
@@ -122,6 +122,7 @@ const hoisted = vi.hoisted(() => {
|
||||
resolveSharedMatrixClient,
|
||||
resolveTextChunkLimit,
|
||||
runMatrixStartupMaintenance,
|
||||
registeredHealthySyncGetter: undefined as undefined | (() => number | undefined),
|
||||
setActiveMatrixClient,
|
||||
setMatrixRuntime,
|
||||
setStatus,
|
||||
@@ -339,10 +340,12 @@ vi.mock("./direct.js", () => ({
|
||||
vi.mock("./events.js", () => ({
|
||||
registerMatrixMonitorEvents: vi.fn(
|
||||
(params: {
|
||||
getHealthySyncSinceMs?: () => number | undefined;
|
||||
onRoomMessage: (roomId: string, event: unknown) => Promise<void>;
|
||||
runDetachedTask?: (label: string, task: () => Promise<void>) => Promise<void>;
|
||||
}) => {
|
||||
hoisted.callOrder.push("register-events");
|
||||
hoisted.registeredHealthySyncGetter = params.getHealthySyncSinceMs;
|
||||
hoisted.registeredOnRoomMessage = (roomId: string, event: unknown) =>
|
||||
params.runDetachedTask
|
||||
? params.runDetachedTask("test room message", async () => {
|
||||
@@ -429,6 +432,7 @@ describe("monitorMatrixProvider", () => {
|
||||
});
|
||||
hoisted.getMemberDisplayName.mockReset().mockResolvedValue("Bot");
|
||||
hoisted.registeredOnRoomMessage = null;
|
||||
hoisted.registeredHealthySyncGetter = undefined;
|
||||
hoisted.setActiveMatrixClient.mockReset();
|
||||
hoisted.stopThreadBindingManager.mockReset();
|
||||
hoisted.client.removeAllListeners();
|
||||
@@ -497,6 +501,49 @@ describe("monitorMatrixProvider", () => {
|
||||
await expect(monitorPromise).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("re-arms the healthy-sync milestone across reconnect transitions", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
|
||||
const abortController = new AbortController();
|
||||
try {
|
||||
const monitorPromise = monitorMatrixProvider({
|
||||
abortSignal: abortController.signal,
|
||||
setStatus: hoisted.setStatus,
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(hoisted.callOrder).toContain("start-client");
|
||||
});
|
||||
|
||||
const getHealthySyncSinceMs = hoisted.registeredHealthySyncGetter;
|
||||
if (!getHealthySyncSinceMs) {
|
||||
throw new Error("expected healthy sync getter to be registered");
|
||||
}
|
||||
|
||||
expect(getHealthySyncSinceMs()).toBeUndefined();
|
||||
|
||||
hoisted.client.emit("sync.state", "SYNCING", "RECONNECTING", undefined);
|
||||
expect(getHealthySyncSinceMs()).toBe(Date.now());
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5_000);
|
||||
hoisted.client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("network flap"));
|
||||
expect(getHealthySyncSinceMs()).toBeUndefined();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(7_000);
|
||||
hoisted.client.emit("sync.state", "SYNCING", "RECONNECTING", undefined);
|
||||
const rearmedHealthySyncSinceMs = Date.now();
|
||||
expect(getHealthySyncSinceMs()).toBe(rearmedHealthySyncSinceMs);
|
||||
|
||||
abortController.abort();
|
||||
await expect(monitorPromise).resolves.toBeUndefined();
|
||||
|
||||
hoisted.client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("late noise"));
|
||||
expect(getHealthySyncSinceMs()).toBe(rearmedHealthySyncSinceMs);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("contains room-message handler rejections inside monitor task tracking", async () => {
|
||||
const abortController = new AbortController();
|
||||
const unhandled: unknown[] = [];
|
||||
|
||||
@@ -27,6 +27,11 @@ import {
|
||||
import { releaseSharedClientInstance } from "../client/shared.js";
|
||||
import type { MatrixClient } from "../sdk.js";
|
||||
import { isMatrixStartupAbortError } from "../startup-abort.js";
|
||||
import {
|
||||
isMatrixDisconnectedSyncState,
|
||||
isMatrixReadySyncState,
|
||||
type MatrixSyncState,
|
||||
} from "../sync-state.js";
|
||||
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
|
||||
import { registerMatrixAutoJoin } from "./auto-join.js";
|
||||
import { resolveMatrixMonitorConfig } from "./config.js";
|
||||
@@ -184,6 +189,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
await releaseSharedClientInstance(client, mode);
|
||||
}
|
||||
} finally {
|
||||
client?.off("sync.state", onSyncState);
|
||||
syncLifecycle?.dispose();
|
||||
statusController.markStopped();
|
||||
setActiveMatrixClient(null, auth.accountId);
|
||||
@@ -242,6 +248,18 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
const warnedEncryptedRooms = new Set<string>();
|
||||
const warnedCryptoMissingRooms = new Set<string>();
|
||||
let healthySyncSinceMs: number | undefined;
|
||||
const noteSyncHealthState = (state: MatrixSyncState, at = Date.now()) => {
|
||||
if (isMatrixReadySyncState(state)) {
|
||||
healthySyncSinceMs = at;
|
||||
return;
|
||||
}
|
||||
if (isMatrixDisconnectedSyncState(state)) {
|
||||
healthySyncSinceMs = undefined;
|
||||
}
|
||||
};
|
||||
const onSyncState = (state: MatrixSyncState) => {
|
||||
noteSyncHealthState(state);
|
||||
};
|
||||
|
||||
try {
|
||||
client = await resolveSharedMatrixClient({
|
||||
@@ -260,6 +278,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
statusController,
|
||||
isStopping: () => cleanedUp || opts.abortSignal?.aborted === true,
|
||||
});
|
||||
client.on("sync.state", onSyncState);
|
||||
// Cold starts should ignore old room history, but once we have a persisted
|
||||
// /sync cursor we want restart backlogs to replay just like other channels.
|
||||
const dropPreStartupMessages = !client.hasPersistedSyncState();
|
||||
@@ -375,7 +394,6 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
accountId: auth.accountId,
|
||||
abortSignal: opts.abortSignal,
|
||||
});
|
||||
healthySyncSinceMs ??= Date.now();
|
||||
logVerboseMessage("matrix: client started");
|
||||
|
||||
// Shared client is already started via resolveSharedMatrixClient.
|
||||
|
||||
Reference in New Issue
Block a user