Matrix: reject stale history snapshots after room recreation

This commit is contained in:
Gustavo Madeira Santana
2026-03-30 14:08:49 -04:00
parent 1e46af399f
commit ec1655f331
3 changed files with 54 additions and 16 deletions

View File

@@ -825,7 +825,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
})
: undefined;
const inboundHistory = preparedTrigger?.history;
const triggerSnapshotIdx = preparedTrigger?.snapshotIdx ?? -1;
const triggerSnapshot = preparedTrigger;
return {
route: _route,
@@ -843,7 +843,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
media,
locationPayload,
messageId: _messageId,
triggerSnapshotIdx,
triggerSnapshot,
threadRootId: _threadRootId,
};
});
@@ -867,7 +867,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
media,
locationPayload,
messageId: _messageId,
triggerSnapshotIdx,
triggerSnapshot,
threadRootId: _threadRootId,
} = ingressResult;
@@ -1319,8 +1319,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
// Advance the per-agent watermark now that the reply succeeded (or no reply was needed).
// Only advance to the snapshot position — messages added during async processing remain
// visible for the next trigger.
if (isRoom && triggerSnapshotIdx >= 0) {
roomHistoryTracker.consumeHistory(_route.agentId, roomId, triggerSnapshotIdx, _messageId);
if (isRoom && triggerSnapshot) {
roomHistoryTracker.consumeHistory(_route.agentId, roomId, triggerSnapshot, _messageId);
}
if (!queuedFinal) {
await commitInboundEventIfClaimed();

View File

@@ -204,7 +204,7 @@ describe("createRoomHistoryTracker — roomQueues eviction", () => {
tracker.recordPending(room2, entry("msg in room2"));
// Late completion for the evicted room must not recreate a stale watermark.
tracker.consumeHistory(AGENT, room1, prepared.snapshotIdx, "$trigger");
tracker.consumeHistory(AGENT, room1, prepared, "$trigger");
// Recreate room1 and add fresh content.
tracker.recordPending(room1, entry("new msg in room1"));
@@ -212,4 +212,22 @@ describe("createRoomHistoryTracker — roomQueues eviction", () => {
expect(history).toHaveLength(1);
expect(history[0]?.body).toBe("new msg in room1");
});
it("rejects stale snapshots after the room queue is recreated", () => {
const tracker = createRoomHistoryTrackerForTests(200, 1);
const room1 = "!room1:test";
const room2 = "!room2:test";
tracker.recordPending(room1, entry("old msg in room1"));
const staleSnapshot = tracker.recordTrigger(room1, entry("trigger in room1"));
tracker.recordPending(room2, entry("msg in room2")); // evicts room1
tracker.recordPending(room1, entry("new msg in room1")); // recreates room1 with new generation
tracker.consumeHistory(AGENT, room1, staleSnapshot);
const history = tracker.getPendingHistory(AGENT, room1, 100);
expect(history).toHaveLength(1);
expect(history[0]?.body).toBe("new msg in room1");
});
});

View File

@@ -26,10 +26,14 @@ const MAX_PREPARED_TRIGGER_ENTRIES = 500;
export type { HistoryEntry };
export type HistorySnapshotToken = {
snapshotIdx: number;
queueGeneration: number;
};
export type PreparedTriggerResult = {
history: HistoryEntry[];
snapshotIdx: number;
};
} & HistorySnapshotToken;
export type RoomHistoryTracker = {
/**
@@ -57,7 +61,7 @@ export type RoomHistoryTracker = {
consumeHistory: (
agentId: string,
roomId: string,
snapshotIdx: number,
snapshot: HistorySnapshotToken,
messageId?: string,
) => void;
};
@@ -71,13 +75,14 @@ export type RoomHistoryTrackerTestApi = RoomHistoryTracker & {
/**
* Test-only helper for manually appending a trigger entry and snapshot index.
*/
recordTrigger: (roomId: string, entry: HistoryEntry) => number;
recordTrigger: (roomId: string, entry: HistoryEntry) => HistorySnapshotToken;
};
type RoomQueue = {
entries: HistoryEntry[];
/** Absolute index of entries[0] — increases as old entries are trimmed. */
baseIndex: number;
generation: number;
preparedTriggers: Map<string, PreparedTriggerResult>;
};
@@ -90,6 +95,7 @@ function createRoomHistoryTrackerInternal(
const roomQueues = new Map<string, RoomQueue>();
/** Maps `${agentId}:${roomId}` → absolute consumed-up-to index */
const agentWatermarks = new Map<string, number>();
let nextQueueGeneration = 1;
function clearRoomWatermarks(roomId: string): void {
const roomSuffix = `:${roomId}`;
@@ -103,7 +109,12 @@ function createRoomHistoryTrackerInternal(
function getOrCreateQueue(roomId: string): RoomQueue {
let queue = roomQueues.get(roomId);
if (!queue) {
queue = { entries: [], baseIndex: 0, preparedTriggers: new Map() };
queue = {
entries: [],
baseIndex: 0,
generation: nextQueueGeneration++,
preparedTriggers: new Map(),
};
roomQueues.set(roomId, queue);
// FIFO eviction to prevent unbounded growth across many rooms
if (roomQueues.size > maxRoomQueues) {
@@ -117,14 +128,17 @@ function createRoomHistoryTrackerInternal(
return queue;
}
function appendToQueue(queue: RoomQueue, entry: HistoryEntry): number {
function appendToQueue(queue: RoomQueue, entry: HistoryEntry): HistorySnapshotToken {
queue.entries.push(entry);
if (queue.entries.length > maxQueueSize) {
const overflow = queue.entries.length - maxQueueSize;
queue.entries.splice(0, overflow);
queue.baseIndex += overflow;
}
return queue.baseIndex + queue.entries.length;
return {
snapshotIdx: queue.baseIndex + queue.entries.length,
queueGeneration: queue.generation,
};
}
function wmKey(agentId: string, roomId: string): string {
@@ -217,7 +231,7 @@ function createRoomHistoryTrackerInternal(
}
const prepared = {
history: computePendingHistory(queue, agentId, roomId, limit),
snapshotIdx: appendToQueue(queue, entry),
...appendToQueue(queue, entry),
};
if (retryKey) {
return rememberPreparedTrigger(queue, retryKey, prepared);
@@ -225,7 +239,7 @@ function createRoomHistoryTrackerInternal(
return prepared;
},
consumeHistory(agentId, roomId, snapshotIdx, messageId) {
consumeHistory(agentId, roomId, snapshot, messageId) {
const key = wmKey(agentId, roomId);
const queue = roomQueues.get(roomId);
if (!queue) {
@@ -234,10 +248,16 @@ function createRoomHistoryTrackerInternal(
agentWatermarks.delete(key);
return;
}
if (queue.generation !== snapshot.queueGeneration) {
// The room was evicted and recreated before this trigger completed. Reject the stale
// snapshot so it cannot advance the watermark for the new queue generation.
agentWatermarks.delete(key);
return;
}
// Monotone write: never regress an already-advanced watermark.
// Guards against out-of-order completion when two triggers for the same
// (agentId, roomId) are in-flight concurrently.
rememberWatermark(key, snapshotIdx);
rememberWatermark(key, snapshot.snapshotIdx);
const retryKey = preparedTriggerKey(agentId, messageId);
if (queue && retryKey) {
queue.preparedTriggers.delete(retryKey);