From 6c4cfa585fb91fe062c2cfe7bdd5e4db5564c597 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 16:02:55 +0100 Subject: [PATCH] fix(matrix): make delivery replay retries explicit --- .../matrix/src/matrix/monitor/handler.test.ts | 105 ++++++++++++++++-- .../matrix/src/matrix/monitor/handler.ts | 33 +++++- 2 files changed, 127 insertions(+), 11 deletions(-) diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 85cd887c66a..d79a9ef8a49 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -9,7 +9,7 @@ import { import { beforeEach, describe, expect, it, vi } from "vitest"; import { installMatrixMonitorTestRuntime } from "../../test-runtime.js"; import { MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY } from "../send/types.js"; -import { createMatrixRoomMessageHandler } from "./handler.js"; +import { createMatrixRoomMessageHandler, MatrixRetryableInboundError } from "./handler.js"; import { createMatrixHandlerTestHarness, createMatrixReactionEvent, @@ -1845,7 +1845,7 @@ describe("matrix monitor handler durable inbound dedupe", () => { expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("matrix handler failed")); }); - it("releases a claimed event when queued final delivery fails", async () => { + it("keeps replay committed when queued final delivery fails after a generic error", async () => { const inboundDeduper = { claimEvent: vi.fn(() => true), commitEvent: vi.fn(async () => undefined), @@ -1882,18 +1882,18 @@ describe("matrix monitor handler durable inbound dedupe", () => { }), ); - expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); - expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({ + expect(inboundDeduper.commitEvent).toHaveBeenCalledWith({ roomId: "!room:example.org", eventId: "$release-on-final-delivery-error", }); + expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); expect(runtime.error).toHaveBeenCalledWith( expect.stringContaining("matrix final reply failed"), ); }); it.each(["tool", "block"] as const)( - "releases a claimed event when queued %s delivery fails and no final reply exists", + "keeps replay committed when queued %s delivery fails after a generic error and no final reply exists", async (kind) => { const inboundDeduper = { claimEvent: vi.fn(() => true), @@ -1935,17 +1935,108 @@ describe("matrix monitor handler durable inbound dedupe", () => { }), ); - expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); - expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({ + expect(inboundDeduper.commitEvent).toHaveBeenCalledWith({ roomId: "!room:example.org", eventId: `$release-on-${kind}-delivery-error`, }); + expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled(); expect(runtime.error).toHaveBeenCalledWith( expect.stringContaining(`matrix ${kind} reply failed`), ); }, ); + it("releases a claimed event when queued final delivery fails with an explicit retryable error", async () => { + const inboundDeduper = { + claimEvent: vi.fn(() => true), + commitEvent: vi.fn(async () => undefined), + releaseEvent: vi.fn(), + }; + const runtime = { + error: vi.fn(), + }; + const { handler } = createMatrixHandlerTestHarness({ + inboundDeduper, + runtime: runtime as never, + dispatchReplyFromConfig: vi.fn(async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + })), + createReplyDispatcherWithTyping: (params) => ({ + dispatcher: { + markComplete: () => {}, + waitForIdle: async () => { + params?.onError?.(new MatrixRetryableInboundError("retry send"), { kind: "final" }); + }, + }, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$retryable-final-delivery-error", + body: "hello", + }), + ); + + expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); + expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({ + roomId: "!room:example.org", + eventId: "$retryable-final-delivery-error", + }); + }); + + it.each(["tool", "block"] as const)( + "releases a claimed event when queued %s delivery fails with an explicit retryable error and no final reply exists", + async (kind) => { + const inboundDeduper = { + claimEvent: vi.fn(() => true), + commitEvent: vi.fn(async () => undefined), + releaseEvent: vi.fn(), + }; + const { handler } = createMatrixHandlerTestHarness({ + inboundDeduper, + dispatchReplyFromConfig: vi.fn(async () => ({ + queuedFinal: false, + counts: { + final: 0, + block: kind === "block" ? 1 : 0, + tool: kind === "tool" ? 1 : 0, + }, + })), + createReplyDispatcherWithTyping: (params) => ({ + dispatcher: { + markComplete: () => {}, + waitForIdle: async () => { + params?.onError?.(new MatrixRetryableInboundError("retry send"), { kind }); + }, + }, + replyOptions: {}, + markDispatchIdle: () => {}, + markRunComplete: () => {}, + }), + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: `$retryable-${kind}-delivery-error`, + body: "hello", + }), + ); + + expect(inboundDeduper.commitEvent).not.toHaveBeenCalled(); + expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({ + roomId: "!room:example.org", + eventId: `$retryable-${kind}-delivery-error`, + }); + }, + ); + it("commits a claimed event when dispatch completes without a final reply", async () => { const callOrder: string[] = []; const inboundDeduper = { diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index d508a613653..c711aa7b1b9 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -84,6 +84,13 @@ const MAX_TRACKED_PAIRING_REPLY_SENDERS = 512; const MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES = 512; type MatrixAllowBotsMode = "off" | "mentions" | "all"; +export class MatrixRetryableInboundError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "MatrixRetryableInboundError"; + } +} + async function redactMatrixDraftEvent( client: MatrixClient, roomId: string, @@ -1273,6 +1280,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, _route.agentId); let finalReplyDeliveryFailed = false; let nonFinalReplyDeliveryFailed = false; + let retryableReplyDeliveryFailed = false; const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg, agentId: _route.agentId, @@ -1568,6 +1576,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } }, onError: (err: unknown, info: { kind: "tool" | "block" | "final" }) => { + if (err instanceof MatrixRetryableInboundError) { + retryableReplyDeliveryFailed = true; + } if (info.kind === "final") { finalReplyDeliveryFailed = true; } else { @@ -1632,17 +1643,31 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); if (finalReplyDeliveryFailed) { + if (retryableReplyDeliveryFailed) { + logVerboseMessage( + `matrix: final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`, + ); + // Explicit retryable failures reopen replay so the same history can be retried. + return; + } logVerboseMessage( - `matrix: final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`, + `matrix: final reply delivery failed room=${roomId} id=${_messageId}; keeping replay committed`, ); - // Do not advance watermark — the event will be retried and should see the same history. + await commitInboundEventIfClaimed(); return; } if (!queuedFinal && nonFinalReplyDeliveryFailed) { + if (retryableReplyDeliveryFailed) { + logVerboseMessage( + `matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`, + ); + // Explicit retryable failures reopen replay. + return; + } logVerboseMessage( - `matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`, + `matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; keeping replay committed`, ); - // Do not advance watermark — the event will be retried. + await commitInboundEventIfClaimed(); return; } // Advance the per-agent watermark now that the reply succeeded (or no reply was needed).