mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 11:10:45 +00:00
matrix: detect repeated undecryptable events after startup (#64451)
Merged via squash.
Prepared head SHA: a2ad02ecba
Co-authored-by: shad0wca7 <9969843+shad0wca7@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
@@ -32,6 +32,9 @@ function createHarness(params?: {
|
||||
cryptoAvailable?: boolean;
|
||||
selfUserId?: string;
|
||||
selfUserIdError?: Error;
|
||||
startupMs?: number;
|
||||
startupGraceMs?: number;
|
||||
getHealthySyncSinceMs?: () => number | undefined;
|
||||
allowFrom?: string[];
|
||||
dmEnabled?: boolean;
|
||||
dmPolicy?: "open" | "pairing" | "allowlist" | "disabled";
|
||||
@@ -145,6 +148,10 @@ function createHarness(params?: {
|
||||
warnedEncryptedRooms: new Set<string>(),
|
||||
warnedCryptoMissingRooms: new Set<string>(),
|
||||
logger,
|
||||
startupGraceMs: params?.startupGraceMs,
|
||||
getHealthySyncSinceMs:
|
||||
params?.getHealthySyncSinceMs ??
|
||||
(typeof params?.startupMs === "number" ? () => params.startupMs : undefined),
|
||||
formatNativeDependencyHint,
|
||||
onRoomMessage,
|
||||
});
|
||||
@@ -1485,6 +1492,276 @@ describe("registerMatrixMonitorEvents verification routing", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("classifies repeated fresh post-healthy-sync decrypt failures separately", async () => {
  // Pin the clock so the healthy-sync marker and the event timestamps are deterministic.
  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
  try {
    const healthySyncSinceMs = Date.now() - 60_000;
    const harness = createHarness({
      accountId: "ops",
      getHealthySyncSinceMs: () => healthySyncSinceMs,
    });
    const { logger, failedDecryptListener } = harness;
    if (!failedDecryptListener) {
      throw new Error("room.failed_decryption listener was not registered");
    }

    // Three failures from three different rooms and senders, all newer than the healthy-sync marker.
    const roomIds = ["!room-a:example.org", "!room-b:example.org", "!room-c:example.org"];
    let ordinal = 0;
    for (const roomId of roomIds) {
      ordinal += 1;
      await failedDecryptListener(
        roomId,
        {
          event_id: `$enc-fresh-${ordinal}`,
          sender: `@alice${ordinal}:matrix.example.org`,
          type: EventType.RoomMessageEncrypted,
          origin_server_ts: Date.now() - 1_000 * ordinal,
          content: {},
        },
        new Error("The sender's device has not sent us the keys for this message."),
      );
    }

    // Each failure is logged individually with a running count ...
    for (const callNumber of [1, 2, 3]) {
      expect(logger.warn).toHaveBeenNthCalledWith(
        callNumber,
        "Failed to decrypt fresh post-healthy-sync message",
        expect.objectContaining({
          eventId: `$enc-fresh-${callNumber}`,
          freshAfterHealthySync: true,
          postHealthySyncFailureCount: callNumber,
        }),
      );
    }

    // ... and the third one additionally triggers the aggregate guidance warning.
    const aggregateHint =
      "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.";
    expect(logger.warn).toHaveBeenNthCalledWith(
      4,
      aggregateHint,
      expect.objectContaining({
        failureCount: 3,
        roomCount: 3,
        senderCount: 3,
        rooms: ["!room-a:example.org", "!room-b:example.org", "!room-c:example.org"],
        sampleEventIds: ["$enc-fresh-1", "$enc-fresh-2", "$enc-fresh-3"],
      }),
    );
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
it("keeps decrypt failures before healthy sync on the generic warning path", async () => {
  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
  try {
    // Starts undefined: sync has not been observed healthy yet.
    let healthySyncSinceMs: number | undefined;
    const { logger, failedDecryptListener } = createHarness({
      accountId: "ops",
      getHealthySyncSinceMs: () => healthySyncSinceMs,
    });
    if (!failedDecryptListener) {
      throw new Error("room.failed_decryption listener was not registered");
    }

    // Same room, sender and error for both failures; only the id and timestamp vary.
    const reportFailure = (eventId: string, originServerTs: number) =>
      failedDecryptListener(
        "!room:example.org",
        {
          event_id: eventId,
          sender: "@alice:matrix.example.org",
          type: EventType.RoomMessageEncrypted,
          origin_server_ts: originServerTs,
          content: {},
        },
        new Error("The sender's device has not sent us the keys for this message."),
      );

    // Before healthy sync is reported, a failure takes the generic path.
    await reportFailure("$enc-old", Date.now() - 5 * 60_000);

    expect(logger.warn).toHaveBeenCalledTimes(1);
    expect(logger.warn).toHaveBeenCalledWith(
      "Failed to decrypt message",
      expect.objectContaining({
        eventId: "$enc-old",
        freshAfterHealthySync: false,
      }),
    );

    // Once healthy sync is reported, a newer event is classified as fresh and counting starts at 1.
    healthySyncSinceMs = Date.now();

    await reportFailure("$enc-fresh-after-ready", Date.now() + 1);

    expect(logger.warn).toHaveBeenNthCalledWith(
      2,
      "Failed to decrypt fresh post-healthy-sync message",
      expect.objectContaining({
        eventId: "$enc-fresh-after-ready",
        freshAfterHealthySync: true,
        postHealthySyncFailureCount: 1,
      }),
    );
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
it("re-emits the aggregate warning for a new failure wave after the window clears", async () => {
  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
  try {
    const healthySyncSinceMs = Date.now() - 60_000;
    const { logger, failedDecryptListener } = createHarness({
      accountId: "ops",
      getHealthySyncSinceMs: () => healthySyncSinceMs,
    });
    if (!failedDecryptListener) {
      throw new Error("room.failed_decryption listener was not registered");
    }

    // One wave = three failures from distinct rooms/senders, timestamped just before "now".
    const emitWave = async (wave: number) => {
      for (let index = 1; index <= 3; index += 1) {
        await failedDecryptListener(
          `!room-${wave}-${index}:example.org`,
          {
            event_id: `$enc-wave-${wave}-${index}`,
            sender: `@alice${wave}${index}:matrix.example.org`,
            type: EventType.RoomMessageEncrypted,
            origin_server_ts: Date.now() - index * 1_000,
            content: {},
          },
          new Error("The sender's device has not sent us the keys for this message."),
        );
      }
    };

    await emitWave(1);
    // Step just past the two-minute window so the first wave ages out before the second arrives.
    await vi.advanceTimersByTimeAsync(2 * 60_000 + 1);
    await emitWave(2);

    const aggregateHint =
      "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.";

    // Calls 1-3 and 5-7 are the per-event warnings; 4 and 8 are the aggregate ones.
    expect(logger.warn).toHaveBeenNthCalledWith(
      4,
      aggregateHint,
      expect.objectContaining({
        sampleEventIds: ["$enc-wave-1-1", "$enc-wave-1-2", "$enc-wave-1-3"],
      }),
    );
    expect(logger.warn).toHaveBeenNthCalledWith(
      8,
      aggregateHint,
      expect.objectContaining({
        sampleEventIds: ["$enc-wave-2-1", "$enc-wave-2-2", "$enc-wave-2-3"],
      }),
    );
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
it("resets tracked failures when healthy sync restarts before the old window expires", async () => {
  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
  try {
    let healthySyncSinceMs = Date.now() - 60_000;
    const { logger, failedDecryptListener } = createHarness({
      accountId: "ops",
      getHealthySyncSinceMs: () => healthySyncSinceMs,
    });
    if (!failedDecryptListener) {
      throw new Error("room.failed_decryption listener was not registered");
    }

    // Emits three failures tagged with `label`; the timestamp is derived per index by the caller.
    const emitBatch = async (label: string, timestampFor: (index: number) => number) => {
      for (let index = 1; index <= 3; index += 1) {
        await failedDecryptListener(
          `!room-${label}-${index}:example.org`,
          {
            event_id: `$enc-${label}-${index}`,
            sender: `@alice-${label}-${index}:matrix.example.org`,
            type: EventType.RoomMessageEncrypted,
            origin_server_ts: timestampFor(index),
            content: {},
          },
          new Error("The sender's device has not sent us the keys for this message."),
        );
      }
    };

    // First batch reaches the threshold (warn calls 1-4).
    await emitBatch("first", (index) => Date.now() - index * 1_000);

    // A new healthy-sync marker arrives without any time passing.
    healthySyncSinceMs = Date.now();

    // Second batch must be counted from scratch (warn calls 5-8).
    await emitBatch("second", (index) => Date.now() + index);

    for (const ordinal of [1, 2, 3]) {
      expect(logger.warn).toHaveBeenNthCalledWith(
        4 + ordinal,
        "Failed to decrypt fresh post-healthy-sync message",
        expect.objectContaining({
          eventId: `$enc-second-${ordinal}`,
          freshAfterHealthySync: true,
          postHealthySyncFailureCount: ordinal,
        }),
      );
    }
    expect(logger.warn).toHaveBeenNthCalledWith(
      8,
      "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.",
      expect.objectContaining({
        sampleEventIds: ["$enc-second-1", "$enc-second-2", "$enc-second-3"],
      }),
    );
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
it("does not throw when getUserId fails during decrypt guidance lookup", async () => {
|
||||
const { logger, logVerboseMessage, failedDecryptListener } = createHarness({
|
||||
accountId: "ops",
|
||||
|
||||
@@ -8,6 +8,140 @@ import type { MatrixRawEvent } from "./types.js";
|
||||
import { EventType } from "./types.js";
|
||||
import { createMatrixVerificationEventRouter } from "./verification-events.js";
|
||||
|
||||
/** Tracked failures whose event timestamp is older than this many milliseconds are pruned. */
const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS = 2 * 60 * 1_000;

/** Number of tracked failures at which the aggregate warning payload is produced. */
const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_THRESHOLD = 3;

/** Maximum number of rooms, senders and event ids listed in the aggregate warning payload. */
const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT = 3;
|
||||
|
||||
/** A single decrypt failure remembered by the post-healthy-sync failure tracker. */
type MatrixPostHealthySyncDecryptFailureObservation = {
  /** Deduplication key; the tracker builds it as `${roomId}|${eventId}`. */
  key: string;
  /** Room the undecryptable event was received in. */
  roomId: string;
  /** `event_id` of the undecryptable event. */
  eventId: string;
  /** Sender of the event, or `null` when the raw event's sender was not a string. */
  sender: string | null;
  /** The event's `origin_server_ts`; pruning compares this value against the current time. */
  eventTs: number;
  /** Message of the decryption error reported for this event. */
  error: string;
};
|
||||
|
||||
/**
 * Builds the warning text logged when fresh encrypted messages keep failing to decrypt
 * after sync has been reported healthy.
 *
 * @param accountId Account id interpolated into the two suggested CLI commands.
 * @returns A single-line message naming the symptom and the commands to run.
 */
function formatMatrixPostHealthySyncDecryptionHint(accountId: string): string {
  const verifyCommand = `openclaw matrix verify status --verbose --account ${accountId}`;
  const devicesCommand = `openclaw matrix devices list --account ${accountId}`;
  const sentences = [
    "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync.",
    "This device may still be missing new room keys.",
    `Check '${verifyCommand}' and '${devicesCommand}'.`,
  ];
  return sentences.join(" ");
}
|
||||
|
||||
/**
 * Decides whether an undecryptable event counts as "fresh", i.e. its server timestamp is
 * at or after the moment sync was last reported healthy (plus an optional grace period).
 *
 * @param params.event Raw event; only `origin_server_ts` is inspected.
 * @param params.healthySyncSinceMs Epoch ms at which sync became healthy. When it is
 *   missing or not a finite number, nothing is considered fresh.
 * @param params.graceMs Extra milliseconds added to `healthySyncSinceMs` before an event
 *   qualifies. Missing or NaN values are treated as 0.
 * @param params.nowMs Current epoch ms, used to reject timestamps too far in the future.
 * @returns `true` when the event should be tracked as a fresh post-healthy-sync failure.
 */
function isFreshPostHealthySyncDecryptFailure(params: {
  event: MatrixRawEvent;
  healthySyncSinceMs?: number;
  graceMs?: number;
  nowMs: number;
}): boolean {
  const { event, healthySyncSinceMs, nowMs } = params;
  // A NaN grace would turn the lower-bound comparison below into `eventTs < NaN`, which is
  // always false and would let events older than the healthy-sync marker through as "fresh".
  const graceMs =
    typeof params.graceMs === "number" && !Number.isNaN(params.graceMs) ? params.graceMs : 0;
  // Timestamps more than this far ahead of the local clock are not trusted.
  const futureSkewToleranceMs = 60_000;

  if (typeof healthySyncSinceMs !== "number" || !Number.isFinite(healthySyncSinceMs)) {
    return false;
  }

  const eventTs = event.origin_server_ts;
  if (!Number.isFinite(eventTs) || eventTs <= 0) {
    // Missing or nonsensical timestamp: cannot be placed relative to the healthy-sync marker.
    return false;
  }
  if (eventTs < healthySyncSinceMs + graceMs) {
    // Originated before sync was healthy (or within the grace period).
    return false;
  }
  return !(eventTs > nowMs + futureSkewToleranceMs);
}
|
||||
|
||||
/**
 * Creates a stateful tracker that counts fresh decrypt failures observed after sync was
 * reported healthy and produces one aggregate warning payload once enough of them pile up.
 *
 * @param params.getHealthySyncSinceMs Returns the epoch ms at which sync became healthy,
 *   or `undefined` while it is not healthy. Whenever the returned value changes, all
 *   tracked state is discarded.
 * @param params.startupGraceMs Grace period forwarded to the freshness check.
 * @returns An object whose `recordFailure` classifies one failure. The result always has
 *   `freshAfterHealthySync` and `failureCount`; `warning` is present only on the call that
 *   first reaches the threshold since the tracked state was last cleared.
 */
function createMatrixPostHealthySyncDecryptFailureTracker(params: {
  getHealthySyncSinceMs?: () => number | undefined;
  startupGraceMs?: number;
}) {
  let observations: MatrixPostHealthySyncDecryptFailureObservation[] = [];
  let warningEmitted = false;
  let trackedHealthySyncSinceMs: number | undefined;

  const resetObservations = () => {
    observations = [];
    warningEmitted = false;
  };

  // Drops observations whose event timestamp has aged out of the window. The warning is
  // re-armed only once nothing is left, so a continuing trickle does not re-trigger it.
  const pruneObservations = (nowMs: number) => {
    observations = observations.filter(
      (entry) => nowMs - entry.eventTs <= MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS,
    );
    if (observations.length === 0) {
      warningEmitted = false;
    }
  };

  // Type guard with the same truthiness semantics as `.filter(Boolean)` (drops `null` and
  // the empty string) that also narrows the element type to `string`.
  const hasSender = (sender: string | null): sender is string => Boolean(sender);

  return {
    recordFailure(roomId: string, event: MatrixRawEvent, error: Error) {
      const nowMs = Date.now();
      const healthySyncSinceMs = params.getHealthySyncSinceMs?.();
      if (healthySyncSinceMs !== trackedHealthySyncSinceMs) {
        // Sync health changed (lost, regained, or re-armed): start counting from scratch.
        trackedHealthySyncSinceMs = healthySyncSinceMs;
        resetObservations();
      }
      if (
        !isFreshPostHealthySyncDecryptFailure({
          event,
          healthySyncSinceMs,
          graceMs: params.startupGraceMs,
          nowMs,
        })
      ) {
        return { freshAfterHealthySync: false, failureCount: 0 } as const;
      }

      // NOTE(review): pruning runs before the insert, so a fresh event whose timestamp is
      // already older than the window is still counted on this call and only dropped on the
      // next one — confirm that is intended.
      pruneObservations(nowMs);

      // The same event may be reported more than once; count it a single time.
      const key = `${roomId}|${event.event_id}`;
      if (!observations.some((entry) => entry.key === key)) {
        observations.push({
          key,
          roomId,
          eventId: event.event_id,
          sender: typeof event.sender === "string" ? event.sender : null,
          eventTs: event.origin_server_ts,
          error: error.message,
        });
      }

      const failureCount = observations.length;
      if (warningEmitted || failureCount < MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_THRESHOLD) {
        return { freshAfterHealthySync: true, failureCount } as const;
      }

      warningEmitted = true;

      // Build each distinct list once; it feeds both the sample and the count.
      const uniqueRooms = [...new Set(observations.map((entry) => entry.roomId))];
      const uniqueSenders = [
        ...new Set(observations.map((entry) => entry.sender).filter(hasSender)),
      ];

      return {
        freshAfterHealthySync: true,
        failureCount,
        warning: {
          rooms: uniqueRooms.slice(0, MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT),
          roomCount: uniqueRooms.length,
          senders: uniqueSenders.slice(0, MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT),
          senderCount: uniqueSenders.length,
          // Event ids are sampled from the most recent observations.
          eventIds: observations
            .slice(-MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT)
            .map((entry) => entry.eventId),
          latestError: observations.at(-1)?.error ?? error.message,
          windowMs: MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS,
        },
      } as const;
    },
  };
}
|
||||
|
||||
function formatMatrixSelfDecryptionHint(accountId: string): string {
|
||||
return (
|
||||
"matrix: failed to decrypt a message from this same Matrix user. " +
|
||||
@@ -47,6 +181,8 @@ export function registerMatrixMonitorEvents(params: {
|
||||
warnedEncryptedRooms: Set<string>;
|
||||
warnedCryptoMissingRooms: Set<string>;
|
||||
logger: RuntimeLogger;
|
||||
startupGraceMs?: number;
|
||||
getHealthySyncSinceMs?: () => number | undefined;
|
||||
formatNativeDependencyHint: PluginRuntime["system"]["formatNativeDependencyHint"];
|
||||
onRoomMessage: (roomId: string, event: MatrixRawEvent) => void | Promise<void>;
|
||||
runDetachedTask?: (label: string, task: () => Promise<void>) => Promise<void>;
|
||||
@@ -64,10 +200,16 @@ export function registerMatrixMonitorEvents(params: {
|
||||
warnedEncryptedRooms,
|
||||
warnedCryptoMissingRooms,
|
||||
logger,
|
||||
startupGraceMs,
|
||||
getHealthySyncSinceMs,
|
||||
formatNativeDependencyHint,
|
||||
onRoomMessage,
|
||||
runDetachedTask,
|
||||
} = params;
|
||||
const postHealthySyncDecryptFailureTracker = createMatrixPostHealthySyncDecryptFailureTracker({
|
||||
getHealthySyncSinceMs,
|
||||
startupGraceMs,
|
||||
});
|
||||
const { routeVerificationEvent, routeVerificationSummary } = createMatrixVerificationEventRouter({
|
||||
client,
|
||||
allowFrom,
|
||||
@@ -115,16 +257,42 @@ export function registerMatrixMonitorEvents(params: {
|
||||
client.on(
|
||||
"room.failed_decryption",
|
||||
async (roomId: string, event: MatrixRawEvent, error: Error) => {
|
||||
const failureState = postHealthySyncDecryptFailureTracker.recordFailure(roomId, event, error);
|
||||
const selfUserId = await resolveMatrixSelfUserId(client, logVerboseMessage);
|
||||
const sender = typeof event.sender === "string" ? event.sender : null;
|
||||
const senderMatchesOwnUser = Boolean(selfUserId && sender && selfUserId === sender);
|
||||
logger.warn("Failed to decrypt message", {
|
||||
roomId,
|
||||
eventId: event.event_id,
|
||||
sender,
|
||||
senderMatchesOwnUser,
|
||||
error: error.message,
|
||||
});
|
||||
logger.warn(
|
||||
failureState.freshAfterHealthySync
|
||||
? "Failed to decrypt fresh post-healthy-sync message"
|
||||
: "Failed to decrypt message",
|
||||
{
|
||||
roomId,
|
||||
eventId: event.event_id,
|
||||
sender,
|
||||
senderMatchesOwnUser,
|
||||
error: error.message,
|
||||
freshAfterHealthySync: failureState.freshAfterHealthySync,
|
||||
...(failureState.freshAfterHealthySync
|
||||
? {
|
||||
postHealthySyncFailureCount: failureState.failureCount,
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
);
|
||||
if (failureState.warning) {
|
||||
logger.warn(formatMatrixPostHealthySyncDecryptionHint(auth.accountId), {
|
||||
roomId,
|
||||
eventId: event.event_id,
|
||||
failureCount: failureState.failureCount,
|
||||
roomCount: failureState.warning.roomCount,
|
||||
rooms: failureState.warning.rooms,
|
||||
senderCount: failureState.warning.senderCount,
|
||||
senders: failureState.warning.senders,
|
||||
sampleEventIds: failureState.warning.eventIds,
|
||||
latestError: failureState.warning.latestError,
|
||||
windowMs: failureState.warning.windowMs,
|
||||
});
|
||||
}
|
||||
if (senderMatchesOwnUser) {
|
||||
logger.warn(formatMatrixSelfDecryptionHint(auth.accountId), {
|
||||
roomId,
|
||||
@@ -133,7 +301,7 @@ export function registerMatrixMonitorEvents(params: {
|
||||
});
|
||||
}
|
||||
logVerboseMessage(
|
||||
`matrix: failed decrypt room=${roomId} id=${event.event_id ?? "unknown"} error=${error.message}`,
|
||||
`matrix: failed decrypt room=${roomId} id=${event.event_id ?? "unknown"} freshAfterHealthySync=${String(failureState.freshAfterHealthySync)} error=${error.message}`,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
@@ -122,6 +122,7 @@ const hoisted = vi.hoisted(() => {
|
||||
resolveSharedMatrixClient,
|
||||
resolveTextChunkLimit,
|
||||
runMatrixStartupMaintenance,
|
||||
registeredHealthySyncGetter: undefined as undefined | (() => number | undefined),
|
||||
setActiveMatrixClient,
|
||||
setMatrixRuntime,
|
||||
setStatus,
|
||||
@@ -339,10 +340,12 @@ vi.mock("./direct.js", () => ({
|
||||
vi.mock("./events.js", () => ({
|
||||
registerMatrixMonitorEvents: vi.fn(
|
||||
(params: {
|
||||
getHealthySyncSinceMs?: () => number | undefined;
|
||||
onRoomMessage: (roomId: string, event: unknown) => Promise<void>;
|
||||
runDetachedTask?: (label: string, task: () => Promise<void>) => Promise<void>;
|
||||
}) => {
|
||||
hoisted.callOrder.push("register-events");
|
||||
hoisted.registeredHealthySyncGetter = params.getHealthySyncSinceMs;
|
||||
hoisted.registeredOnRoomMessage = (roomId: string, event: unknown) =>
|
||||
params.runDetachedTask
|
||||
? params.runDetachedTask("test room message", async () => {
|
||||
@@ -429,6 +432,7 @@ describe("monitorMatrixProvider", () => {
|
||||
});
|
||||
hoisted.getMemberDisplayName.mockReset().mockResolvedValue("Bot");
|
||||
hoisted.registeredOnRoomMessage = null;
|
||||
hoisted.registeredHealthySyncGetter = undefined;
|
||||
hoisted.setActiveMatrixClient.mockReset();
|
||||
hoisted.stopThreadBindingManager.mockReset();
|
||||
hoisted.client.removeAllListeners();
|
||||
@@ -497,6 +501,58 @@ describe("monitorMatrixProvider", () => {
|
||||
await expect(monitorPromise).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("re-arms the healthy-sync milestone across reconnect transitions", async () => {
  vi.useFakeTimers();
  vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
  const abortController = new AbortController();
  try {
    const monitorPromise = monitorMatrixProvider({
      abortSignal: abortController.signal,
      setStatus: hoisted.setStatus,
    });

    await vi.waitFor(() => {
      expect(hoisted.callOrder).toContain("start-client");
    });

    const getHealthySyncSinceMs = hoisted.registeredHealthySyncGetter;
    if (!getHealthySyncSinceMs) {
      throw new Error("expected healthy sync getter to be registered");
    }

    // No sync state has been emitted yet, so there is no milestone.
    expect(getHealthySyncSinceMs()).toBeUndefined();

    // First SYNCING arms the milestone at the current (fake) time.
    hoisted.client.emit("sync.state", "SYNCING", "RECONNECTING", undefined);
    const firstHealthySyncSinceMs = Date.now();
    expect(getHealthySyncSinceMs()).toBe(firstHealthySyncSinceMs);

    // Later CATCHUP / PREPARED states leave the already-armed milestone untouched.
    await vi.advanceTimersByTimeAsync(3_000);
    hoisted.client.emit("sync.state", "CATCHUP", "SYNCING", undefined);
    expect(getHealthySyncSinceMs()).toBe(firstHealthySyncSinceMs);

    await vi.advanceTimersByTimeAsync(2_000);
    hoisted.client.emit("sync.state", "PREPARED", "CATCHUP", undefined);
    expect(getHealthySyncSinceMs()).toBe(firstHealthySyncSinceMs);

    // RECONNECTING clears the milestone ...
    await vi.advanceTimersByTimeAsync(5_000);
    hoisted.client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("network flap"));
    expect(getHealthySyncSinceMs()).toBeUndefined();

    // ... and the next SYNCING re-arms it at the new time.
    await vi.advanceTimersByTimeAsync(7_000);
    hoisted.client.emit("sync.state", "SYNCING", "RECONNECTING", undefined);
    const rearmedHealthySyncSinceMs = Date.now();
    expect(getHealthySyncSinceMs()).toBe(rearmedHealthySyncSinceMs);

    abortController.abort();
    await expect(monitorPromise).resolves.toBeUndefined();

    // After the monitor has stopped, further sync.state events must not change the value.
    hoisted.client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("late noise"));
    expect(getHealthySyncSinceMs()).toBe(rearmedHealthySyncSinceMs);
  } finally {
    // If an assertion above threw before the explicit abort, stop the monitor here so it is
    // not left running into later tests. Aborting an already-aborted controller is a no-op.
    abortController.abort();
    vi.useRealTimers();
  }
});
|
||||
|
||||
it("contains room-message handler rejections inside monitor task tracking", async () => {
|
||||
const abortController = new AbortController();
|
||||
const unhandled: unknown[] = [];
|
||||
|
||||
@@ -27,6 +27,11 @@ import {
|
||||
import { releaseSharedClientInstance } from "../client/shared.js";
|
||||
import type { MatrixClient } from "../sdk.js";
|
||||
import { isMatrixStartupAbortError } from "../startup-abort.js";
|
||||
import {
|
||||
isMatrixDisconnectedSyncState,
|
||||
isMatrixReadySyncState,
|
||||
type MatrixSyncState,
|
||||
} from "../sync-state.js";
|
||||
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
|
||||
import { registerMatrixAutoJoin } from "./auto-join.js";
|
||||
import { resolveMatrixMonitorConfig } from "./config.js";
|
||||
@@ -184,6 +189,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
await releaseSharedClientInstance(client, mode);
|
||||
}
|
||||
} finally {
|
||||
client?.off("sync.state", onSyncState);
|
||||
syncLifecycle?.dispose();
|
||||
statusController.markStopped();
|
||||
setActiveMatrixClient(null, auth.accountId);
|
||||
@@ -241,6 +247,19 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
const startupGraceMs = 0;
|
||||
const warnedEncryptedRooms = new Set<string>();
|
||||
const warnedCryptoMissingRooms = new Set<string>();
|
||||
let healthySyncSinceMs: number | undefined;
|
||||
// Keeps `healthySyncSinceMs` in step with the sync state: the first ready state stamps it
// with `at`, later ready states leave the stamp alone, a disconnected state clears it, and
// any other state changes nothing.
const noteSyncHealthState = (state: MatrixSyncState, at = Date.now()) => {
  if (!isMatrixReadySyncState(state)) {
    if (isMatrixDisconnectedSyncState(state)) {
      healthySyncSinceMs = undefined;
    }
    return;
  }
  healthySyncSinceMs = healthySyncSinceMs ?? at;
};
|
||||
// "sync.state" listener. Forwards only the new state, so any extra listener arguments
// never reach `noteSyncHealthState`'s `at` parameter and it falls back to `Date.now()`.
const onSyncState = (state: MatrixSyncState): void => noteSyncHealthState(state);
|
||||
|
||||
try {
|
||||
client = await resolveSharedMatrixClient({
|
||||
@@ -259,6 +278,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
statusController,
|
||||
isStopping: () => cleanedUp || opts.abortSignal?.aborted === true,
|
||||
});
|
||||
client.on("sync.state", onSyncState);
|
||||
// Cold starts should ignore old room history, but once we have a persisted
|
||||
// /sync cursor we want restart backlogs to replay just like other channels.
|
||||
const dropPreStartupMessages = !client.hasPersistedSyncState();
|
||||
@@ -358,6 +378,8 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
warnedEncryptedRooms,
|
||||
warnedCryptoMissingRooms,
|
||||
logger,
|
||||
startupGraceMs,
|
||||
getHealthySyncSinceMs: () => healthySyncSinceMs,
|
||||
formatNativeDependencyHint: core.system.formatNativeDependencyHint,
|
||||
onRoomMessage: handleRoomMessage,
|
||||
runDetachedTask: monitorTaskRunner.runDetachedTask,
|
||||
|
||||
Reference in New Issue
Block a user