Matrix: harden inbound dedupe lifecycle

This commit is contained in:
Gustavo Madeira Santana
2026-03-20 01:20:33 -07:00
parent cc6ec7c51a
commit 696d3f13d7
5 changed files with 165 additions and 25 deletions

View File

@@ -56,12 +56,21 @@ type MatrixHandlerTestHarnessOptions = {
dispatcher: Record<string, unknown>;
replyOptions: Record<string, unknown>;
markDispatchIdle: () => void;
markRunComplete: () => void;
};
resolveHumanDelayConfig?: () => undefined;
dispatchReplyFromConfig?: () => Promise<{
queuedFinal: boolean;
counts: { final: number; block: number; tool: number };
}>;
withReplyDispatcher?: <T>(params: {
dispatcher: {
markComplete?: () => void;
waitForIdle?: () => Promise<void>;
};
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => Promise<T>;
inboundDeduper?: MatrixMonitorHandlerParams["inboundDeduper"];
shouldAckReaction?: () => boolean;
enqueueSystemEvent?: (...args: unknown[]) => void;
@@ -139,9 +148,32 @@ export function createMatrixHandlerTestHarness(
dispatcher: {},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
})),
resolveHumanDelayConfig: options.resolveHumanDelayConfig ?? (() => undefined),
dispatchReplyFromConfig,
withReplyDispatcher:
options.withReplyDispatcher ??
(async <T>(params: {
dispatcher: {
markComplete?: () => void;
waitForIdle?: () => Promise<void>;
};
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => {
const { dispatcher, run, onSettled } = params;
try {
return await run();
} finally {
dispatcher.markComplete?.();
try {
await dispatcher.waitForIdle?.();
} finally {
await onSettled?.();
}
}
}),
},
reactions: {
shouldAckReaction: options.shouldAckReaction ?? (() => false),

View File

@@ -720,12 +720,36 @@ describe("matrix monitor handler pairing account scope", () => {
dispatcher: {},
replyOptions: {},
markDispatchIdle: () => {},
markRunComplete: () => {},
}),
resolveHumanDelayConfig: () => undefined,
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
withReplyDispatcher: async <T>({
dispatcher,
run,
onSettled,
}: {
dispatcher: {
markComplete?: () => void;
waitForIdle?: () => Promise<void>;
};
run: () => Promise<T>;
onSettled?: () => void | Promise<void>;
}) => {
try {
return await run();
} finally {
dispatcher.markComplete?.();
try {
await dispatcher.waitForIdle?.();
} finally {
await onSettled?.();
}
}
},
},
reactions: {
shouldAckReaction: () => false,
@@ -1022,7 +1046,7 @@ describe("matrix monitor handler durable inbound dedupe", () => {
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
});
it("commits inbound events before reply side effects", async () => {
it("commits inbound events only after queued replies finish delivering", async () => {
const callOrder: string[] = [];
const inboundDeduper = {
claimEvent: vi.fn(() => {
@@ -1050,6 +1074,23 @@ describe("matrix monitor handler durable inbound dedupe", () => {
inboundDeduper,
recordInboundSession,
dispatchReplyFromConfig,
createReplyDispatcherWithTyping: () => ({
dispatcher: {
markComplete: () => {
callOrder.push("mark-complete");
},
waitForIdle: async () => {
callOrder.push("wait-for-idle");
},
},
replyOptions: {},
markDispatchIdle: () => {
callOrder.push("dispatch-idle");
},
markRunComplete: () => {
callOrder.push("run-complete");
},
}),
});
await handler(
@@ -1060,7 +1101,16 @@ describe("matrix monitor handler durable inbound dedupe", () => {
}),
);
expect(callOrder).toEqual(["claim", "record", "dispatch", "commit"]);
expect(callOrder).toEqual([
"claim",
"record",
"dispatch",
"run-complete",
"mark-complete",
"wait-for-idle",
"dispatch-idle",
"commit",
]);
expect(inboundDeduper.releaseEvent).not.toHaveBeenCalled();
});

View File

@@ -30,6 +30,7 @@ import {
} from "../send.js";
import { resolveMatrixMonitorAccessState } from "./access-state.js";
import { resolveMatrixAckReactionConfig } from "./ack-config.js";
import type { MatrixInboundEventDeduper } from "./inbound-dedupe.js";
import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js";
import { downloadMatrixMedia } from "./media.js";
import { resolveMentions } from "./mentions.js";
@@ -72,11 +73,7 @@ export type MatrixMonitorHandlerParams = {
startupMs: number;
startupGraceMs: number;
dropPreStartupMessages: boolean;
inboundDeduper?: {
claimEvent: (params: { roomId: string; eventId: string }) => boolean;
commitEvent: (params: { roomId: string; eventId: string; eventTs?: number }) => Promise<void>;
releaseEvent: (params: { roomId: string; eventId: string }) => void;
};
inboundDeduper?: Pick<MatrixInboundEventDeduper, "claimEvent" | "commitEvent" | "releaseEvent">;
directTracker: {
isDirectMessage: (params: {
roomId: string;
@@ -268,11 +265,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (!claimedInboundEvent || !inboundDeduper || !eventId) {
return;
}
await inboundDeduper.commitEvent({
roomId,
eventId,
eventTs: eventTs ?? undefined,
});
await inboundDeduper.commitEvent({ roomId, eventId });
claimedInboundEvent = false;
};
if (dropPreStartupMessages) {
@@ -871,7 +864,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
});
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
@@ -897,17 +890,28 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
onIdle: typingCallbacks.onIdle,
});
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
onModelSelected,
onSettled: () => {
markDispatchIdle();
},
run: async () => {
try {
return await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
onModelSelected,
},
});
} finally {
markRunComplete();
}
},
});
markDispatchIdle();
if (!queuedFinal) {
await commitInboundEventIfClaimed();
return;

View File

@@ -97,4 +97,50 @@ describe("Matrix inbound event dedupe", () => {
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-2" })).toBe(false);
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$keep-3" })).toBe(false);
});
it("retains replayed backlog events based on processing time", async () => {
const storagePath = createStoragePath();
let now = 100;
const first = await createMatrixInboundEventDeduper({
auth: auth as never,
storagePath,
ttlMs: 20,
nowMs: () => now,
});
expect(first.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(true);
await first.commitEvent({
roomId: "!room:example.org",
eventId: "$backlog",
});
await first.stop();
now = 110;
const second = await createMatrixInboundEventDeduper({
auth: auth as never,
storagePath,
ttlMs: 20,
nowMs: () => now,
});
expect(second.claimEvent({ roomId: "!room:example.org", eventId: "$backlog" })).toBe(false);
});
it("treats stop persistence failures as best-effort cleanup", async () => {
const blockingPath = createStoragePath();
fs.writeFileSync(blockingPath, "blocking file", "utf8");
const deduper = await createMatrixInboundEventDeduper({
auth: auth as never,
storagePath: path.join(blockingPath, "nested", "inbound-dedupe.json"),
});
expect(deduper.claimEvent({ roomId: "!room:example.org", eventId: "$persist-fail" })).toBe(
true,
);
await deduper.commitEvent({
roomId: "!room:example.org",
eventId: "$persist-fail",
});
await expect(deduper.stop()).resolves.toBeUndefined();
});
});

View File

@@ -23,7 +23,7 @@ type StoredMatrixInboundDedupeState = {
export type MatrixInboundEventDeduper = {
claimEvent: (params: { roomId: string; eventId: string }) => boolean;
commitEvent: (params: { roomId: string; eventId: string; eventTs?: number }) => Promise<void>;
commitEvent: (params: { roomId: string; eventId: string }) => Promise<void>;
releaseEvent: (params: { roomId: string; eventId: string }) => void;
flush: () => Promise<void>;
stop: () => Promise<void>;
@@ -252,13 +252,13 @@ export async function createMatrixInboundEventDeduper(params: {
pending.add(key);
return true;
},
commitEvent: async ({ roomId, eventId, eventTs }) => {
commitEvent: async ({ roomId, eventId }) => {
const key = buildEventKey({ roomId, eventId });
if (!key) {
return;
}
pending.delete(key);
const ts = normalizeTimestamp(eventTs) ?? nowMs();
const ts = nowMs();
seen.delete(key);
seen.set(key, ts);
pruneSeenEvents({ seen, ttlMs, maxEntries, nowMs: nowMs() });
@@ -273,7 +273,15 @@ export async function createMatrixInboundEventDeduper(params: {
},
flush,
stop: async () => {
await flush();
try {
await flush();
} catch (err) {
LogService.warn(
"MatrixInboundDedupe",
"Failed to flush Matrix inbound dedupe store during stop():",
err,
);
}
},
};
}