mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-13 18:21:27 +00:00
fix(matrix): harden shared startup and fatal sync latching
This commit is contained in:
@@ -271,6 +271,48 @@ describe("resolveSharedMatrixClient", () => {
|
||||
await expect(ownerPromise).resolves.toBe(mainClient);
|
||||
});
|
||||
|
||||
it("keeps the shared startup lock while an aborted waiter exits early", async () => {
|
||||
const mainAuth = authFor("main");
|
||||
let resolveStartup: (() => void) | undefined;
|
||||
const mainClient = {
|
||||
...createMockClient("main"),
|
||||
start: vi.fn(
|
||||
async () =>
|
||||
await new Promise<void>((resolve) => {
|
||||
resolveStartup = resolve;
|
||||
}),
|
||||
),
|
||||
};
|
||||
|
||||
resolveMatrixAuthMock.mockResolvedValue(mainAuth);
|
||||
createMatrixClientMock.mockResolvedValue(mainClient);
|
||||
|
||||
const ownerPromise = resolveSharedMatrixClient({ accountId: "main" });
|
||||
await vi.waitFor(() => {
|
||||
expect(mainClient.start).toHaveBeenCalledTimes(1);
|
||||
expect(resolveStartup).toEqual(expect.any(Function));
|
||||
});
|
||||
|
||||
const abortController = new AbortController();
|
||||
const abortedWaiter = resolveSharedMatrixClient({
|
||||
accountId: "main",
|
||||
abortSignal: abortController.signal,
|
||||
});
|
||||
abortController.abort();
|
||||
await expect(abortedWaiter).rejects.toMatchObject({
|
||||
message: "Matrix startup aborted",
|
||||
name: "AbortError",
|
||||
});
|
||||
|
||||
const followerPromise = resolveSharedMatrixClient({ accountId: "main" });
|
||||
expect(mainClient.start).toHaveBeenCalledTimes(1);
|
||||
|
||||
resolveStartup?.();
|
||||
await expect(ownerPromise).resolves.toBe(mainClient);
|
||||
await expect(followerPromise).resolves.toBe(mainClient);
|
||||
expect(mainClient.start).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("recreates the shared client when dispatcherPolicy changes", async () => {
|
||||
const firstAuth = {
|
||||
...authFor("main"),
|
||||
|
||||
@@ -107,7 +107,7 @@ async function ensureSharedClientStarted(params: {
|
||||
return;
|
||||
}
|
||||
|
||||
params.state.startPromise = (async () => {
|
||||
const startPromise = (async () => {
|
||||
const client = params.state.client;
|
||||
|
||||
// Initialize crypto if enabled
|
||||
@@ -126,12 +126,16 @@ async function ensureSharedClientStarted(params: {
|
||||
await client.start({ abortSignal: params.abortSignal });
|
||||
params.state.started = true;
|
||||
})();
|
||||
// Keep the shared startup lock until the underlying start fully settles, even
|
||||
// if one waiter aborts early while another caller still owns the startup.
|
||||
const guardedStart = startPromise.finally(() => {
|
||||
if (params.state.startPromise === guardedStart) {
|
||||
params.state.startPromise = null;
|
||||
}
|
||||
});
|
||||
params.state.startPromise = guardedStart;
|
||||
|
||||
try {
|
||||
await waitForStart(params.state.startPromise);
|
||||
} finally {
|
||||
params.state.startPromise = null;
|
||||
}
|
||||
await waitForStart(guardedStart);
|
||||
}
|
||||
|
||||
async function resolveSharedMatrixClientState(
|
||||
|
||||
@@ -146,6 +146,33 @@ describe("createMatrixMonitorSyncLifecycle", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("ignores follow-up sync states after a fatal sync error", async () => {
|
||||
const client = createClientEmitter();
|
||||
const setStatus = vi.fn();
|
||||
const lifecycle = createMatrixMonitorSyncLifecycle({
|
||||
client: client as never,
|
||||
statusController: createMatrixMonitorStatusController({
|
||||
accountId: "default",
|
||||
statusSink: setStatus,
|
||||
}),
|
||||
});
|
||||
|
||||
const waitPromise = lifecycle.waitForFatalStop();
|
||||
client.emit("sync.unexpected_error", new Error("sync exploded"));
|
||||
await expect(waitPromise).rejects.toThrow("sync exploded");
|
||||
|
||||
client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("late reconnect"));
|
||||
lifecycle.dispose();
|
||||
|
||||
expect(setStatus).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
accountId: "default",
|
||||
healthState: "error",
|
||||
lastError: "sync exploded",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects a second concurrent fatal-stop waiter", async () => {
|
||||
const client = createClientEmitter();
|
||||
const lifecycle = createMatrixMonitorSyncLifecycle({
|
||||
|
||||
@@ -42,7 +42,9 @@ export function createMatrixMonitorSyncLifecycle(params: {
|
||||
settleFatal(fatalError);
|
||||
return;
|
||||
}
|
||||
if (isMatrixTerminalSyncState(state) && fatalError) {
|
||||
// Fatal sync failures are sticky for telemetry; later SDK state churn during
|
||||
// cleanup or reconnect should not overwrite the first recorded error.
|
||||
if (fatalError) {
|
||||
return;
|
||||
}
|
||||
params.statusController.noteSyncState(state, error);
|
||||
|
||||
Reference in New Issue
Block a user