diff --git a/extensions/matrix/src/matrix/monitor/events.test.ts b/extensions/matrix/src/matrix/monitor/events.test.ts index 5267b22780e..0184bf43555 100644 --- a/extensions/matrix/src/matrix/monitor/events.test.ts +++ b/extensions/matrix/src/matrix/monitor/events.test.ts @@ -34,6 +34,7 @@ function createHarness(params?: { selfUserIdError?: Error; startupMs?: number; startupGraceMs?: number; + getHealthySyncSinceMs?: () => number | undefined; allowFrom?: string[]; dmEnabled?: boolean; dmPolicy?: "open" | "pairing" | "allowlist" | "disabled"; @@ -147,8 +148,10 @@ function createHarness(params?: { warnedEncryptedRooms: new Set(), warnedCryptoMissingRooms: new Set(), logger, - startupMs: params?.startupMs, startupGraceMs: params?.startupGraceMs, + getHealthySyncSinceMs: + params?.getHealthySyncSinceMs ?? + (typeof params?.startupMs === "number" ? () => params.startupMs : undefined), formatNativeDependencyHint, onRoomMessage, }); @@ -1489,14 +1492,14 @@ describe("registerMatrixMonitorEvents verification routing", () => { ); }); - it("classifies repeated fresh post-startup decrypt failures separately", async () => { + it("classifies repeated fresh post-healthy-sync decrypt failures separately", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z")); try { - const startupMs = Date.now() - 60_000; + const healthySyncSinceMs = Date.now() - 60_000; const { logger, failedDecryptListener } = createHarness({ accountId: "ops", - startupMs, + getHealthySyncSinceMs: () => healthySyncSinceMs, }); if (!failedDecryptListener) { throw new Error("room.failed_decryption listener was not registered"); @@ -1522,34 +1525,34 @@ describe("registerMatrixMonitorEvents verification routing", () => { expect(logger.warn).toHaveBeenNthCalledWith( 1, - "Failed to decrypt fresh post-startup message", + "Failed to decrypt fresh post-healthy-sync message", expect.objectContaining({ eventId: "$enc-fresh-1", - freshPostStartup: true, - postStartupFailureCount: 1, + freshAfterHealthySync: true, + postHealthySyncFailureCount: 1, }), ); expect(logger.warn).toHaveBeenNthCalledWith( 2, - "Failed to decrypt fresh post-startup message", + "Failed to decrypt fresh post-healthy-sync message", expect.objectContaining({ eventId: "$enc-fresh-2", - freshPostStartup: true, - postStartupFailureCount: 2, + freshAfterHealthySync: true, + postHealthySyncFailureCount: 2, }), ); expect(logger.warn).toHaveBeenNthCalledWith( 3, - "Failed to decrypt fresh post-startup message", + "Failed to decrypt fresh post-healthy-sync message", expect.objectContaining({ eventId: "$enc-fresh-3", - freshPostStartup: true, - postStartupFailureCount: 3, + freshAfterHealthySync: true, + postHealthySyncFailureCount: 3, }), ); expect(logger.warn).toHaveBeenNthCalledWith( 4, - "matrix: repeated fresh encrypted messages are still failing to decrypt after startup. Matrix sync is healthy, but this device may be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.", + "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.", expect.objectContaining({ failureCount: 3, roomCount: 3, @@ -1563,13 +1566,14 @@ describe("registerMatrixMonitorEvents verification routing", () => { } }); - it("keeps pre-startup decrypt failures on the generic warning path", async () => { + it("keeps decrypt failures before healthy sync on the generic warning path", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z")); try { + let healthySyncSinceMs: number | undefined; const { logger, failedDecryptListener } = createHarness({ accountId: "ops", - startupMs: Date.now(), + getHealthySyncSinceMs: () => healthySyncSinceMs, }); if (!failedDecryptListener) { throw new Error("room.failed_decryption listener was not registered"); @@ -1592,7 +1596,83 @@ describe("registerMatrixMonitorEvents verification routing", () => { "Failed to decrypt message", expect.objectContaining({ eventId: "$enc-old", - freshPostStartup: false, + freshAfterHealthySync: false, + }), + ); + + healthySyncSinceMs = Date.now(); + + await failedDecryptListener( + "!room:example.org", + { + event_id: "$enc-fresh-after-ready", + sender: "@alice:matrix.example.org", + type: EventType.RoomMessageEncrypted, + origin_server_ts: Date.now() + 1, + content: {}, + }, + new Error("The sender's device has not sent us the keys for this message."), + ); + + expect(logger.warn).toHaveBeenNthCalledWith( + 2, + "Failed to decrypt fresh post-healthy-sync message", + expect.objectContaining({ + eventId: "$enc-fresh-after-ready", + freshAfterHealthySync: true, + postHealthySyncFailureCount: 1, + }), + ); + } finally { + vi.useRealTimers(); + } + }); + + it("re-emits the aggregate warning for a new failure wave after the window clears", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z")); + try { + const healthySyncSinceMs = Date.now() - 60_000; + const { logger, failedDecryptListener } = createHarness({ + accountId: "ops", + getHealthySyncSinceMs: () => healthySyncSinceMs, + }); + if (!failedDecryptListener) { + throw new Error("room.failed_decryption listener was not registered"); + } + + for (const wave of [1, 2]) { + for (const index of [1, 2, 3]) { + await failedDecryptListener( + `!room-${wave}-${index}:example.org`, + { + event_id: `$enc-wave-${wave}-${index}`, + sender: `@alice${wave}${index}:matrix.example.org`, + type: EventType.RoomMessageEncrypted, + origin_server_ts: Date.now() - index * 1_000, + content: {}, + }, + new Error("The sender's device has not sent us the keys for this message."), + ); + } + + if (wave === 1) { + await vi.advanceTimersByTimeAsync(2 * 60_000 + 1); + } + } + + expect(logger.warn).toHaveBeenNthCalledWith( + 4, + "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.", + expect.objectContaining({ + sampleEventIds: ["$enc-wave-1-1", "$enc-wave-1-2", "$enc-wave-1-3"], + }), + ); + expect(logger.warn).toHaveBeenNthCalledWith( + 8, + "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.", + expect.objectContaining({ + sampleEventIds: ["$enc-wave-2-1", "$enc-wave-2-2", "$enc-wave-2-3"], }), ); } finally { diff --git a/extensions/matrix/src/matrix/monitor/events.ts b/extensions/matrix/src/matrix/monitor/events.ts index 571d9ea9948..907b3105fb2 100644 --- a/extensions/matrix/src/matrix/monitor/events.ts +++ b/extensions/matrix/src/matrix/monitor/events.ts @@ -8,11 +8,11 @@ import type { MatrixRawEvent } from "./types.js"; import { EventType } from "./types.js"; import { createMatrixVerificationEventRouter } from "./verification-events.js"; -const MATRIX_POST_STARTUP_DECRYPT_FAILURE_WINDOW_MS = 2 * 60_000; -const MATRIX_POST_STARTUP_DECRYPT_FAILURE_THRESHOLD = 3; -const MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT = 3; +const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS = 2 * 60_000; +const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_THRESHOLD = 3; +const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT = 3; -type MatrixPostStartupDecryptFailureObservation = { +type MatrixPostHealthySyncDecryptFailureObservation = { key: string; roomId: string; eventId: string; @@ -21,29 +21,29 @@ type MatrixPostStartupDecryptFailureObservation = { error: string; }; -function formatMatrixPostStartupDecryptionHint(accountId: string): string { +function formatMatrixPostHealthySyncDecryptionHint(accountId: string): string { return ( - "matrix: repeated fresh encrypted messages are still failing to decrypt after startup. " + - "Matrix sync is healthy, but this device may be missing new room keys. " + + "matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. " + + "This device may still be missing new room keys. " + `Check 'openclaw matrix verify status --verbose --account ${accountId}' and 'openclaw matrix devices list --account ${accountId}'.` ); } -function isFreshPostStartupDecryptFailure(params: { +function isFreshPostHealthySyncDecryptFailure(params: { event: MatrixRawEvent; - startupMs?: number; - startupGraceMs?: number; + healthySyncSinceMs?: number; + graceMs?: number; nowMs: number; }): boolean { - const { event, startupMs, startupGraceMs = 0, nowMs } = params; - if (typeof startupMs !== "number" || !Number.isFinite(startupMs)) { + const { event, healthySyncSinceMs, graceMs = 0, nowMs } = params; + if (typeof healthySyncSinceMs !== "number" || !Number.isFinite(healthySyncSinceMs)) { return false; } const eventTs = event.origin_server_ts; if (!Number.isFinite(eventTs) || eventTs <= 0) { return false; } - if (eventTs < startupMs + startupGraceMs) { + if (eventTs < healthySyncSinceMs + graceMs) { return false; } if (eventTs > nowMs + 60_000) { @@ -52,31 +52,34 @@ function isFreshPostStartupDecryptFailure(params: { return true; } -function createMatrixPostStartupDecryptFailureTracker(params: { - startupMs?: number; +function createMatrixPostHealthySyncDecryptFailureTracker(params: { + getHealthySyncSinceMs?: () => number | undefined; startupGraceMs?: number; }) { - let observations: MatrixPostStartupDecryptFailureObservation[] = []; + let observations: MatrixPostHealthySyncDecryptFailureObservation[] = []; let warningEmitted = false; const pruneObservations = (nowMs: number) => { observations = observations.filter( - (entry) => nowMs - entry.eventTs <= MATRIX_POST_STARTUP_DECRYPT_FAILURE_WINDOW_MS, + (entry) => nowMs - entry.eventTs <= MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS, ); + if (observations.length === 0) { + warningEmitted = false; + } }; return { recordFailure(roomId: string, event: MatrixRawEvent, error: Error) { const nowMs = Date.now(); if ( - !isFreshPostStartupDecryptFailure({ + !isFreshPostHealthySyncDecryptFailure({ event, - startupMs: params.startupMs, - startupGraceMs: params.startupGraceMs, + healthySyncSinceMs: params.getHealthySyncSinceMs?.(), + graceMs: params.startupGraceMs, nowMs, }) ) { - return { freshPostStartup: false, failureCount: 0 } as const; + return { freshAfterHealthySync: false, failureCount: 0 } as const; } pruneObservations(nowMs); @@ -94,25 +97,25 @@ function createMatrixPostStartupDecryptFailureTracker(params: { } const failureCount = observations.length; - if (warningEmitted || failureCount < MATRIX_POST_STARTUP_DECRYPT_FAILURE_THRESHOLD) { - return { freshPostStartup: true, failureCount } as const; + if (warningEmitted || failureCount < MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_THRESHOLD) { + return { freshAfterHealthySync: true, failureCount } as const; } warningEmitted = true; const rooms = [...new Set(observations.map((entry) => entry.roomId))].slice( 0, - MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT, + MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT, ); const senders = [...new Set(observations.map((entry) => entry.sender).filter(Boolean))].slice( 0, - MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT, + MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT, ); const eventIds = observations - .slice(-MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT) + .slice(-MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT) .map((entry) => entry.eventId); const latestError = observations.at(-1)?.error ?? error.message; return { - freshPostStartup: true, + freshAfterHealthySync: true, failureCount, warning: { rooms, @@ -121,7 +124,7 @@ function createMatrixPostStartupDecryptFailureTracker(params: { senderCount: new Set(observations.map((entry) => entry.sender).filter(Boolean)).size, eventIds, latestError, - windowMs: MATRIX_POST_STARTUP_DECRYPT_FAILURE_WINDOW_MS, + windowMs: MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS, }, } as const; }, @@ -167,8 +170,8 @@ export function registerMatrixMonitorEvents(params: { warnedEncryptedRooms: Set; warnedCryptoMissingRooms: Set; logger: RuntimeLogger; - startupMs?: number; startupGraceMs?: number; + getHealthySyncSinceMs?: () => number | undefined; formatNativeDependencyHint: PluginRuntime["system"]["formatNativeDependencyHint"]; onRoomMessage: (roomId: string, event: MatrixRawEvent) => void | Promise; runDetachedTask?: (label: string, task: () => Promise) => Promise; @@ -186,14 +189,14 @@ export function registerMatrixMonitorEvents(params: { warnedEncryptedRooms, warnedCryptoMissingRooms, logger, - startupMs, startupGraceMs, + getHealthySyncSinceMs, formatNativeDependencyHint, onRoomMessage, runDetachedTask, } = params; - const postStartupDecryptFailureTracker = createMatrixPostStartupDecryptFailureTracker({ - startupMs, + const postHealthySyncDecryptFailureTracker = createMatrixPostHealthySyncDecryptFailureTracker({ + getHealthySyncSinceMs, startupGraceMs, }); const { routeVerificationEvent, routeVerificationSummary } = createMatrixVerificationEventRouter({ @@ -243,13 +246,13 @@ export function registerMatrixMonitorEvents(params: { client.on( "room.failed_decryption", async (roomId: string, event: MatrixRawEvent, error: Error) => { - const failureState = postStartupDecryptFailureTracker.recordFailure(roomId, event, error); + const failureState = postHealthySyncDecryptFailureTracker.recordFailure(roomId, event, error); const selfUserId = await resolveMatrixSelfUserId(client, logVerboseMessage); const sender = typeof event.sender === "string" ? event.sender : null; const senderMatchesOwnUser = Boolean(selfUserId && sender && selfUserId === sender); logger.warn( - failureState.freshPostStartup - ? "Failed to decrypt fresh post-startup message" + failureState.freshAfterHealthySync + ? "Failed to decrypt fresh post-healthy-sync message" : "Failed to decrypt message", { roomId, @@ -257,16 +260,16 @@ export function registerMatrixMonitorEvents(params: { sender, senderMatchesOwnUser, error: error.message, - freshPostStartup: failureState.freshPostStartup, - ...(failureState.freshPostStartup + freshAfterHealthySync: failureState.freshAfterHealthySync, + ...(failureState.freshAfterHealthySync ? { - postStartupFailureCount: failureState.failureCount, + postHealthySyncFailureCount: failureState.failureCount, } : {}), }, ); if (failureState.warning) { - logger.warn(formatMatrixPostStartupDecryptionHint(auth.accountId), { + logger.warn(formatMatrixPostHealthySyncDecryptionHint(auth.accountId), { roomId, eventId: event.event_id, failureCount: failureState.failureCount, @@ -287,7 +290,7 @@ export function registerMatrixMonitorEvents(params: { }); } logVerboseMessage( - `matrix: failed decrypt room=${roomId} id=${event.event_id ?? "unknown"} freshPostStartup=${String(failureState.freshPostStartup)} error=${error.message}`, + `matrix: failed decrypt room=${roomId} id=${event.event_id ?? "unknown"} freshAfterHealthySync=${String(failureState.freshAfterHealthySync)} error=${error.message}`, ); }, ); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 7a951d4fbcd..0c775b310f9 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -241,6 +241,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const startupGraceMs = 0; const warnedEncryptedRooms = new Set(); const warnedCryptoMissingRooms = new Set(); + let healthySyncSinceMs: number | undefined; try { client = await resolveSharedMatrixClient({ @@ -358,8 +359,8 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi warnedEncryptedRooms, warnedCryptoMissingRooms, logger, - startupMs, startupGraceMs, + getHealthySyncSinceMs: () => healthySyncSinceMs, formatNativeDependencyHint: core.system.formatNativeDependencyHint, onRoomMessage: handleRoomMessage, runDetachedTask: monitorTaskRunner.runDetachedTask, @@ -374,6 +375,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi accountId: auth.accountId, abortSignal: opts.abortSignal, }); + healthySyncSinceMs ??= Date.now(); logVerboseMessage("matrix: client started"); // Shared client is already started via resolveSharedMatrixClient.