diff --git a/extensions/qa-lab/src/gateway-child.ts b/extensions/qa-lab/src/gateway-child.ts index 093674a17a4..a4e3ed4c586 100644 --- a/extensions/qa-lab/src/gateway-child.ts +++ b/extensions/qa-lab/src/gateway-child.ts @@ -42,6 +42,14 @@ import type { QaTransportAdapter } from "./qa-transport.js"; export type { QaCliBackendAuthMode } from "./providers/env.js"; const QA_GATEWAY_CHILD_STARTUP_MAX_ATTEMPTS = 5; + +export type QaGatewayChildStateMutationContext = { + configPath: string; + runtimeEnv: NodeJS.ProcessEnv; + stateDir: string; + tempRoot: string; +}; + async function getFreePort() { return await new Promise((resolve, reject) => { const server = net.createServer(); @@ -694,10 +702,95 @@ export async function startQaGatewayChild(params: { if (!child || !cfg || !baseUrl || !wsUrl || !rpcClient || !env) { throw new Error("qa gateway child failed to start"); } - const runningChild = child; - const runningRpcClient = rpcClient; + let activeChild = child; + let activeRpcClient = rpcClient; const runningEnv = env; + const spawnReplacementGatewayChild = async () => { + const nextChild = spawn( + nodeExecPath, + [ + distEntryPath, + "gateway", + "run", + "--port", + String(gatewayPort), + "--bind", + "loopback", + "--allow-unconfigured", + ], + { + cwd: runtimeCwd, + env: runningEnv, + detached: process.platform !== "win32", + stdio: ["ignore", "pipe", "pipe"], + }, + ); + nextChild.stdout.on("data", (chunk) => { + const buffer = Buffer.from(chunk); + stdout.push(buffer); + stdoutLog.write(buffer); + }); + nextChild.stderr.on("data", (chunk) => { + const buffer = Buffer.from(chunk); + stderr.push(buffer); + stderrLog.write(buffer); + }); + + try { + await waitForGatewayReady({ + baseUrl, + logs, + child: nextChild, + timeoutMs: 120_000, + }); + const nextRpcClient = await startQaGatewayRpcClient({ + wsUrl, + token: gatewayToken, + logs, + }); + try { + let rpcReady = false; + let lastRpcStartupError: unknown = null; + for (let rpcAttempt = 1; rpcAttempt <= 4; rpcAttempt += 1) { + try { + await nextRpcClient.request("config.get", {}, { timeoutMs: 10_000 }); + rpcReady = true; + break; + } catch (error) { + lastRpcStartupError = error; + if (rpcAttempt >= 4 || !isRetryableRpcStartupError(error)) { + throw error; + } + await sleep(500 * rpcAttempt); + await waitForGatewayReady({ + baseUrl, + logs, + child: nextChild, + timeoutMs: 15_000, + }); + } + } + if (!rpcReady) { + throw lastRpcStartupError ?? new Error("qa gateway rpc client failed to start"); + } + } catch (error) { + await nextRpcClient.stop().catch(() => {}); + throw error; + } + return { + child: nextChild, + rpcClient: nextRpcClient, + }; + } catch (error) { + await stopQaGatewayChildProcessTree(nextChild, { + gracefulTimeoutMs: 1_500, + forceTimeoutMs: 1_500, + }); + throw error; + } + }; + return { cfg, baseUrl, @@ -710,10 +803,27 @@ export async function startQaGatewayChild(params: { runtimeEnv: runningEnv, logs, async restart(signal: NodeJS.Signals = "SIGUSR1") { - if (!runningChild.pid) { + if (!activeChild.pid) { throw new Error("qa gateway child has no pid"); } - process.kill(runningChild.pid, signal); + process.kill(activeChild.pid, signal); + }, + async restartAfterStateMutation( + mutateState: (context: QaGatewayChildStateMutationContext) => Promise, + ) { + await activeRpcClient.stop().catch(() => {}); + await stopQaGatewayChildProcessTree(activeChild); + await mutateState({ + configPath, + runtimeEnv: runningEnv, + stateDir, + tempRoot, + }); + const restarted = await spawnReplacementGatewayChild(); + activeChild = restarted.child; + activeRpcClient = restarted.rpcClient; + child = activeChild; + rpcClient = activeRpcClient; }, async call( method: string, @@ -724,7 +834,7 @@ export async function startQaGatewayChild(params: { let lastDetails = ""; for (let attempt = 1; attempt <= 3; attempt += 1) { try { - return await runningRpcClient.request(method, rpcParams, { + return await activeRpcClient.request(method, rpcParams, { ...opts, timeoutMs, }); @@ -737,7 +847,7 @@ export async function startQaGatewayChild(params: { await waitForGatewayReady({ baseUrl, logs, - child: runningChild, + child: activeChild, timeoutMs: Math.max(10_000, timeoutMs), }); } @@ -745,8 +855,8 @@ export async function startQaGatewayChild(params: { throw new Error(`${lastDetails}${formatQaGatewayLogsForError(logs())}`); }, async stop(opts?: { keepTemp?: boolean; preserveToDir?: string }) { - await runningRpcClient.stop().catch(() => {}); - await stopQaGatewayChildProcessTree(runningChild); + await activeRpcClient.stop().catch(() => {}); + await stopQaGatewayChildProcessTree(activeChild); await closeWriteStream(stdoutLog); await closeWriteStream(stderrLog); if (opts?.preserveToDir && !(opts?.keepTemp ?? keepTemp)) { diff --git a/extensions/qa-matrix/src/runners/contract/runtime.ts b/extensions/qa-matrix/src/runners/contract/runtime.ts index 853810f0ce9..b378fa60daa 100644 --- a/extensions/qa-matrix/src/runners/contract/runtime.ts +++ b/extensions/qa-matrix/src/runners/contract/runtime.ts @@ -42,7 +42,11 @@ type MatrixQaGatewayChild = { params: Record, options?: { timeoutMs?: number }, ): Promise; + restartAfterStateMutation?: ( + mutateState: (context: { stateDir: string }) => Promise, + ) => Promise; restart(): Promise; + runtimeEnv?: NodeJS.ProcessEnv; }; type MatrixQaLiveLaneGatewayHarness = { @@ -653,6 +657,7 @@ export async function runMatrixQaLive(params: { observerDeviceId: provisioning.observer.deviceId, observerPassword: provisioning.observer.password, observerUserId: provisioning.observer.userId, + gatewayStateDir: scenarioGateway.harness.gateway.runtimeEnv?.OPENCLAW_STATE_DIR, outputDir, restartGateway: async () => { if (!gatewayHarness) { @@ -669,6 +674,30 @@ export async function runMatrixQaLive(params: { `gateway restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`, ); }, + restartGatewayAfterStateMutation: async (mutateState) => { + if (!gatewayHarness) { + throw new Error( + "Matrix persisted-state restart scenario requires a live gateway", + ); + } + const restartAfterStateMutation = + scenarioGateway.harness.gateway.restartAfterStateMutation; + if (!restartAfterStateMutation) { + throw new Error( + "Matrix persisted-state restart scenario requires a hard restart callback", + ); + } + writeMatrixQaProgress(`gateway hard restart start ${scenario.id}`); + const measuredRestart = await measureMatrixQaStep(async () => { + await restartAfterStateMutation(mutateState); + await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId); + }); + gatewayRestartMs += measuredRestart.durationMs; + scenarioRestartGatewayMs += measuredRestart.durationMs; + writeMatrixQaProgress( + `gateway hard restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`, + ); + }, restartGatewayWithQueuedMessage: async (queueMessage) => { if (!gatewayHarness) { throw new Error("Matrix restart catchup scenario requires a live gateway"); diff --git a/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts b/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts index 270b99797bc..88d3c9f4f78 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts @@ -41,6 +41,8 @@ export type MatrixQaScenarioId = | "matrix-restart-resume" | "matrix-post-restart-room-continue" | "matrix-initial-catchup-then-incremental" + | "matrix-restart-replay-dedupe" + | "matrix-stale-sync-replay-dedupe" | "matrix-room-membership-loss" | "matrix-homeserver-restart-resume" | "matrix-mention-gating" @@ -433,6 +435,18 @@ export const MATRIX_QA_SCENARIOS: MatrixQaScenarioDefinition[] = [ title: "Matrix initial catchup is followed by incremental replies", topology: MATRIX_QA_RESTART_ROOM_TOPOLOGY, }, + { + id: "matrix-restart-replay-dedupe", + timeoutMs: 90_000, + title: "Matrix restart does not redeliver a handled event", + topology: MATRIX_QA_RESTART_ROOM_TOPOLOGY, + }, + { + id: "matrix-stale-sync-replay-dedupe", + timeoutMs: 90_000, + title: "Matrix stale sync replay is absorbed by inbound dedupe", + topology: MATRIX_QA_RESTART_ROOM_TOPOLOGY, + }, { id: "matrix-room-membership-loss", timeoutMs: 75_000, diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts index c2f92c3a3d9..8e21afa97d3 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts @@ -11,10 +11,16 @@ import { isMatrixQaExactMarkerReply, assertTopLevelReplyArtifact, advanceMatrixQaActorCursor, + NO_REPLY_WINDOW_MS, primeMatrixQaDriverScenarioClient, runAssertedDriverTopLevelScenario, type MatrixQaScenarioContext, } from "./scenario-runtime-shared.js"; +import { + rewriteMatrixSyncStoreCursor, + waitForMatrixInboundDedupeEntry, + waitForMatrixSyncStoreWithCursor, +} from "./scenario-runtime-state-files.js"; import type { MatrixQaScenarioExecution } from "./scenario-types.js"; export async function runHomeserverRestartResumeScenario(context: MatrixQaScenarioContext) { @@ -184,3 +190,231 @@ export async function runInitialCatchupThenIncrementalScenario(context: MatrixQa ].join("\n"), } satisfies MatrixQaScenarioExecution; } + +export async function runRestartReplayDedupeScenario(context: MatrixQaScenarioContext) { + if (!context.restartGateway) { + throw new Error("Matrix restart replay dedupe scenario requires a gateway restart callback"); + } + const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY); + const { client, startSince } = await primeMatrixQaDriverScenarioClient(context); + const replayToken = buildMatrixQaToken("MATRIX_QA_REPLAY_DEDUPE"); + const replayBody = buildMentionPrompt(context.sutUserId, replayToken); + const replayDriverEventId = await client.sendTextMessage({ + body: replayBody, + mentionUserIds: [context.sutUserId], + roomId, + }); + const firstMatched = await client.waitForRoomEvent({ + observedEvents: context.observedEvents, + predicate: (event) => + isMatrixQaExactMarkerReply(event, { + roomId, + sutUserId: context.sutUserId, + token: replayToken, + }) && event.relatesTo === undefined, + roomId, + since: startSince, + timeoutMs: context.timeoutMs, + }); + advanceMatrixQaActorCursor({ + actorId: "driver", + syncState: context.syncState, + nextSince: firstMatched.since, + startSince, + }); + const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken); + assertTopLevelReplyArtifact("first replay-dedupe reply", firstReply); + + await context.restartGateway(); + + const duplicate = await client.waitForOptionalRoomEvent({ + observedEvents: context.observedEvents, + predicate: (event) => + event.eventId !== firstReply.eventId && + isMatrixQaExactMarkerReply(event, { + roomId, + sutUserId: context.sutUserId, + token: replayToken, + }), + roomId, + timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs), + }); + if (duplicate.matched) { + throw new Error( + [ + "Matrix restart replayed an already handled event", + `original driver event: ${replayDriverEventId}`, + ...buildMatrixReplyDetails("first reply", firstReply), + ...buildMatrixReplyDetails( + "duplicate reply", + buildMatrixReplyArtifact(duplicate.event, replayToken), + ), + ].join("\n"), + ); + } + advanceMatrixQaActorCursor({ + actorId: "driver", + syncState: context.syncState, + nextSince: duplicate.since, + startSince: firstMatched.since ?? startSince, + }); + + const postRestart = await runAssertedDriverTopLevelScenario({ + context, + label: "fresh post-restart reply", + roomId, + tokenPrefix: "MATRIX_QA_REPLAY_DEDUPE_FRESH", + }); + + return { + artifacts: { + duplicateWindowMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs), + firstDriverEventId: replayDriverEventId, + firstReply, + firstToken: replayToken, + freshDriverEventId: postRestart.driverEventId, + freshReply: postRestart.reply, + freshToken: postRestart.token, + restartSignal: "SIGUSR1", + roomId, + }, + details: [ + `room id: ${roomId}`, + "restart signal: SIGUSR1", + `first driver event: ${replayDriverEventId}`, + ...buildMatrixReplyDetails("first reply", firstReply), + `duplicate replay window: ${Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs)}ms`, + `fresh post-restart driver event: ${postRestart.driverEventId}`, + ...buildMatrixReplyDetails("fresh reply", postRestart.reply), + ].join("\n"), + } satisfies MatrixQaScenarioExecution; +} + +export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenarioContext) { + if (!context.restartGatewayAfterStateMutation) { + throw new Error( + "Matrix stale sync replay dedupe scenario requires a persisted-state restart callback", + ); + } + if (!context.gatewayStateDir) { + throw new Error("Matrix stale sync replay dedupe scenario requires a gateway state directory"); + } + const stateDir = context.gatewayStateDir; + const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY); + const syncStore = await waitForMatrixSyncStoreWithCursor({ + context, + stateDir, + timeoutMs: Math.min(5_000, context.timeoutMs), + }); + const staleCursor = syncStore.cursor; + + const { client, startSince } = await primeMatrixQaDriverScenarioClient(context); + const replayToken = buildMatrixQaToken("MATRIX_QA_STALE_SYNC_DEDUPE"); + const replayBody = buildMentionPrompt(context.sutUserId, replayToken); + const replayDriverEventId = await client.sendTextMessage({ + body: replayBody, + mentionUserIds: [context.sutUserId], + roomId, + }); + const firstMatched = await client.waitForRoomEvent({ + observedEvents: context.observedEvents, + predicate: (event) => + isMatrixQaExactMarkerReply(event, { + roomId, + sutUserId: context.sutUserId, + token: replayToken, + }) && event.relatesTo === undefined, + roomId, + since: startSince, + timeoutMs: context.timeoutMs, + }); + advanceMatrixQaActorCursor({ + actorId: "driver", + syncState: context.syncState, + nextSince: firstMatched.since, + startSince, + }); + const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken); + assertTopLevelReplyArtifact("first stale-sync replay-dedupe reply", firstReply); + + await waitForMatrixInboundDedupeEntry({ + context, + eventId: replayDriverEventId, + roomId, + stateDir, + timeoutMs: Math.min(5_000, context.timeoutMs), + }); + + await context.restartGatewayAfterStateMutation(async () => { + await rewriteMatrixSyncStoreCursor({ + cursor: staleCursor, + pathname: syncStore.pathname, + }); + }); + + const duplicate = await client.waitForOptionalRoomEvent({ + observedEvents: context.observedEvents, + predicate: (event) => + event.eventId !== firstReply.eventId && + isMatrixQaExactMarkerReply(event, { + roomId, + sutUserId: context.sutUserId, + token: replayToken, + }), + roomId, + timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs), + }); + if (duplicate.matched) { + throw new Error( + [ + "Matrix stale sync cursor replayed an already handled event", + `original driver event: ${replayDriverEventId}`, + `stale sync cursor: ${staleCursor}`, + ...buildMatrixReplyDetails("first reply", firstReply), + ...buildMatrixReplyDetails( + "duplicate reply", + buildMatrixReplyArtifact(duplicate.event, replayToken), + ), + ].join("\n"), + ); + } + advanceMatrixQaActorCursor({ + actorId: "driver", + syncState: context.syncState, + nextSince: duplicate.since, + startSince: firstMatched.since ?? startSince, + }); + + const postRestart = await runAssertedDriverTopLevelScenario({ + context, + label: "fresh post-stale-sync-restart reply", + roomId, + tokenPrefix: "MATRIX_QA_STALE_SYNC_DEDUPE_FRESH", + }); + + return { + artifacts: { + dedupeCommitObserved: true, + duplicateWindowMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs), + firstDriverEventId: replayDriverEventId, + firstReply, + firstToken: replayToken, + freshDriverEventId: postRestart.driverEventId, + freshReply: postRestart.reply, + freshToken: postRestart.token, + restartSignal: "hard-restart", + roomId, + staleSyncCursor: staleCursor, + }, + details: [ + `room id: ${roomId}`, + "restart signal: hard-restart", + `stale sync cursor: ${staleCursor}`, + `first driver event: ${replayDriverEventId}`, + ...buildMatrixReplyDetails("first reply", firstReply), + `duplicate replay window: ${Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs)}ms`, + `fresh post-restart driver event: ${postRestart.driverEventId}`, + ...buildMatrixReplyDetails("fresh reply", postRestart.reply), + ].join("\n"), + } satisfies MatrixQaScenarioExecution; +} diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts index e9bb84997bb..ec89f2b06d4 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts @@ -27,8 +27,12 @@ export type MatrixQaScenarioContext = { observerDeviceId?: string; observerPassword?: string; observerUserId: string; + gatewayStateDir?: string; outputDir?: string; restartGateway?: () => Promise; + restartGatewayAfterStateMutation?: ( + mutateState: (context: { stateDir: string }) => Promise, + ) => Promise; restartGatewayWithQueuedMessage?: (queueMessage: () => Promise) => Promise; roomId: string; interruptTransport?: () => Promise; diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-state-files.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-state-files.ts new file mode 100644 index 00000000000..34e6d0da834 --- /dev/null +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-state-files.ts @@ -0,0 +1,216 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { setTimeout as sleep } from "node:timers/promises"; +import type { MatrixQaScenarioContext } from "./scenario-runtime-shared.js"; + +const MATRIX_SYNC_STORE_FILENAME = "bot-storage.json"; +const MATRIX_INBOUND_DEDUPE_FILENAME = "inbound-dedupe.json"; +const MATRIX_STATE_POLL_INTERVAL_MS = 100; + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +async function readJsonFile(pathname: string): Promise { + return JSON.parse(await fs.readFile(pathname, "utf8")) as unknown; +} + +async function writeJsonFile(pathname: string, value: unknown) { + await fs.writeFile(pathname, `${JSON.stringify(value, null, 2)}\n`, "utf8"); +} + +async function findFilesByName(params: { + filename: string; + rootDir: string; + maxDepth?: number; +}): Promise { + const maxDepth = params.maxDepth ?? 8; + const matches: string[] = []; + async function visit(dir: string, depth: number): Promise { + if (depth > maxDepth) { + return; + } + let entries: Array<{ isDirectory(): boolean; isFile(): boolean; name: string }>; + try { + entries = await fs.readdir(dir, { withFileTypes: true }); + } catch { + return; + } + for (const entry of entries) { + const entryPath = path.join(dir, entry.name); + if (entry.isFile() && entry.name === params.filename) { + matches.push(entryPath); + continue; + } + if (entry.isDirectory()) { + await visit(entryPath, depth + 1); + } + } + } + await visit(params.rootDir, 0); + return matches.toSorted(); +} + +function readPersistedMatrixSyncCursor(parsed: unknown): string | null { + if (!isRecord(parsed)) { + return null; + } + const savedSync = parsed.savedSync; + if (isRecord(savedSync) && typeof savedSync.nextBatch === "string") { + return savedSync.nextBatch; + } + if (typeof parsed.next_batch === "string") { + return parsed.next_batch; + } + return null; +} + +function writePersistedMatrixSyncCursor(parsed: unknown, cursor: string): unknown { + if (!isRecord(parsed)) { + throw new Error("Matrix sync store was not a JSON object"); + } + const savedSync = parsed.savedSync; + if (isRecord(savedSync) && typeof savedSync.nextBatch === "string") { + return { + ...parsed, + savedSync: { + ...savedSync, + nextBatch: cursor, + }, + }; + } + if (typeof parsed.next_batch === "string") { + return { + ...parsed, + next_batch: cursor, + }; + } + throw new Error("Matrix sync store did not contain a persisted sync cursor"); +} + +async function readMatrixSyncStoreCursor(pathname: string): Promise { + return readPersistedMatrixSyncCursor(await readJsonFile(pathname)); +} + +export async function rewriteMatrixSyncStoreCursor(params: { cursor: string; pathname: string }) { + const parsed = await readJsonFile(params.pathname); + await writeJsonFile(params.pathname, writePersistedMatrixSyncCursor(parsed, params.cursor)); +} + +async function scoreMatrixStateFile(params: { + context: MatrixQaScenarioContext; + pathname: string; +}) { + let score = params.pathname.includes(`${path.sep}matrix${path.sep}`) ? 4 : 0; + try { + const metadata = await readJsonFile( + path.join(path.dirname(params.pathname), "storage-meta.json"), + ); + if (isRecord(metadata) && metadata.userId === params.context.sutUserId) { + score += 16; + } + if (isRecord(metadata) && metadata.accountId === params.context.sutAccountId) { + score += 8; + } + } catch { + // Missing metadata is allowed; the Matrix client may not have flushed it yet. + } + return score; +} + +async function resolveBestMatrixStateFile(params: { + context: MatrixQaScenarioContext; + filename: string; + stateDir: string; +}) { + const candidates = await findFilesByName({ + filename: params.filename, + rootDir: params.stateDir, + }); + if (candidates.length === 0) { + return null; + } + const scored = await Promise.all( + candidates.map(async (pathname) => ({ + pathname, + score: await scoreMatrixStateFile({ + context: params.context, + pathname, + }), + })), + ); + scored.sort((a, b) => b.score - a.score || a.pathname.localeCompare(b.pathname)); + return scored[0]?.pathname ?? null; +} + +export async function waitForMatrixSyncStoreWithCursor(params: { + context: MatrixQaScenarioContext; + stateDir: string; + timeoutMs: number; +}) { + const startedAt = Date.now(); + let lastPath: string | null = null; + while (Date.now() - startedAt < params.timeoutMs) { + const pathname = await resolveBestMatrixStateFile({ + context: params.context, + filename: MATRIX_SYNC_STORE_FILENAME, + stateDir: params.stateDir, + }); + lastPath = pathname; + if (pathname) { + const cursor = await readMatrixSyncStoreCursor(pathname); + if (cursor) { + return { cursor, pathname }; + } + } + await sleep(MATRIX_STATE_POLL_INTERVAL_MS); + } + throw new Error( + `timed out waiting for Matrix sync store cursor under ${params.stateDir}; last path ${lastPath ?? ""}`, + ); +} + +function hasPersistedMatrixDedupeEntry(params: { + parsed: unknown; + roomId: string; + eventId: string; +}) { + if (!isRecord(params.parsed) || !Array.isArray(params.parsed.entries)) { + return false; + } + const expectedKey = `${params.roomId}|${params.eventId}`; + return params.parsed.entries.some((entry) => isRecord(entry) && entry.key === expectedKey); +} + +export async function waitForMatrixInboundDedupeEntry(params: { + context: MatrixQaScenarioContext; + eventId: string; + roomId: string; + stateDir: string; + timeoutMs: number; +}) { + const startedAt = Date.now(); + while (Date.now() - startedAt < params.timeoutMs) { + const pathname = await resolveBestMatrixStateFile({ + context: params.context, + filename: MATRIX_INBOUND_DEDUPE_FILENAME, + stateDir: params.stateDir, + }); + if (pathname) { + const parsed = await readJsonFile(pathname); + if ( + hasPersistedMatrixDedupeEntry({ + parsed, + roomId: params.roomId, + eventId: params.eventId, + }) + ) { + return pathname; + } + } + await sleep(MATRIX_STATE_POLL_INTERVAL_MS); + } + throw new Error( + `timed out waiting for Matrix inbound dedupe commit for ${params.roomId}|${params.eventId}`, + ); +} diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts index 34b64cdceb9..a55271c5544 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts @@ -43,7 +43,9 @@ import { runHomeserverRestartResumeScenario, runInitialCatchupThenIncrementalScenario, runPostRestartRoomContinueScenario, + runRestartReplayDedupeScenario, runRestartResumeScenario, + runStaleSyncReplayDedupeScenario, } from "./scenario-runtime-restart.js"; import { runAllowlistHotReloadScenario, @@ -233,6 +235,10 @@ export async function runMatrixQaScenario( return await runPostRestartRoomContinueScenario(context); case "matrix-initial-catchup-then-incremental": return await runInitialCatchupThenIncrementalScenario(context); + case "matrix-restart-replay-dedupe": + return await runRestartReplayDedupeScenario(context); + case "matrix-stale-sync-replay-dedupe": + return await runStaleSyncReplayDedupeScenario(context); case "matrix-room-membership-loss": return await runMembershipLossScenario(context); case "matrix-homeserver-restart-resume": diff --git a/extensions/qa-matrix/src/runners/contract/scenario-types.ts b/extensions/qa-matrix/src/runners/contract/scenario-types.ts index 03c859ee918..102a4434cd4 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-types.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-types.ts @@ -35,6 +35,8 @@ export type MatrixQaScenarioArtifacts = { catchupDriverEventId?: string; catchupReply?: MatrixQaReplyArtifact; catchupToken?: string; + dedupeCommitObserved?: boolean; + duplicateWindowMs?: number; driverEventId?: string; editEventId?: string; editedToken?: string; @@ -42,6 +44,9 @@ export type MatrixQaScenarioArtifacts = { firstDriverEventId?: string; firstReply?: MatrixQaReplyArtifact; firstToken?: string; + freshDriverEventId?: string; + freshReply?: MatrixQaReplyArtifact; + freshToken?: string; incrementalDriverEventId?: string; incrementalReply?: MatrixQaReplyArtifact; incrementalToken?: string; @@ -68,6 +73,7 @@ export type MatrixQaScenarioArtifacts = { secondDriverEventId?: string; secondReply?: MatrixQaReplyArtifact; secondToken?: string; + staleSyncCursor?: string; subagentCompletion?: MatrixQaReplyArtifact; subagentIntro?: MatrixQaReplyArtifact; threadDriverEventId?: string; diff --git a/extensions/qa-matrix/src/runners/contract/scenarios.test.ts b/extensions/qa-matrix/src/runners/contract/scenarios.test.ts index c91cc6a38c2..db4afe9bb3b 100644 --- a/extensions/qa-matrix/src/runners/contract/scenarios.test.ts +++ b/extensions/qa-matrix/src/runners/contract/scenarios.test.ts @@ -1,3 +1,6 @@ +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import { describe, expect, it, beforeEach, vi } from "vitest"; const { createMatrixQaClient } = vi.hoisted(() => ({ createMatrixQaClient: vi.fn(), @@ -60,6 +63,27 @@ function matrixQaScenarioContext(): MatrixQaScenarioContext { }; } +async function writeTestJsonFile(pathname: string, value: unknown) { + await writeFile(pathname, `${JSON.stringify(value, null, 2)}\n`); +} + +function matrixSyncStoreFixture(nextBatch: string) { + return { + version: 1, + cleanShutdown: true, + savedSync: { + nextBatch, + accountData: [], + roomsData: { + join: {}, + invite: {}, + leave: {}, + knock: {}, + }, + }, + }; +} + function matrixQaE2eeRoomKey( scenarioId: Parameters[0], ) { @@ -104,6 +128,8 @@ describe("matrix live qa scenarios", () => { "matrix-restart-resume", "matrix-post-restart-room-continue", "matrix-initial-catchup-then-incremental", + "matrix-restart-replay-dedupe", + "matrix-stale-sync-replay-dedupe", "matrix-room-membership-loss", "matrix-homeserver-restart-resume", "matrix-mention-gating", @@ -777,6 +803,253 @@ describe("matrix live qa scenarios", () => { ]); }); + it("fails if a handled Matrix event is redelivered after gateway restart", async () => { + const callOrder: string[] = []; + const primeRoom = vi.fn().mockResolvedValue("driver-sync-start"); + const sendTextMessage = vi.fn().mockImplementation(async (params) => { + const body = String(params.body); + const kind = body.includes("REPLAY_DEDUPE_FRESH") ? "fresh" : "first"; + callOrder.push(`send:${kind}`); + return kind === "fresh" ? "$fresh-trigger" : "$first-trigger"; + }); + const waitForRoomEvent = vi.fn().mockImplementation(async () => { + const sentBody = String(sendTextMessage.mock.calls.at(-1)?.[0]?.body ?? ""); + const token = sentBody.replace("@sut:matrix-qa.test reply with only this exact marker: ", ""); + const kind = token.includes("REPLAY_DEDUPE_FRESH") ? "fresh" : "first"; + callOrder.push(`wait:${kind}`); + return { + event: { + kind: "message", + roomId: "!restart:matrix-qa.test", + eventId: kind === "fresh" ? "$fresh-reply" : "$first-reply", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + body: token, + }, + since: kind === "fresh" ? "driver-sync-after-fresh" : "driver-sync-after-first", + }; + }); + const waitForOptionalRoomEvent = vi.fn().mockImplementation(async () => { + callOrder.push("wait:no-duplicate"); + return { + matched: false, + since: "driver-sync-after-no-duplicate-window", + }; + }); + + createMatrixQaClient.mockReturnValue({ + primeRoom, + sendTextMessage, + waitForOptionalRoomEvent, + waitForRoomEvent, + }); + + const scenario = MATRIX_QA_SCENARIOS.find( + (entry) => entry.id === "matrix-restart-replay-dedupe", + ); + expect(scenario).toBeDefined(); + + await expect( + runMatrixQaScenario(scenario!, { + ...matrixQaScenarioContext(), + restartGateway: async () => { + callOrder.push("restart"); + }, + roomId: "!room:matrix-qa.test", + topology: { + defaultRoomId: "!room:matrix-qa.test", + defaultRoomKey: "main", + rooms: [ + { + key: "restart", + kind: "group", + memberRoles: ["driver", "observer", "sut"], + memberUserIds: [ + "@driver:matrix-qa.test", + "@observer:matrix-qa.test", + "@sut:matrix-qa.test", + ], + name: "Restart room", + requireMention: true, + roomId: "!restart:matrix-qa.test", + }, + ], + }, + }), + ).resolves.toMatchObject({ + artifacts: { + duplicateWindowMs: 8000, + firstDriverEventId: "$first-trigger", + firstReply: { + eventId: "$first-reply", + tokenMatched: true, + }, + freshDriverEventId: "$fresh-trigger", + freshReply: { + eventId: "$fresh-reply", + tokenMatched: true, + }, + }, + }); + + expect(callOrder).toEqual([ + "send:first", + "wait:first", + "restart", + "wait:no-duplicate", + "send:fresh", + "wait:fresh", + ]); + expect(waitForOptionalRoomEvent).toHaveBeenCalledWith( + expect.objectContaining({ + roomId: "!restart:matrix-qa.test", + timeoutMs: 8000, + }), + ); + }); + + it("forces a stale persisted Matrix sync cursor and expects inbound dedupe to absorb replay", async () => { + const stateRoot = await mkdtemp(path.join(os.tmpdir(), "matrix-stale-sync-")); + try { + const accountDir = path.join(stateRoot, "matrix", "accounts", "sut", "server", "token"); + const syncStorePath = path.join(accountDir, "bot-storage.json"); + const dedupeStorePath = path.join(accountDir, "inbound-dedupe.json"); + await mkdir(accountDir, { recursive: true }); + await writeTestJsonFile(path.join(accountDir, "storage-meta.json"), { + accountId: "sut", + userId: "@sut:matrix-qa.test", + }); + await writeTestJsonFile(syncStorePath, matrixSyncStoreFixture("driver-sync-start")); + + const callOrder: string[] = []; + const primeRoom = vi.fn().mockResolvedValue("driver-sync-start"); + const sendTextMessage = vi.fn().mockImplementation(async (params) => { + const body = String(params.body); + const kind = body.includes("STALE_SYNC_DEDUPE_FRESH") ? "fresh" : "first"; + callOrder.push(`send:${kind}`); + return kind === "fresh" ? "$fresh-trigger" : "$first-trigger"; + }); + const waitForRoomEvent = vi.fn().mockImplementation(async () => { + const sentBody = String(sendTextMessage.mock.calls.at(-1)?.[0]?.body ?? ""); + const token = sentBody.replace( + "@sut:matrix-qa.test reply with only this exact marker: ", + "", + ); + const kind = token.includes("STALE_SYNC_DEDUPE_FRESH") ? "fresh" : "first"; + callOrder.push(`wait:${kind}`); + if (kind === "first") { + await writeTestJsonFile(dedupeStorePath, { + version: 1, + entries: [ + { + key: "!restart:matrix-qa.test|$first-trigger", + ts: Date.now(), + }, + ], + }); + } + return { + event: { + kind: "message", + roomId: "!restart:matrix-qa.test", + eventId: kind === "fresh" ? "$fresh-reply" : "$first-reply", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + body: token, + }, + since: kind === "fresh" ? "driver-sync-after-fresh" : "driver-sync-after-first", + }; + }); + const waitForOptionalRoomEvent = vi.fn().mockImplementation(async () => { + callOrder.push("wait:no-duplicate"); + return { + matched: false, + since: "driver-sync-after-no-duplicate-window", + }; + }); + + createMatrixQaClient.mockReturnValue({ + primeRoom, + sendTextMessage, + waitForOptionalRoomEvent, + waitForRoomEvent, + }); + + const scenario = MATRIX_QA_SCENARIOS.find( + (entry) => entry.id === "matrix-stale-sync-replay-dedupe", + ); + expect(scenario).toBeDefined(); + + await expect( + runMatrixQaScenario(scenario!, { + ...matrixQaScenarioContext(), + gatewayStateDir: stateRoot, + restartGatewayAfterStateMutation: async (mutateState) => { + callOrder.push("hard-restart"); + await writeTestJsonFile( + syncStorePath, + matrixSyncStoreFixture("driver-sync-after-first"), + ); + await mutateState({ stateDir: stateRoot }); + const persisted = JSON.parse(await readFile(syncStorePath, "utf8")) as { + savedSync?: { nextBatch?: string }; + }; + expect(persisted.savedSync?.nextBatch).toBe("driver-sync-start"); + }, + roomId: "!room:matrix-qa.test", + sutAccountId: "sut", + topology: { + defaultRoomId: "!room:matrix-qa.test", + defaultRoomKey: "main", + rooms: [ + { + key: "restart", + kind: "group", + memberRoles: ["driver", "observer", "sut"], + memberUserIds: [ + "@driver:matrix-qa.test", + "@observer:matrix-qa.test", + "@sut:matrix-qa.test", + ], + name: "Restart room", + requireMention: true, + roomId: "!restart:matrix-qa.test", + }, + ], + }, + }), + ).resolves.toMatchObject({ + artifacts: { + dedupeCommitObserved: true, + duplicateWindowMs: 8000, + firstDriverEventId: "$first-trigger", + firstReply: { + eventId: "$first-reply", + tokenMatched: true, + }, + freshDriverEventId: "$fresh-trigger", + freshReply: { + eventId: "$fresh-reply", + tokenMatched: true, + }, + restartSignal: "hard-restart", + staleSyncCursor: "driver-sync-start", + }, + }); + + expect(callOrder).toEqual([ + "send:first", + "wait:first", + "hard-restart", + "wait:no-duplicate", + "send:fresh", + "wait:fresh", + ]); + } finally { + await rm(stateRoot, { recursive: true, force: true }); + } + }); + it("runs the DM scenario against the provisioned DM room without a mention", async () => { const primeRoom = vi.fn().mockResolvedValue("driver-sync-start"); const sendTextMessage = vi.fn().mockResolvedValue("$dm-trigger");