test: share matrix restart replay helpers

This commit is contained in:
Peter Steinberger
2026-04-20 20:18:07 +01:00
parent 704feda9da
commit 33254ca696

View File

@@ -23,6 +23,9 @@ import {
} from "./scenario-runtime-state-files.js";
import type { MatrixQaScenarioExecution } from "./scenario-types.js";
type MatrixQaDriverClient = Awaited<ReturnType<typeof primeMatrixQaDriverScenarioClient>>["client"];
type MatrixReplyArtifact = ReturnType<typeof buildMatrixReplyArtifact>;
export async function runHomeserverRestartResumeScenario(context: MatrixQaScenarioContext) {
if (!context.interruptTransport) {
throw new Error("Matrix homeserver restart scenario requires a transport interruption hook");
@@ -191,72 +194,113 @@ export async function runInitialCatchupThenIncrementalScenario(context: MatrixQa
} satisfies MatrixQaScenarioExecution;
}
export async function runRestartReplayDedupeScenario(context: MatrixQaScenarioContext) {
if (!context.restartGateway) {
throw new Error("Matrix restart replay dedupe scenario requires a gateway restart callback");
}
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY);
const { client, startSince } = await primeMatrixQaDriverScenarioClient(context);
const replayToken = buildMatrixQaToken("MATRIX_QA_REPLAY_DEDUPE");
const replayBody = buildMentionPrompt(context.sutUserId, replayToken);
async function sendAndAssertRestartReplayReply(params: {
context: MatrixQaScenarioContext;
replyLabel: string;
roomId: string;
tokenPrefix: string;
}) {
const { client, startSince } = await primeMatrixQaDriverScenarioClient(params.context);
const replayToken = buildMatrixQaToken(params.tokenPrefix);
const replayBody = buildMentionPrompt(params.context.sutUserId, replayToken);
const replayDriverEventId = await client.sendTextMessage({
body: replayBody,
mentionUserIds: [context.sutUserId],
roomId,
mentionUserIds: [params.context.sutUserId],
roomId: params.roomId,
});
const firstMatched = await client.waitForRoomEvent({
observedEvents: context.observedEvents,
observedEvents: params.context.observedEvents,
predicate: (event) =>
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
roomId: params.roomId,
sutUserId: params.context.sutUserId,
token: replayToken,
}) && event.relatesTo === undefined,
roomId,
roomId: params.roomId,
since: startSince,
timeoutMs: context.timeoutMs,
timeoutMs: params.context.timeoutMs,
});
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
syncState: params.context.syncState,
nextSince: firstMatched.since,
startSince,
});
const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken);
assertTopLevelReplyArtifact("first replay-dedupe reply", firstReply);
assertTopLevelReplyArtifact(params.replyLabel, firstReply);
await context.restartGateway();
return { client, firstMatched, firstReply, replayDriverEventId, replayToken, startSince };
}
const duplicate = await client.waitForOptionalRoomEvent({
observedEvents: context.observedEvents,
async function assertNoRestartReplayDuplicate(params: {
client: MatrixQaDriverClient;
context: MatrixQaScenarioContext;
errorDetails: string[];
errorTitle: string;
firstMatchedSince: string | undefined;
firstReply: MatrixReplyArtifact;
replayToken: string;
roomId: string;
startSince: string;
}) {
const duplicate = await params.client.waitForOptionalRoomEvent({
observedEvents: params.context.observedEvents,
predicate: (event) =>
event.eventId !== firstReply.eventId &&
event.eventId !== params.firstReply.eventId &&
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
token: replayToken,
roomId: params.roomId,
sutUserId: params.context.sutUserId,
token: params.replayToken,
}),
roomId,
timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs),
roomId: params.roomId,
timeoutMs: Math.min(NO_REPLY_WINDOW_MS, params.context.timeoutMs),
});
if (duplicate.matched) {
throw new Error(
[
"Matrix restart replayed an already handled event",
`original driver event: ${replayDriverEventId}`,
...buildMatrixReplyDetails("first reply", firstReply),
params.errorTitle,
...params.errorDetails,
...buildMatrixReplyDetails("first reply", params.firstReply),
...buildMatrixReplyDetails(
"duplicate reply",
buildMatrixReplyArtifact(duplicate.event, replayToken),
buildMatrixReplyArtifact(duplicate.event, params.replayToken),
),
].join("\n"),
);
}
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
syncState: params.context.syncState,
nextSince: duplicate.since,
startSince: firstMatched.since ?? startSince,
startSince: params.firstMatchedSince ?? params.startSince,
});
}
export async function runRestartReplayDedupeScenario(context: MatrixQaScenarioContext) {
if (!context.restartGateway) {
throw new Error("Matrix restart replay dedupe scenario requires a gateway restart callback");
}
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY);
const { client, firstMatched, firstReply, replayDriverEventId, replayToken, startSince } =
await sendAndAssertRestartReplayReply({
context,
replyLabel: "first replay-dedupe reply",
roomId,
tokenPrefix: "MATRIX_QA_REPLAY_DEDUPE",
});
await context.restartGateway();
await assertNoRestartReplayDuplicate({
client,
context,
errorDetails: [`original driver event: ${replayDriverEventId}`],
errorTitle: "Matrix restart replayed an already handled event",
firstMatchedSince: firstMatched.since,
firstReply,
replayToken,
roomId,
startSince,
});
const postRestart = await runAssertedDriverTopLevelScenario({
@@ -308,34 +352,13 @@ export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenario
});
const staleCursor = syncStore.cursor;
const { client, startSince } = await primeMatrixQaDriverScenarioClient(context);
const replayToken = buildMatrixQaToken("MATRIX_QA_STALE_SYNC_DEDUPE");
const replayBody = buildMentionPrompt(context.sutUserId, replayToken);
const replayDriverEventId = await client.sendTextMessage({
body: replayBody,
mentionUserIds: [context.sutUserId],
roomId,
});
const firstMatched = await client.waitForRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
token: replayToken,
}) && event.relatesTo === undefined,
roomId,
since: startSince,
timeoutMs: context.timeoutMs,
});
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
nextSince: firstMatched.since,
startSince,
});
const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken);
assertTopLevelReplyArtifact("first stale-sync replay-dedupe reply", firstReply);
const { client, firstMatched, firstReply, replayDriverEventId, replayToken, startSince } =
await sendAndAssertRestartReplayReply({
context,
replyLabel: "first stale-sync replay-dedupe reply",
roomId,
tokenPrefix: "MATRIX_QA_STALE_SYNC_DEDUPE",
});
await waitForMatrixInboundDedupeEntry({
context,
@@ -352,37 +375,19 @@ export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenario
});
});
const duplicate = await client.waitForOptionalRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
event.eventId !== firstReply.eventId &&
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
token: replayToken,
}),
await assertNoRestartReplayDuplicate({
client,
context,
errorDetails: [
`original driver event: ${replayDriverEventId}`,
`stale sync cursor: ${staleCursor}`,
],
errorTitle: "Matrix stale sync cursor replayed an already handled event",
firstMatchedSince: firstMatched.since,
firstReply,
replayToken,
roomId,
timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs),
});
if (duplicate.matched) {
throw new Error(
[
"Matrix stale sync cursor replayed an already handled event",
`original driver event: ${replayDriverEventId}`,
`stale sync cursor: ${staleCursor}`,
...buildMatrixReplyDetails("first reply", firstReply),
...buildMatrixReplyDetails(
"duplicate reply",
buildMatrixReplyArtifact(duplicate.event, replayToken),
),
].join("\n"),
);
}
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
nextSince: duplicate.since,
startSince: firstMatched.since ?? startSince,
startSince,
});
const postRestart = await runAssertedDriverTopLevelScenario({