fix(matrix): make delivery replay retries explicit

This commit is contained in:
Vincent Koc
2026-04-13 16:02:55 +01:00
parent c73e80b5a7
commit 6c4cfa585f
2 changed files with 127 additions and 11 deletions

View File

@@ -9,7 +9,7 @@ import {
import { beforeEach, describe, expect, it, vi } from "vitest";
import { installMatrixMonitorTestRuntime } from "../../test-runtime.js";
import { MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY } from "../send/types.js";
import { createMatrixRoomMessageHandler } from "./handler.js";
import { createMatrixRoomMessageHandler, MatrixRetryableInboundError } from "./handler.js";
import {
createMatrixHandlerTestHarness,
createMatrixReactionEvent,
@@ -1845,7 +1845,7 @@ describe("matrix monitor handler durable inbound dedupe", () => {
expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("matrix handler failed"));
});
it("releases a claimed event when queued final delivery fails", async () => {
it("keeps replay committed when queued final delivery fails after a generic error", async () => {
const inboundDeduper = {
claimEvent: vi.fn(() => true),
commitEvent: vi.fn(async () => undefined),
@@ -1882,18 +1882,18 @@ describe("matrix monitor handler durable inbound dedupe", () => {
}),
);
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
expect(inboundDeduper.commitEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: "$release-on-final-delivery-error",
});
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("matrix final reply failed"),
);
});
it.each(["tool", "block"] as const)(
"releases a claimed event when queued %s delivery fails and no final reply exists",
"keeps replay committed when queued %s delivery fails after a generic error and no final reply exists",
async (kind) => {
const inboundDeduper = {
claimEvent: vi.fn(() => true),
@@ -1935,17 +1935,108 @@ describe("matrix monitor handler durable inbound dedupe", () => {
}),
);
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
expect(inboundDeduper.commitEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: `$release-on-${kind}-delivery-error`,
});
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining(`matrix ${kind} reply failed`),
);
},
);
// Verifies the retryable path for final replies: when the final delivery error is a
// MatrixRetryableInboundError, the claimed event is released and never committed.
it("releases a claimed event when queued final delivery fails with an explicit retryable error", async () => {
// Deduper stub: the claim always succeeds; commit/release are spies asserted on below.
const inboundDeduper = {
claimEvent: vi.fn(() => true),
commitEvent: vi.fn(async () => undefined),
releaseEvent: vi.fn(),
};
// Only `error` is stubbed on the runtime; it is passed to the harness with an `as never` cast.
const runtime = {
error: vi.fn(),
};
const { handler } = createMatrixHandlerTestHarness({
inboundDeduper,
runtime: runtime as never,
// Dispatch reports that exactly one final reply was queued.
dispatchReplyFromConfig: vi.fn(async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
})),
createReplyDispatcherWithTyping: (params) => ({
dispatcher: {
markComplete: () => {},
// Simulate the delivery failure: waiting for idle reports a retryable error
// for the final reply through the handler-supplied onError callback.
waitForIdle: async () => {
params?.onError?.(new MatrixRetryableInboundError("retry send"), { kind: "final" });
},
},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: "$retryable-final-delivery-error",
body: "hello",
}),
);
// A retryable failure must not commit the event...
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
// ...and must release the claim for this exact room/event pair.
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: "$retryable-final-delivery-error",
});
});
// Parameterized over the two non-final reply kinds: when no final reply is queued and a
// tool/block delivery fails with MatrixRetryableInboundError, the claimed event is
// released and never committed.
it.each(["tool", "block"] as const)(
"releases a claimed event when queued %s delivery fails with an explicit retryable error and no final reply exists",
async (kind) => {
// Deduper stub: the claim always succeeds; commit/release are spies asserted on below.
const inboundDeduper = {
claimEvent: vi.fn(() => true),
commitEvent: vi.fn(async () => undefined),
releaseEvent: vi.fn(),
};
const { handler } = createMatrixHandlerTestHarness({
inboundDeduper,
// Dispatch reports no final reply and exactly one reply of the kind under test.
dispatchReplyFromConfig: vi.fn(async () => ({
queuedFinal: false,
counts: {
final: 0,
block: kind === "block" ? 1 : 0,
tool: kind === "tool" ? 1 : 0,
},
})),
createReplyDispatcherWithTyping: (params) => ({
dispatcher: {
markComplete: () => {},
// Simulate the delivery failure: waiting for idle reports a retryable error
// for the current reply kind through the handler-supplied onError callback.
waitForIdle: async () => {
params?.onError?.(new MatrixRetryableInboundError("retry send"), { kind });
},
},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
});
await handler(
"!room:example.org",
createMatrixTextMessageEvent({
eventId: `$retryable-${kind}-delivery-error`,
body: "hello",
}),
);
// A retryable failure must not commit the event...
expect(inboundDeduper.commitEvent).not.toHaveBeenCalled();
// ...and must release the claim for this exact room/event pair.
expect(inboundDeduper.releaseEvent).toHaveBeenCalledWith({
roomId: "!room:example.org",
eventId: `$retryable-${kind}-delivery-error`,
});
},
);
it("commits a claimed event when dispatch completes without a final reply", async () => {
const callOrder: string[] = [];
const inboundDeduper = {

View File

@@ -84,6 +84,13 @@ const MAX_TRACKED_PAIRING_REPLY_SENDERS = 512;
const MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES = 512;
type MatrixAllowBotsMode = "off" | "mentions" | "all";
/**
 * Marker error for reply-delivery failures that should be retried.
 *
 * The room message handler's `onError` callback checks `instanceof
 * MatrixRetryableInboundError` to distinguish explicit retryable failures
 * from generic delivery errors.
 *
 * @param message - Human-readable description of the failure.
 * @param options - Standard `ErrorOptions` (e.g. `cause`), forwarded to `Error`.
 */
export class MatrixRetryableInboundError extends Error {
  // Own instance property, initialized right after `super()` returns.
  override name = "MatrixRetryableInboundError";

  constructor(message: string, options?: ErrorOptions) {
    super(message, options);
  }
}
async function redactMatrixDraftEvent(
client: MatrixClient,
roomId: string,
@@ -1273,6 +1280,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, _route.agentId);
let finalReplyDeliveryFailed = false;
let nonFinalReplyDeliveryFailed = false;
let retryableReplyDeliveryFailed = false;
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg,
agentId: _route.agentId,
@@ -1568,6 +1576,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}
},
onError: (err: unknown, info: { kind: "tool" | "block" | "final" }) => {
if (err instanceof MatrixRetryableInboundError) {
retryableReplyDeliveryFailed = true;
}
if (info.kind === "final") {
finalReplyDeliveryFailed = true;
} else {
@@ -1632,17 +1643,31 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
},
});
if (finalReplyDeliveryFailed) {
if (retryableReplyDeliveryFailed) {
logVerboseMessage(
`matrix: final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`,
);
// Explicit retryable failures reopen replay so the same history can be retried.
return;
}
logVerboseMessage(
`matrix: final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`,
`matrix: final reply delivery failed room=${roomId} id=${_messageId}; keeping replay committed`,
);
// Do not advance watermark — the event will be retried and should see the same history.
await commitInboundEventIfClaimed();
return;
}
if (!queuedFinal && nonFinalReplyDeliveryFailed) {
if (retryableReplyDeliveryFailed) {
logVerboseMessage(
`matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`,
);
// Explicit retryable failures reopen replay.
return;
}
logVerboseMessage(
`matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; leaving event uncommitted`,
`matrix: non-final reply delivery failed room=${roomId} id=${_messageId}; keeping replay committed`,
);
// Do not advance watermark — the event will be retried.
await commitInboundEventIfClaimed();
return;
}
// Advance the per-agent watermark now that the reply succeeded (or no reply was needed).