From 33254ca6968b18bb17f4638502ee79f9a9e5b039 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 20 Apr 2026 20:18:07 +0100 Subject: [PATCH] test: share matrix restart replay helpers --- .../contract/scenario-runtime-restart.ts | 185 +++++++++--------- 1 file changed, 95 insertions(+), 90 deletions(-) diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts index 8e21afa97d3..16d03084eaf 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts @@ -23,6 +23,9 @@ import { } from "./scenario-runtime-state-files.js"; import type { MatrixQaScenarioExecution } from "./scenario-types.js"; +type MatrixQaDriverClient = Awaited>["client"]; +type MatrixReplyArtifact = ReturnType; + export async function runHomeserverRestartResumeScenario(context: MatrixQaScenarioContext) { if (!context.interruptTransport) { throw new Error("Matrix homeserver restart scenario requires a transport interruption hook"); @@ -191,72 +194,113 @@ export async function runInitialCatchupThenIncrementalScenario(context: MatrixQa } satisfies MatrixQaScenarioExecution; } -export async function runRestartReplayDedupeScenario(context: MatrixQaScenarioContext) { - if (!context.restartGateway) { - throw new Error("Matrix restart replay dedupe scenario requires a gateway restart callback"); - } - const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY); - const { client, startSince } = await primeMatrixQaDriverScenarioClient(context); - const replayToken = buildMatrixQaToken("MATRIX_QA_REPLAY_DEDUPE"); - const replayBody = buildMentionPrompt(context.sutUserId, replayToken); +async function sendAndAssertRestartReplayReply(params: { + context: MatrixQaScenarioContext; + replyLabel: string; + roomId: string; + tokenPrefix: string; +}) { + const { client, startSince } = await primeMatrixQaDriverScenarioClient(params.context); + const replayToken = buildMatrixQaToken(params.tokenPrefix); + const replayBody = buildMentionPrompt(params.context.sutUserId, replayToken); const replayDriverEventId = await client.sendTextMessage({ body: replayBody, - mentionUserIds: [context.sutUserId], - roomId, + mentionUserIds: [params.context.sutUserId], + roomId: params.roomId, }); const firstMatched = await client.waitForRoomEvent({ - observedEvents: context.observedEvents, + observedEvents: params.context.observedEvents, predicate: (event) => isMatrixQaExactMarkerReply(event, { - roomId, - sutUserId: context.sutUserId, + roomId: params.roomId, + sutUserId: params.context.sutUserId, token: replayToken, }) && event.relatesTo === undefined, - roomId, + roomId: params.roomId, since: startSince, - timeoutMs: context.timeoutMs, + timeoutMs: params.context.timeoutMs, }); advanceMatrixQaActorCursor({ actorId: "driver", - syncState: context.syncState, + syncState: params.context.syncState, nextSince: firstMatched.since, startSince, }); const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken); - assertTopLevelReplyArtifact("first replay-dedupe reply", firstReply); + assertTopLevelReplyArtifact(params.replyLabel, firstReply); - await context.restartGateway(); + return { client, firstMatched, firstReply, replayDriverEventId, replayToken, startSince }; +} - const duplicate = await client.waitForOptionalRoomEvent({ - observedEvents: context.observedEvents, +async function assertNoRestartReplayDuplicate(params: { + client: MatrixQaDriverClient; + context: MatrixQaScenarioContext; + errorDetails: string[]; + errorTitle: string; + firstMatchedSince: string | undefined; + firstReply: MatrixReplyArtifact; + replayToken: string; + roomId: string; + startSince: string; +}) { + const duplicate = await params.client.waitForOptionalRoomEvent({ + observedEvents: params.context.observedEvents, predicate: (event) => - event.eventId !== firstReply.eventId && + event.eventId !== params.firstReply.eventId && isMatrixQaExactMarkerReply(event, { - roomId, - sutUserId: context.sutUserId, - token: replayToken, + roomId: params.roomId, + sutUserId: params.context.sutUserId, + token: params.replayToken, }), - roomId, - timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs), + roomId: params.roomId, + timeoutMs: Math.min(NO_REPLY_WINDOW_MS, params.context.timeoutMs), }); if (duplicate.matched) { throw new Error( [ - "Matrix restart replayed an already handled event", - `original driver event: ${replayDriverEventId}`, - ...buildMatrixReplyDetails("first reply", firstReply), + params.errorTitle, + ...params.errorDetails, + ...buildMatrixReplyDetails("first reply", params.firstReply), ...buildMatrixReplyDetails( "duplicate reply", - buildMatrixReplyArtifact(duplicate.event, replayToken), + buildMatrixReplyArtifact(duplicate.event, params.replayToken), ), ].join("\n"), ); } advanceMatrixQaActorCursor({ actorId: "driver", - syncState: context.syncState, + syncState: params.context.syncState, nextSince: duplicate.since, - startSince: firstMatched.since ?? startSince, + startSince: params.firstMatchedSince ?? params.startSince, + }); +} + +export async function runRestartReplayDedupeScenario(context: MatrixQaScenarioContext) { + if (!context.restartGateway) { + throw new Error("Matrix restart replay dedupe scenario requires a gateway restart callback"); + } + const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY); + const { client, firstMatched, firstReply, replayDriverEventId, replayToken, startSince } = + await sendAndAssertRestartReplayReply({ + context, + replyLabel: "first replay-dedupe reply", + roomId, + tokenPrefix: "MATRIX_QA_REPLAY_DEDUPE", + }); + + await context.restartGateway(); + + await assertNoRestartReplayDuplicate({ + client, + context, + errorDetails: [`original driver event: ${replayDriverEventId}`], + errorTitle: "Matrix restart replayed an already handled event", + firstMatchedSince: firstMatched.since, + firstReply, + replayToken, + roomId, + startSince, }); const postRestart = await runAssertedDriverTopLevelScenario({ @@ -308,34 +352,13 @@ export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenario }); const staleCursor = syncStore.cursor; - const { client, startSince } = await primeMatrixQaDriverScenarioClient(context); - const replayToken = buildMatrixQaToken("MATRIX_QA_STALE_SYNC_DEDUPE"); - const replayBody = buildMentionPrompt(context.sutUserId, replayToken); - const replayDriverEventId = await client.sendTextMessage({ - body: replayBody, - mentionUserIds: [context.sutUserId], - roomId, - }); - const firstMatched = await client.waitForRoomEvent({ - observedEvents: context.observedEvents, - predicate: (event) => - isMatrixQaExactMarkerReply(event, { - roomId, - sutUserId: context.sutUserId, - token: replayToken, - }) && event.relatesTo === undefined, - roomId, - since: startSince, - timeoutMs: context.timeoutMs, - }); - advanceMatrixQaActorCursor({ - actorId: "driver", - syncState: context.syncState, - nextSince: firstMatched.since, - startSince, - }); - const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken); - assertTopLevelReplyArtifact("first stale-sync replay-dedupe reply", firstReply); + const { client, firstMatched, firstReply, replayDriverEventId, replayToken, startSince } = + await sendAndAssertRestartReplayReply({ + context, + replyLabel: "first stale-sync replay-dedupe reply", + roomId, + tokenPrefix: "MATRIX_QA_STALE_SYNC_DEDUPE", + }); await waitForMatrixInboundDedupeEntry({ context, @@ -352,37 +375,19 @@ export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenario }); }); - const duplicate = await client.waitForOptionalRoomEvent({ - observedEvents: context.observedEvents, - predicate: (event) => - event.eventId !== firstReply.eventId && - isMatrixQaExactMarkerReply(event, { - roomId, - sutUserId: context.sutUserId, - token: replayToken, - }), + await assertNoRestartReplayDuplicate({ + client, + context, + errorDetails: [ + `original driver event: ${replayDriverEventId}`, + `stale sync cursor: ${staleCursor}`, + ], + errorTitle: "Matrix stale sync cursor replayed an already handled event", + firstMatchedSince: firstMatched.since, + firstReply, + replayToken, roomId, - timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs), - }); - if (duplicate.matched) { - throw new Error( - [ - "Matrix stale sync cursor replayed an already handled event", - `original driver event: ${replayDriverEventId}`, - `stale sync cursor: ${staleCursor}`, - ...buildMatrixReplyDetails("first reply", firstReply), - ...buildMatrixReplyDetails( - "duplicate reply", - buildMatrixReplyArtifact(duplicate.event, replayToken), - ), - ].join("\n"), - ); - } - advanceMatrixQaActorCursor({ - actorId: "driver", - syncState: context.syncState, - nextSince: duplicate.since, - startSince: firstMatched.since ?? startSince, + startSince, }); const postRestart = await runAssertedDriverTopLevelScenario({