mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-14 18:51:04 +00:00
fix(matrix): reset startup and shutdown sync state edges
This commit is contained in:
@@ -90,6 +90,8 @@ export function createMatrixMonitorStatusController(params: {
|
||||
noteDisconnected({ state, at, error });
|
||||
return;
|
||||
}
|
||||
// Unknown future SDK states inherit the current connectivity bit until the
|
||||
// SDK classifies them as ready or disconnected. Avoid guessing here.
|
||||
status.lastEventAt = at;
|
||||
status.healthState = state.toLowerCase();
|
||||
emit();
|
||||
|
||||
@@ -62,4 +62,50 @@ describe("createMatrixMonitorSyncLifecycle", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("ignores unexpected sync errors emitted during intentional shutdown", async () => {
|
||||
const client = createClientEmitter();
|
||||
const setStatus = vi.fn();
|
||||
let stopping = false;
|
||||
const lifecycle = createMatrixMonitorSyncLifecycle({
|
||||
client: client as never,
|
||||
statusController: createMatrixMonitorStatusController({
|
||||
accountId: "default",
|
||||
statusSink: setStatus,
|
||||
}),
|
||||
isStopping: () => stopping,
|
||||
});
|
||||
|
||||
const waitPromise = lifecycle.waitForFatalStop();
|
||||
stopping = true;
|
||||
client.emit("sync.unexpected_error", new Error("shutdown noise"));
|
||||
lifecycle.dispose();
|
||||
|
||||
await expect(waitPromise).resolves.toBeUndefined();
|
||||
expect(setStatus).not.toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
accountId: "default",
|
||||
healthState: "error",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects a second concurrent fatal-stop waiter", async () => {
|
||||
const client = createClientEmitter();
|
||||
const lifecycle = createMatrixMonitorSyncLifecycle({
|
||||
client: client as never,
|
||||
statusController: createMatrixMonitorStatusController({
|
||||
accountId: "default",
|
||||
}),
|
||||
});
|
||||
|
||||
const firstWait = lifecycle.waitForFatalStop();
|
||||
|
||||
await expect(lifecycle.waitForFatalStop()).rejects.toThrow(
|
||||
"Matrix fatal-stop wait already in progress",
|
||||
);
|
||||
|
||||
lifecycle.dispose();
|
||||
await expect(firstWait).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -43,10 +43,11 @@ export function createMatrixMonitorSyncLifecycle(params: {
|
||||
};
|
||||
|
||||
const onUnexpectedError = (error: Error) => {
|
||||
params.statusController.noteUnexpectedError(error);
|
||||
if (!params.isStopping?.()) {
|
||||
settleFatal(error);
|
||||
if (params.isStopping?.()) {
|
||||
return;
|
||||
}
|
||||
params.statusController.noteUnexpectedError(error);
|
||||
settleFatal(error);
|
||||
};
|
||||
|
||||
params.client.on("sync.state", onSyncState);
|
||||
@@ -57,6 +58,9 @@ export function createMatrixMonitorSyncLifecycle(params: {
|
||||
if (fatalError) {
|
||||
throw fatalError;
|
||||
}
|
||||
if (resolveFatalWait || rejectFatalWait) {
|
||||
throw new Error("Matrix fatal-stop wait already in progress");
|
||||
}
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
resolveFatalWait = resolve;
|
||||
rejectFatalWait = (error) => reject(error);
|
||||
|
||||
@@ -1070,6 +1070,36 @@ describe("MatrixClient event bridge", () => {
|
||||
await startExpectation;
|
||||
});
|
||||
|
||||
it("clears stale sync state before a restarted sync session waits for fresh readiness", async () => {
|
||||
matrixJsClient.startClient = vi
|
||||
.fn(async () => {
|
||||
queueMicrotask(() => {
|
||||
matrixJsClient.emit("sync", "PREPARED", null, undefined);
|
||||
});
|
||||
})
|
||||
.mockImplementationOnce(async () => {
|
||||
queueMicrotask(() => {
|
||||
matrixJsClient.emit("sync", "PREPARED", null, undefined);
|
||||
});
|
||||
})
|
||||
.mockImplementationOnce(async () => {});
|
||||
|
||||
const client = new MatrixClient("https://matrix.example.org", "token");
|
||||
|
||||
await client.start();
|
||||
client.stopSyncWithoutPersist();
|
||||
|
||||
vi.useFakeTimers();
|
||||
const restartPromise = client.start();
|
||||
const restartExpectation = expect(restartPromise).rejects.toThrow(
|
||||
"Matrix client did not reach a ready sync state within 30000ms",
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(30_000);
|
||||
|
||||
await restartExpectation;
|
||||
});
|
||||
|
||||
it("replays outstanding invite rooms at startup", async () => {
|
||||
matrixJsClient.getRooms = vi.fn(() => [
|
||||
{
|
||||
|
||||
@@ -536,6 +536,7 @@ export class MatrixClient {
|
||||
clearInterval(this.idbPersistTimer);
|
||||
this.idbPersistTimer = null;
|
||||
}
|
||||
this.currentSyncState = null;
|
||||
this.client.stopClient();
|
||||
this.started = false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user