mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:20:44 +00:00
Matrix: drain decrypt retries before shutdown persist
This commit is contained in:
@@ -16,6 +16,7 @@ const hoisted = vi.hoisted(() => {
|
|||||||
id: "matrix-client",
|
id: "matrix-client",
|
||||||
hasPersistedSyncState: vi.fn(() => false),
|
hasPersistedSyncState: vi.fn(() => false),
|
||||||
stopSyncWithoutPersist: vi.fn(),
|
stopSyncWithoutPersist: vi.fn(),
|
||||||
|
drainPendingDecryptions: vi.fn(async () => undefined),
|
||||||
};
|
};
|
||||||
const createMatrixRoomMessageHandler = vi.fn(() => vi.fn());
|
const createMatrixRoomMessageHandler = vi.fn(() => vi.fn());
|
||||||
const resolveTextChunkLimit = vi.fn<
|
const resolveTextChunkLimit = vi.fn<
|
||||||
@@ -236,6 +237,7 @@ describe("monitorMatrixProvider", () => {
|
|||||||
hoisted.stopThreadBindingManager.mockReset();
|
hoisted.stopThreadBindingManager.mockReset();
|
||||||
hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false);
|
hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false);
|
||||||
hoisted.client.stopSyncWithoutPersist.mockReset();
|
hoisted.client.stopSyncWithoutPersist.mockReset();
|
||||||
|
hoisted.client.drainPendingDecryptions.mockReset().mockResolvedValue(undefined);
|
||||||
hoisted.inboundDeduper.claimEvent.mockReset().mockReturnValue(true);
|
hoisted.inboundDeduper.claimEvent.mockReset().mockReturnValue(true);
|
||||||
hoisted.inboundDeduper.commitEvent.mockReset().mockResolvedValue(undefined);
|
hoisted.inboundDeduper.commitEvent.mockReset().mockResolvedValue(undefined);
|
||||||
hoisted.inboundDeduper.releaseEvent.mockReset();
|
hoisted.inboundDeduper.releaseEvent.mockReset();
|
||||||
@@ -303,7 +305,7 @@ describe("monitorMatrixProvider", () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("stops sync before waiting for in-flight handlers, then flushes dedupe before persisting", async () => {
|
it("stops sync, drains decryptions, then waits for in-flight handlers before persisting", async () => {
|
||||||
const { monitorMatrixProvider } = await import("./index.js");
|
const { monitorMatrixProvider } = await import("./index.js");
|
||||||
const abortController = new AbortController();
|
const abortController = new AbortController();
|
||||||
let resolveHandler: (() => void) | null = null;
|
let resolveHandler: (() => void) | null = null;
|
||||||
@@ -322,6 +324,9 @@ describe("monitorMatrixProvider", () => {
|
|||||||
hoisted.client.stopSyncWithoutPersist.mockImplementation(() => {
|
hoisted.client.stopSyncWithoutPersist.mockImplementation(() => {
|
||||||
hoisted.callOrder.push("pause-client");
|
hoisted.callOrder.push("pause-client");
|
||||||
});
|
});
|
||||||
|
hoisted.client.drainPendingDecryptions.mockImplementation(async () => {
|
||||||
|
hoisted.callOrder.push("drain-decrypts");
|
||||||
|
});
|
||||||
hoisted.releaseSharedClientInstance.mockImplementation(async () => {
|
hoisted.releaseSharedClientInstance.mockImplementation(async () => {
|
||||||
hoisted.callOrder.push("release-client");
|
hoisted.callOrder.push("release-client");
|
||||||
return true;
|
return true;
|
||||||
@@ -354,6 +359,9 @@ describe("monitorMatrixProvider", () => {
|
|||||||
await monitorPromise;
|
await monitorPromise;
|
||||||
|
|
||||||
expect(hoisted.callOrder.indexOf("pause-client")).toBeLessThan(
|
expect(hoisted.callOrder.indexOf("pause-client")).toBeLessThan(
|
||||||
|
hoisted.callOrder.indexOf("drain-decrypts"),
|
||||||
|
);
|
||||||
|
expect(hoisted.callOrder.indexOf("drain-decrypts")).toBeLessThan(
|
||||||
hoisted.callOrder.indexOf("handler-done"),
|
hoisted.callOrder.indexOf("handler-done"),
|
||||||
);
|
);
|
||||||
expect(hoisted.callOrder.indexOf("handler-done")).toBeLessThan(
|
expect(hoisted.callOrder.indexOf("handler-done")).toBeLessThan(
|
||||||
|
|||||||
@@ -155,6 +155,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
|||||||
try {
|
try {
|
||||||
threadBindingManager?.stop();
|
threadBindingManager?.stop();
|
||||||
client.stopSyncWithoutPersist();
|
client.stopSyncWithoutPersist();
|
||||||
|
await client.drainPendingDecryptions("matrix monitor shutdown");
|
||||||
await waitForInFlightRoomMessages();
|
await waitForInFlightRoomMessages();
|
||||||
await inboundDeduper.stop();
|
await inboundDeduper.stop();
|
||||||
await releaseSharedClientInstance(client, "persist");
|
await releaseSharedClientInstance(client, "persist");
|
||||||
|
|||||||
@@ -684,6 +684,52 @@ describe("MatrixClient event bridge", () => {
|
|||||||
expect(delivered).toEqual(["m.room.message"]);
|
expect(delivered).toEqual(["m.room.message"]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("can drain pending decrypt retries after sync stops", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const client = new MatrixClient("https://matrix.example.org", "token");
|
||||||
|
const delivered: string[] = [];
|
||||||
|
|
||||||
|
client.on("room.message", (_roomId, event) => {
|
||||||
|
delivered.push(event.type);
|
||||||
|
});
|
||||||
|
|
||||||
|
const encrypted = new FakeMatrixEvent({
|
||||||
|
roomId: "!room:example.org",
|
||||||
|
eventId: "$event",
|
||||||
|
sender: "@alice:example.org",
|
||||||
|
type: "m.room.encrypted",
|
||||||
|
ts: Date.now(),
|
||||||
|
content: {},
|
||||||
|
decryptionFailure: true,
|
||||||
|
});
|
||||||
|
const decrypted = new FakeMatrixEvent({
|
||||||
|
roomId: "!room:example.org",
|
||||||
|
eventId: "$event",
|
||||||
|
sender: "@alice:example.org",
|
||||||
|
type: "m.room.message",
|
||||||
|
ts: Date.now(),
|
||||||
|
content: {
|
||||||
|
msgtype: "m.text",
|
||||||
|
body: "hello",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
matrixJsClient.decryptEventIfNeeded = vi.fn(async () => {
|
||||||
|
encrypted.emit("decrypted", decrypted);
|
||||||
|
});
|
||||||
|
|
||||||
|
await client.start();
|
||||||
|
matrixJsClient.emit("event", encrypted);
|
||||||
|
encrypted.emit("decrypted", encrypted, new Error("missing room key"));
|
||||||
|
|
||||||
|
client.stopSyncWithoutPersist();
|
||||||
|
await client.drainPendingDecryptions("test shutdown");
|
||||||
|
|
||||||
|
expect(matrixJsClient.stopClient).toHaveBeenCalledTimes(1);
|
||||||
|
expect(matrixJsClient.decryptEventIfNeeded).toHaveBeenCalledTimes(1);
|
||||||
|
expect(delivered).toEqual(["m.room.message"]);
|
||||||
|
});
|
||||||
|
|
||||||
it("retries failed decryptions immediately on crypto key update signals", async () => {
|
it("retries failed decryptions immediately on crypto key update signals", async () => {
|
||||||
vi.useFakeTimers();
|
vi.useFakeTimers();
|
||||||
const client = new MatrixClient("https://matrix.example.org", "token", undefined, undefined, {
|
const client = new MatrixClient("https://matrix.example.org", "token", undefined, undefined, {
|
||||||
|
|||||||
@@ -370,13 +370,17 @@ export class MatrixClient {
|
|||||||
clearInterval(this.idbPersistTimer);
|
clearInterval(this.idbPersistTimer);
|
||||||
this.idbPersistTimer = null;
|
this.idbPersistTimer = null;
|
||||||
}
|
}
|
||||||
this.decryptBridge.stop();
|
|
||||||
this.client.stopClient();
|
this.client.stopClient();
|
||||||
this.started = false;
|
this.started = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async drainPendingDecryptions(reason = "matrix client shutdown"): Promise<void> {
|
||||||
|
await this.decryptBridge.drainPendingDecryptions(reason);
|
||||||
|
}
|
||||||
|
|
||||||
stop(): void {
|
stop(): void {
|
||||||
this.stopSyncWithoutPersist();
|
this.stopSyncWithoutPersist();
|
||||||
|
this.decryptBridge.stop();
|
||||||
// Final persist on shutdown
|
// Final persist on shutdown
|
||||||
this.syncStore?.markCleanShutdown();
|
this.syncStore?.markCleanShutdown();
|
||||||
this.stopPersistPromise = Promise.all([
|
this.stopPersistPromise = Promise.all([
|
||||||
|
|||||||
@@ -51,6 +51,8 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
|||||||
private readonly decryptedMessageDedupe = new Map<string, number>();
|
private readonly decryptedMessageDedupe = new Map<string, number>();
|
||||||
private readonly decryptRetries = new Map<string, MatrixDecryptRetryState>();
|
private readonly decryptRetries = new Map<string, MatrixDecryptRetryState>();
|
||||||
private readonly failedDecryptionsNotified = new Set<string>();
|
private readonly failedDecryptionsNotified = new Set<string>();
|
||||||
|
private activeRetryRuns = 0;
|
||||||
|
private readonly retryIdleResolvers = new Set<() => void>();
|
||||||
private cryptoRetrySignalsBound = false;
|
private cryptoRetrySignalsBound = false;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@@ -139,6 +141,22 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async drainPendingDecryptions(reason: string): Promise<void> {
|
||||||
|
for (let attempts = 0; attempts < MATRIX_DECRYPT_RETRY_MAX_ATTEMPTS; attempts += 1) {
|
||||||
|
if (this.decryptRetries.size === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.retryPendingNow(reason);
|
||||||
|
await this.waitForActiveRetryRunsToFinish();
|
||||||
|
const hasPendingRetryTimers = Array.from(this.decryptRetries.values()).some(
|
||||||
|
(state) => state.timer || state.inFlight,
|
||||||
|
);
|
||||||
|
if (!hasPendingRetryTimers) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private handleEncryptedEventDecrypted(params: {
|
private handleEncryptedEventDecrypted(params: {
|
||||||
roomId: string;
|
roomId: string;
|
||||||
encryptedEvent: MatrixEvent;
|
encryptedEvent: MatrixEvent;
|
||||||
@@ -246,9 +264,12 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
|||||||
|
|
||||||
state.inFlight = true;
|
state.inFlight = true;
|
||||||
state.timer = null;
|
state.timer = null;
|
||||||
|
this.activeRetryRuns += 1;
|
||||||
const canDecrypt = typeof this.deps.client.decryptEventIfNeeded === "function";
|
const canDecrypt = typeof this.deps.client.decryptEventIfNeeded === "function";
|
||||||
if (!canDecrypt) {
|
if (!canDecrypt) {
|
||||||
this.clearDecryptRetry(retryKey);
|
this.clearDecryptRetry(retryKey);
|
||||||
|
this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1);
|
||||||
|
this.resolveRetryIdleIfNeeded();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -260,8 +281,13 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
|||||||
// Retry with backoff until we hit the configured retry cap.
|
// Retry with backoff until we hit the configured retry cap.
|
||||||
} finally {
|
} finally {
|
||||||
state.inFlight = false;
|
state.inFlight = false;
|
||||||
|
this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1);
|
||||||
|
this.resolveRetryIdleIfNeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.decryptRetries.get(retryKey) !== state) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (isDecryptionFailure(state.event)) {
|
if (isDecryptionFailure(state.event)) {
|
||||||
this.scheduleDecryptRetry(state);
|
this.scheduleDecryptRetry(state);
|
||||||
return;
|
return;
|
||||||
@@ -304,4 +330,23 @@ export class MatrixDecryptBridge<TRawEvent extends DecryptBridgeRawEvent> {
|
|||||||
this.decryptedMessageDedupe.delete(oldest);
|
this.decryptedMessageDedupe.delete(oldest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async waitForActiveRetryRunsToFinish(): Promise<void> {
|
||||||
|
if (this.activeRetryRuns === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
this.retryIdleResolvers.add(resolve);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveRetryIdleIfNeeded(): void {
|
||||||
|
if (this.activeRetryRuns !== 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (const resolve of this.retryIdleResolvers) {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
this.retryIdleResolvers.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user