mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 22:10:43 +00:00
matrix: gate decrypt warnings on healthy sync
(cherry picked from commit a79c486042997fd72049330cf4e728dd63efbf46)
This commit is contained in:
@@ -34,6 +34,7 @@ function createHarness(params?: {
|
||||
selfUserIdError?: Error;
|
||||
startupMs?: number;
|
||||
startupGraceMs?: number;
|
||||
getHealthySyncSinceMs?: () => number | undefined;
|
||||
allowFrom?: string[];
|
||||
dmEnabled?: boolean;
|
||||
dmPolicy?: "open" | "pairing" | "allowlist" | "disabled";
|
||||
@@ -147,8 +148,10 @@ function createHarness(params?: {
|
||||
warnedEncryptedRooms: new Set<string>(),
|
||||
warnedCryptoMissingRooms: new Set<string>(),
|
||||
logger,
|
||||
startupMs: params?.startupMs,
|
||||
startupGraceMs: params?.startupGraceMs,
|
||||
getHealthySyncSinceMs:
|
||||
params?.getHealthySyncSinceMs ??
|
||||
(typeof params?.startupMs === "number" ? () => params.startupMs : undefined),
|
||||
formatNativeDependencyHint,
|
||||
onRoomMessage,
|
||||
});
|
||||
@@ -1489,14 +1492,14 @@ describe("registerMatrixMonitorEvents verification routing", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("classifies repeated fresh post-startup decrypt failures separately", async () => {
|
||||
it("classifies repeated fresh post-healthy-sync decrypt failures separately", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
|
||||
try {
|
||||
const startupMs = Date.now() - 60_000;
|
||||
const healthySyncSinceMs = Date.now() - 60_000;
|
||||
const { logger, failedDecryptListener } = createHarness({
|
||||
accountId: "ops",
|
||||
startupMs,
|
||||
getHealthySyncSinceMs: () => healthySyncSinceMs,
|
||||
});
|
||||
if (!failedDecryptListener) {
|
||||
throw new Error("room.failed_decryption listener was not registered");
|
||||
@@ -1522,34 +1525,34 @@ describe("registerMatrixMonitorEvents verification routing", () => {
|
||||
|
||||
expect(logger.warn).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
"Failed to decrypt fresh post-startup message",
|
||||
"Failed to decrypt fresh post-healthy-sync message",
|
||||
expect.objectContaining({
|
||||
eventId: "$enc-fresh-1",
|
||||
freshPostStartup: true,
|
||||
postStartupFailureCount: 1,
|
||||
freshAfterHealthySync: true,
|
||||
postHealthySyncFailureCount: 1,
|
||||
}),
|
||||
);
|
||||
expect(logger.warn).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
"Failed to decrypt fresh post-startup message",
|
||||
"Failed to decrypt fresh post-healthy-sync message",
|
||||
expect.objectContaining({
|
||||
eventId: "$enc-fresh-2",
|
||||
freshPostStartup: true,
|
||||
postStartupFailureCount: 2,
|
||||
freshAfterHealthySync: true,
|
||||
postHealthySyncFailureCount: 2,
|
||||
}),
|
||||
);
|
||||
expect(logger.warn).toHaveBeenNthCalledWith(
|
||||
3,
|
||||
"Failed to decrypt fresh post-startup message",
|
||||
"Failed to decrypt fresh post-healthy-sync message",
|
||||
expect.objectContaining({
|
||||
eventId: "$enc-fresh-3",
|
||||
freshPostStartup: true,
|
||||
postStartupFailureCount: 3,
|
||||
freshAfterHealthySync: true,
|
||||
postHealthySyncFailureCount: 3,
|
||||
}),
|
||||
);
|
||||
expect(logger.warn).toHaveBeenNthCalledWith(
|
||||
4,
|
||||
"matrix: repeated fresh encrypted messages are still failing to decrypt after startup. Matrix sync is healthy, but this device may be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.",
|
||||
"matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.",
|
||||
expect.objectContaining({
|
||||
failureCount: 3,
|
||||
roomCount: 3,
|
||||
@@ -1563,13 +1566,14 @@ describe("registerMatrixMonitorEvents verification routing", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps pre-startup decrypt failures on the generic warning path", async () => {
|
||||
it("keeps decrypt failures before healthy sync on the generic warning path", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
|
||||
try {
|
||||
let healthySyncSinceMs: number | undefined;
|
||||
const { logger, failedDecryptListener } = createHarness({
|
||||
accountId: "ops",
|
||||
startupMs: Date.now(),
|
||||
getHealthySyncSinceMs: () => healthySyncSinceMs,
|
||||
});
|
||||
if (!failedDecryptListener) {
|
||||
throw new Error("room.failed_decryption listener was not registered");
|
||||
@@ -1592,7 +1596,83 @@ describe("registerMatrixMonitorEvents verification routing", () => {
|
||||
"Failed to decrypt message",
|
||||
expect.objectContaining({
|
||||
eventId: "$enc-old",
|
||||
freshPostStartup: false,
|
||||
freshAfterHealthySync: false,
|
||||
}),
|
||||
);
|
||||
|
||||
healthySyncSinceMs = Date.now();
|
||||
|
||||
await failedDecryptListener(
|
||||
"!room:example.org",
|
||||
{
|
||||
event_id: "$enc-fresh-after-ready",
|
||||
sender: "@alice:matrix.example.org",
|
||||
type: EventType.RoomMessageEncrypted,
|
||||
origin_server_ts: Date.now() + 1,
|
||||
content: {},
|
||||
},
|
||||
new Error("The sender's device has not sent us the keys for this message."),
|
||||
);
|
||||
|
||||
expect(logger.warn).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
"Failed to decrypt fresh post-healthy-sync message",
|
||||
expect.objectContaining({
|
||||
eventId: "$enc-fresh-after-ready",
|
||||
freshAfterHealthySync: true,
|
||||
postHealthySyncFailureCount: 1,
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("re-emits the aggregate warning for a new failure wave after the window clears", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-04-10T16:21:00.000Z"));
|
||||
try {
|
||||
const healthySyncSinceMs = Date.now() - 60_000;
|
||||
const { logger, failedDecryptListener } = createHarness({
|
||||
accountId: "ops",
|
||||
getHealthySyncSinceMs: () => healthySyncSinceMs,
|
||||
});
|
||||
if (!failedDecryptListener) {
|
||||
throw new Error("room.failed_decryption listener was not registered");
|
||||
}
|
||||
|
||||
for (const wave of [1, 2]) {
|
||||
for (const index of [1, 2, 3]) {
|
||||
await failedDecryptListener(
|
||||
`!room-${wave}-${index}:example.org`,
|
||||
{
|
||||
event_id: `$enc-wave-${wave}-${index}`,
|
||||
sender: `@alice${wave}${index}:matrix.example.org`,
|
||||
type: EventType.RoomMessageEncrypted,
|
||||
origin_server_ts: Date.now() - index * 1_000,
|
||||
content: {},
|
||||
},
|
||||
new Error("The sender's device has not sent us the keys for this message."),
|
||||
);
|
||||
}
|
||||
|
||||
if (wave === 1) {
|
||||
await vi.advanceTimersByTimeAsync(2 * 60_000 + 1);
|
||||
}
|
||||
}
|
||||
|
||||
expect(logger.warn).toHaveBeenNthCalledWith(
|
||||
4,
|
||||
"matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.",
|
||||
expect.objectContaining({
|
||||
sampleEventIds: ["$enc-wave-1-1", "$enc-wave-1-2", "$enc-wave-1-3"],
|
||||
}),
|
||||
);
|
||||
expect(logger.warn).toHaveBeenNthCalledWith(
|
||||
8,
|
||||
"matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. This device may still be missing new room keys. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.",
|
||||
expect.objectContaining({
|
||||
sampleEventIds: ["$enc-wave-2-1", "$enc-wave-2-2", "$enc-wave-2-3"],
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
|
||||
@@ -8,11 +8,11 @@ import type { MatrixRawEvent } from "./types.js";
|
||||
import { EventType } from "./types.js";
|
||||
import { createMatrixVerificationEventRouter } from "./verification-events.js";
|
||||
|
||||
const MATRIX_POST_STARTUP_DECRYPT_FAILURE_WINDOW_MS = 2 * 60_000;
|
||||
const MATRIX_POST_STARTUP_DECRYPT_FAILURE_THRESHOLD = 3;
|
||||
const MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT = 3;
|
||||
const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS = 2 * 60_000;
|
||||
const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_THRESHOLD = 3;
|
||||
const MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT = 3;
|
||||
|
||||
type MatrixPostStartupDecryptFailureObservation = {
|
||||
type MatrixPostHealthySyncDecryptFailureObservation = {
|
||||
key: string;
|
||||
roomId: string;
|
||||
eventId: string;
|
||||
@@ -21,29 +21,29 @@ type MatrixPostStartupDecryptFailureObservation = {
|
||||
error: string;
|
||||
};
|
||||
|
||||
function formatMatrixPostStartupDecryptionHint(accountId: string): string {
|
||||
function formatMatrixPostHealthySyncDecryptionHint(accountId: string): string {
|
||||
return (
|
||||
"matrix: repeated fresh encrypted messages are still failing to decrypt after startup. " +
|
||||
"Matrix sync is healthy, but this device may be missing new room keys. " +
|
||||
"matrix: repeated fresh encrypted messages are still failing to decrypt after Matrix resumed healthy sync. " +
|
||||
"This device may still be missing new room keys. " +
|
||||
`Check 'openclaw matrix verify status --verbose --account ${accountId}' and 'openclaw matrix devices list --account ${accountId}'.`
|
||||
);
|
||||
}
|
||||
|
||||
function isFreshPostStartupDecryptFailure(params: {
|
||||
function isFreshPostHealthySyncDecryptFailure(params: {
|
||||
event: MatrixRawEvent;
|
||||
startupMs?: number;
|
||||
startupGraceMs?: number;
|
||||
healthySyncSinceMs?: number;
|
||||
graceMs?: number;
|
||||
nowMs: number;
|
||||
}): boolean {
|
||||
const { event, startupMs, startupGraceMs = 0, nowMs } = params;
|
||||
if (typeof startupMs !== "number" || !Number.isFinite(startupMs)) {
|
||||
const { event, healthySyncSinceMs, graceMs = 0, nowMs } = params;
|
||||
if (typeof healthySyncSinceMs !== "number" || !Number.isFinite(healthySyncSinceMs)) {
|
||||
return false;
|
||||
}
|
||||
const eventTs = event.origin_server_ts;
|
||||
if (!Number.isFinite(eventTs) || eventTs <= 0) {
|
||||
return false;
|
||||
}
|
||||
if (eventTs < startupMs + startupGraceMs) {
|
||||
if (eventTs < healthySyncSinceMs + graceMs) {
|
||||
return false;
|
||||
}
|
||||
if (eventTs > nowMs + 60_000) {
|
||||
@@ -52,31 +52,34 @@ function isFreshPostStartupDecryptFailure(params: {
|
||||
return true;
|
||||
}
|
||||
|
||||
function createMatrixPostStartupDecryptFailureTracker(params: {
|
||||
startupMs?: number;
|
||||
function createMatrixPostHealthySyncDecryptFailureTracker(params: {
|
||||
getHealthySyncSinceMs?: () => number | undefined;
|
||||
startupGraceMs?: number;
|
||||
}) {
|
||||
let observations: MatrixPostStartupDecryptFailureObservation[] = [];
|
||||
let observations: MatrixPostHealthySyncDecryptFailureObservation[] = [];
|
||||
let warningEmitted = false;
|
||||
|
||||
const pruneObservations = (nowMs: number) => {
|
||||
observations = observations.filter(
|
||||
(entry) => nowMs - entry.eventTs <= MATRIX_POST_STARTUP_DECRYPT_FAILURE_WINDOW_MS,
|
||||
(entry) => nowMs - entry.eventTs <= MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS,
|
||||
);
|
||||
if (observations.length === 0) {
|
||||
warningEmitted = false;
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
recordFailure(roomId: string, event: MatrixRawEvent, error: Error) {
|
||||
const nowMs = Date.now();
|
||||
if (
|
||||
!isFreshPostStartupDecryptFailure({
|
||||
!isFreshPostHealthySyncDecryptFailure({
|
||||
event,
|
||||
startupMs: params.startupMs,
|
||||
startupGraceMs: params.startupGraceMs,
|
||||
healthySyncSinceMs: params.getHealthySyncSinceMs?.(),
|
||||
graceMs: params.startupGraceMs,
|
||||
nowMs,
|
||||
})
|
||||
) {
|
||||
return { freshPostStartup: false, failureCount: 0 } as const;
|
||||
return { freshAfterHealthySync: false, failureCount: 0 } as const;
|
||||
}
|
||||
|
||||
pruneObservations(nowMs);
|
||||
@@ -94,25 +97,25 @@ function createMatrixPostStartupDecryptFailureTracker(params: {
|
||||
}
|
||||
|
||||
const failureCount = observations.length;
|
||||
if (warningEmitted || failureCount < MATRIX_POST_STARTUP_DECRYPT_FAILURE_THRESHOLD) {
|
||||
return { freshPostStartup: true, failureCount } as const;
|
||||
if (warningEmitted || failureCount < MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_THRESHOLD) {
|
||||
return { freshAfterHealthySync: true, failureCount } as const;
|
||||
}
|
||||
|
||||
warningEmitted = true;
|
||||
const rooms = [...new Set(observations.map((entry) => entry.roomId))].slice(
|
||||
0,
|
||||
MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT,
|
||||
MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT,
|
||||
);
|
||||
const senders = [...new Set(observations.map((entry) => entry.sender).filter(Boolean))].slice(
|
||||
0,
|
||||
MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT,
|
||||
MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT,
|
||||
);
|
||||
const eventIds = observations
|
||||
.slice(-MATRIX_POST_STARTUP_DECRYPT_FAILURE_SAMPLE_LIMIT)
|
||||
.slice(-MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_SAMPLE_LIMIT)
|
||||
.map((entry) => entry.eventId);
|
||||
const latestError = observations.at(-1)?.error ?? error.message;
|
||||
return {
|
||||
freshPostStartup: true,
|
||||
freshAfterHealthySync: true,
|
||||
failureCount,
|
||||
warning: {
|
||||
rooms,
|
||||
@@ -121,7 +124,7 @@ function createMatrixPostStartupDecryptFailureTracker(params: {
|
||||
senderCount: new Set(observations.map((entry) => entry.sender).filter(Boolean)).size,
|
||||
eventIds,
|
||||
latestError,
|
||||
windowMs: MATRIX_POST_STARTUP_DECRYPT_FAILURE_WINDOW_MS,
|
||||
windowMs: MATRIX_POST_HEALTHY_SYNC_DECRYPT_FAILURE_WINDOW_MS,
|
||||
},
|
||||
} as const;
|
||||
},
|
||||
@@ -167,8 +170,8 @@ export function registerMatrixMonitorEvents(params: {
|
||||
warnedEncryptedRooms: Set<string>;
|
||||
warnedCryptoMissingRooms: Set<string>;
|
||||
logger: RuntimeLogger;
|
||||
startupMs?: number;
|
||||
startupGraceMs?: number;
|
||||
getHealthySyncSinceMs?: () => number | undefined;
|
||||
formatNativeDependencyHint: PluginRuntime["system"]["formatNativeDependencyHint"];
|
||||
onRoomMessage: (roomId: string, event: MatrixRawEvent) => void | Promise<void>;
|
||||
runDetachedTask?: (label: string, task: () => Promise<void>) => Promise<void>;
|
||||
@@ -186,14 +189,14 @@ export function registerMatrixMonitorEvents(params: {
|
||||
warnedEncryptedRooms,
|
||||
warnedCryptoMissingRooms,
|
||||
logger,
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
getHealthySyncSinceMs,
|
||||
formatNativeDependencyHint,
|
||||
onRoomMessage,
|
||||
runDetachedTask,
|
||||
} = params;
|
||||
const postStartupDecryptFailureTracker = createMatrixPostStartupDecryptFailureTracker({
|
||||
startupMs,
|
||||
const postHealthySyncDecryptFailureTracker = createMatrixPostHealthySyncDecryptFailureTracker({
|
||||
getHealthySyncSinceMs,
|
||||
startupGraceMs,
|
||||
});
|
||||
const { routeVerificationEvent, routeVerificationSummary } = createMatrixVerificationEventRouter({
|
||||
@@ -243,13 +246,13 @@ export function registerMatrixMonitorEvents(params: {
|
||||
client.on(
|
||||
"room.failed_decryption",
|
||||
async (roomId: string, event: MatrixRawEvent, error: Error) => {
|
||||
const failureState = postStartupDecryptFailureTracker.recordFailure(roomId, event, error);
|
||||
const failureState = postHealthySyncDecryptFailureTracker.recordFailure(roomId, event, error);
|
||||
const selfUserId = await resolveMatrixSelfUserId(client, logVerboseMessage);
|
||||
const sender = typeof event.sender === "string" ? event.sender : null;
|
||||
const senderMatchesOwnUser = Boolean(selfUserId && sender && selfUserId === sender);
|
||||
logger.warn(
|
||||
failureState.freshPostStartup
|
||||
? "Failed to decrypt fresh post-startup message"
|
||||
failureState.freshAfterHealthySync
|
||||
? "Failed to decrypt fresh post-healthy-sync message"
|
||||
: "Failed to decrypt message",
|
||||
{
|
||||
roomId,
|
||||
@@ -257,16 +260,16 @@ export function registerMatrixMonitorEvents(params: {
|
||||
sender,
|
||||
senderMatchesOwnUser,
|
||||
error: error.message,
|
||||
freshPostStartup: failureState.freshPostStartup,
|
||||
...(failureState.freshPostStartup
|
||||
freshAfterHealthySync: failureState.freshAfterHealthySync,
|
||||
...(failureState.freshAfterHealthySync
|
||||
? {
|
||||
postStartupFailureCount: failureState.failureCount,
|
||||
postHealthySyncFailureCount: failureState.failureCount,
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
);
|
||||
if (failureState.warning) {
|
||||
logger.warn(formatMatrixPostStartupDecryptionHint(auth.accountId), {
|
||||
logger.warn(formatMatrixPostHealthySyncDecryptionHint(auth.accountId), {
|
||||
roomId,
|
||||
eventId: event.event_id,
|
||||
failureCount: failureState.failureCount,
|
||||
@@ -287,7 +290,7 @@ export function registerMatrixMonitorEvents(params: {
|
||||
});
|
||||
}
|
||||
logVerboseMessage(
|
||||
`matrix: failed decrypt room=${roomId} id=${event.event_id ?? "unknown"} freshPostStartup=${String(failureState.freshPostStartup)} error=${error.message}`,
|
||||
`matrix: failed decrypt room=${roomId} id=${event.event_id ?? "unknown"} freshAfterHealthySync=${String(failureState.freshAfterHealthySync)} error=${error.message}`,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
@@ -241,6 +241,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
const startupGraceMs = 0;
|
||||
const warnedEncryptedRooms = new Set<string>();
|
||||
const warnedCryptoMissingRooms = new Set<string>();
|
||||
let healthySyncSinceMs: number | undefined;
|
||||
|
||||
try {
|
||||
client = await resolveSharedMatrixClient({
|
||||
@@ -358,8 +359,8 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
warnedEncryptedRooms,
|
||||
warnedCryptoMissingRooms,
|
||||
logger,
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
getHealthySyncSinceMs: () => healthySyncSinceMs,
|
||||
formatNativeDependencyHint: core.system.formatNativeDependencyHint,
|
||||
onRoomMessage: handleRoomMessage,
|
||||
runDetachedTask: monitorTaskRunner.runDetachedTask,
|
||||
@@ -374,6 +375,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
accountId: auth.accountId,
|
||||
abortSignal: opts.abortSignal,
|
||||
});
|
||||
healthySyncSinceMs ??= Date.now();
|
||||
logVerboseMessage("matrix: client started");
|
||||
|
||||
// Shared client is already started via resolveSharedMatrixClient.
|
||||
|
||||
Reference in New Issue
Block a user