diff --git a/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts b/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts index e5dd7e94682..691eeb641ab 100644 --- a/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.group-history.test.ts @@ -60,6 +60,14 @@ beforeEach(() => { installMatrixMonitorTestRuntime(); }); +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + const promise = new Promise((res) => { + resolve = res; + }); + return { promise, resolve }; +} + describe("matrix group chat history — scenario 1: basic accumulation", () => { it("pending messages appear in InboundHistory; trigger itself does not", async () => { const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); @@ -185,6 +193,39 @@ describe("matrix group chat history — scenario 1: basic accumulation", () => { expect(history ?? []).toHaveLength(0); }); + it("historyLimit=0 does not serialize same-room ingress", async () => { + const firstUserId = deferred(); + let getUserIdCalls = 0; + const { handler } = createMatrixHandlerTestHarness({ + historyLimit: 0, + groupPolicy: "open", + isDirectMessage: false, + client: { + getUserId: async () => { + getUserIdCalls += 1; + if (getUserIdCalls === 1) { + return await firstUserId.promise; + } + return "@bot:example.org"; + }, + }, + dispatchReplyFromConfig: async () => ({ + queuedFinal: true, + counts: { final: 1, block: 0, tool: 0 }, + }), + }); + + const first = handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$a", body: "first" })); + await Promise.resolve(); + const second = handler(DEFAULT_ROOM, makeRoomTriggerEvent({ eventId: "$b", body: "second" })); + await Promise.resolve(); + + expect(getUserIdCalls).toBe(2); + + firstUserId.resolve("@bot:example.org"); + await Promise.all([first, second]); + }); + it("DMs do not accumulate history (group chat only)", async () => { const finalizeInboundContext = vi.fn((ctx: unknown) => ctx); const { handler } = createMatrixHandlerTestHarness({ diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 33941101272..838d86df1f8 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -287,6 +287,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } }; + const runHistoryAwareRoomIngress = async ( + roomId: string, + task: () => Promise, + ): Promise => (historyLimit > 0 ? runRoomIngress(roomId, task) : task()); + return async (roomId: string, event: MatrixRawEvent) => { const eventId = typeof event.event_id === "string" ? event.event_id.trim() : ""; let claimedInboundEvent = false; @@ -331,7 +336,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam await inboundDeduper.commitEvent({ roomId, eventId }); claimedInboundEvent = false; }; - const ingressResult = await runRoomIngress(roomId, async () => { + const ingressResult = await runHistoryAwareRoomIngress(roomId, async () => { const selfUserId = await client.getUserId(); if (senderId === selfUserId) { return; diff --git a/extensions/matrix/src/matrix/monitor/room-history.test.ts b/extensions/matrix/src/matrix/monitor/room-history.test.ts index edf493ba0f1..cb0e909f6f5 100644 --- a/extensions/matrix/src/matrix/monitor/room-history.test.ts +++ b/extensions/matrix/src/matrix/monitor/room-history.test.ts @@ -91,6 +91,41 @@ describe("createRoomHistoryTracker — watermark monotonicity", () => { expect(room1History).toHaveLength(1); expect(room1History[0]?.body).toBe("new msg in room1"); }); + + it("refreshes prepared-trigger recency before capped eviction on retry hits", () => { + const tracker = createRoomHistoryTracker(200, 10, 5000, 2); + const room1 = "!room1:test"; + + tracker.prepareTrigger(AGENT, room1, 100, { + sender: "user", + body: "trigger1", + messageId: "$trigger1", + }); + tracker.prepareTrigger(AGENT, room1, 100, { + sender: "user", + body: "trigger2", + messageId: "$trigger2", + }); + + // Retry hit should refresh trigger1 so trigger2 becomes the stale entry. + const retried = tracker.prepareTrigger(AGENT, room1, 100, { + sender: "user", + body: "trigger1", + messageId: "$trigger1", + }); + tracker.prepareTrigger(AGENT, room1, 100, { + sender: "user", + body: "trigger3", + messageId: "$trigger3", + }); + + const reused = tracker.prepareTrigger(AGENT, room1, 100, { + sender: "user", + body: "trigger1", + messageId: "$trigger1", + }); + expect(reused.snapshotIdx).toBe(retried.snapshotIdx); + }); }); describe("createRoomHistoryTracker — roomQueues eviction", () => { diff --git a/extensions/matrix/src/matrix/monitor/room-history.ts b/extensions/matrix/src/matrix/monitor/room-history.ts index aabc535e422..079f291e512 100644 --- a/extensions/matrix/src/matrix/monitor/room-history.ts +++ b/extensions/matrix/src/matrix/monitor/room-history.ts @@ -86,6 +86,7 @@ export function createRoomHistoryTracker( maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, maxRoomQueues = DEFAULT_MAX_ROOM_QUEUES, maxWatermarkEntries = MAX_WATERMARK_ENTRIES, + maxPreparedTriggerEntries = MAX_PREPARED_TRIGGER_ENTRIES, ): RoomHistoryTracker { const roomQueues = new Map(); /** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */ @@ -153,6 +154,25 @@ export function createRoomHistoryTracker( } } + function rememberPreparedTrigger( + queue: RoomQueue, + retryKey: string, + prepared: PreparedTriggerResult, + ): PreparedTriggerResult { + if (queue.preparedTriggers.has(retryKey)) { + // Refresh insertion order so capped eviction keeps actively retried events hot. + queue.preparedTriggers.delete(retryKey); + } + queue.preparedTriggers.set(retryKey, prepared); + if (queue.preparedTriggers.size > maxPreparedTriggerEntries) { + const oldest = queue.preparedTriggers.keys().next().value; + if (oldest !== undefined) { + queue.preparedTriggers.delete(oldest); + } + } + return prepared; + } + function computePendingHistory( queue: RoomQueue, agentId: string, @@ -193,7 +213,7 @@ export function createRoomHistoryTracker( if (retryKey) { const prepared = queue.preparedTriggers.get(retryKey); if (prepared) { - return prepared; + return rememberPreparedTrigger(queue, retryKey, prepared); } } const prepared = { @@ -201,13 +221,7 @@ export function createRoomHistoryTracker( snapshotIdx: appendToQueue(queue, entry), }; if (retryKey) { - queue.preparedTriggers.set(retryKey, prepared); - if (queue.preparedTriggers.size > MAX_PREPARED_TRIGGER_ENTRIES) { - const oldest = queue.preparedTriggers.keys().next().value; - if (oldest !== undefined) { - queue.preparedTriggers.delete(oldest); - } - } + return rememberPreparedTrigger(queue, retryKey, prepared); } return prepared; },