Matrix: avoid idle ingress serialization

This commit is contained in:
Gustavo Madeira Santana
2026-03-30 11:51:05 -04:00
parent 7eac926f44
commit 97b5229620
4 changed files with 104 additions and 9 deletions

View File

@@ -60,6 +60,14 @@ beforeEach(() => {
installMatrixMonitorTestRuntime();
});
function deferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
const promise = new Promise<T>((res) => {
resolve = res;
});
return { promise, resolve };
}
describe("matrix group chat history — scenario 1: basic accumulation", () => {
it("pending messages appear in InboundHistory; trigger itself does not", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
@@ -185,6 +193,39 @@ describe("matrix group chat history — scenario 1: basic accumulation", () => {
expect(history ?? []).toHaveLength(0);
});
it("historyLimit=0 does not serialize same-room ingress", async () => {
const firstUserId = deferred<string>();
let getUserIdCalls = 0;
const { handler } = createMatrixHandlerTestHarness({
historyLimit: 0,
groupPolicy: "open",
isDirectMessage: false,
client: {
getUserId: async () => {
getUserIdCalls += 1;
if (getUserIdCalls === 1) {
return await firstUserId.promise;
}
return "@bot:example.org";
},
},
dispatchReplyFromConfig: async () => ({
queuedFinal: true,
counts: { final: 1, block: 0, tool: 0 },
}),
});
const first = handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$a", body: "first" }));
await Promise.resolve();
const second = handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$b", body: "second" }));
await Promise.resolve();
expect(getUserIdCalls).toBe(2);
firstUserId.resolve("@bot:example.org");
await Promise.all([first, second]);
});
it("DMs do not accumulate history (group chat only)", async () => {
const finalizeInboundContext = vi.fn((ctx: unknown) => ctx);
const { handler } = createMatrixHandlerTestHarness({

View File

@@ -287,6 +287,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}
};
const runHistoryAwareRoomIngress = async <T>(
roomId: string,
task: () => Promise<T>,
): Promise<T> => (historyLimit > 0 ? runRoomIngress(roomId, task) : task());
return async (roomId: string, event: MatrixRawEvent) => {
const eventId = typeof event.event_id === "string" ? event.event_id.trim() : "";
let claimedInboundEvent = false;
@@ -331,7 +336,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
await inboundDeduper.commitEvent({ roomId, eventId });
claimedInboundEvent = false;
};
const ingressResult = await runRoomIngress(roomId, async () => {
const ingressResult = await runHistoryAwareRoomIngress(roomId, async () => {
const selfUserId = await client.getUserId();
if (senderId === selfUserId) {
return;

View File

@@ -91,6 +91,41 @@ describe("createRoomHistoryTracker — watermark monotonicity", () => {
expect(room1History).toHaveLength(1);
expect(room1History[0]?.body).toBe("new msg in room1");
});
it("refreshes prepared-trigger recency before capped eviction on retry hits", () => {
const tracker = createRoomHistoryTracker(200, 10, 5000, 2);
const room1 = "!room1:test";
tracker.prepareTrigger(AGENT, room1, 100, {
sender: "user",
body: "trigger1",
messageId: "$trigger1",
});
tracker.prepareTrigger(AGENT, room1, 100, {
sender: "user",
body: "trigger2",
messageId: "$trigger2",
});
// Retry hit should refresh trigger1 so trigger2 becomes the stale entry.
const retried = tracker.prepareTrigger(AGENT, room1, 100, {
sender: "user",
body: "trigger1",
messageId: "$trigger1",
});
tracker.prepareTrigger(AGENT, room1, 100, {
sender: "user",
body: "trigger3",
messageId: "$trigger3",
});
const reused = tracker.prepareTrigger(AGENT, room1, 100, {
sender: "user",
body: "trigger1",
messageId: "$trigger1",
});
expect(reused.snapshotIdx).toBe(retried.snapshotIdx);
});
});
describe("createRoomHistoryTracker — roomQueues eviction", () => {

View File

@@ -86,6 +86,7 @@ export function createRoomHistoryTracker(
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
maxRoomQueues = DEFAULT_MAX_ROOM_QUEUES,
maxWatermarkEntries = MAX_WATERMARK_ENTRIES,
maxPreparedTriggerEntries = MAX_PREPARED_TRIGGER_ENTRIES,
): RoomHistoryTracker {
const roomQueues = new Map<string, RoomQueue>();
/** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */
@@ -153,6 +154,25 @@ export function createRoomHistoryTracker(
}
}
function rememberPreparedTrigger(
queue: RoomQueue,
retryKey: string,
prepared: PreparedTriggerResult,
): PreparedTriggerResult {
if (queue.preparedTriggers.has(retryKey)) {
// Refresh insertion order so capped eviction keeps actively retried events hot.
queue.preparedTriggers.delete(retryKey);
}
queue.preparedTriggers.set(retryKey, prepared);
if (queue.preparedTriggers.size > maxPreparedTriggerEntries) {
const oldest = queue.preparedTriggers.keys().next().value;
if (oldest !== undefined) {
queue.preparedTriggers.delete(oldest);
}
}
return prepared;
}
function computePendingHistory(
queue: RoomQueue,
agentId: string,
@@ -193,7 +213,7 @@ export function createRoomHistoryTracker(
if (retryKey) {
const prepared = queue.preparedTriggers.get(retryKey);
if (prepared) {
return prepared;
return rememberPreparedTrigger(queue, retryKey, prepared);
}
}
const prepared = {
@@ -201,13 +221,7 @@ export function createRoomHistoryTracker(
snapshotIdx: appendToQueue(queue, entry),
};
if (retryKey) {
queue.preparedTriggers.set(retryKey, prepared);
if (queue.preparedTriggers.size > MAX_PREPARED_TRIGGER_ENTRIES) {
const oldest = queue.preparedTriggers.keys().next().value;
if (oldest !== undefined) {
queue.preparedTriggers.delete(oldest);
}
}
return rememberPreparedTrigger(queue, retryKey, prepared);
}
return prepared;
},