test(matrix): add stale sync replay dedupe scenario

This commit is contained in:
Gustavo Madeira Santana
2026-04-20 05:50:38 -04:00
parent 94e2bf258d
commit ea37a833dc
9 changed files with 900 additions and 8 deletions

View File

@@ -42,6 +42,14 @@ import type { QaTransportAdapter } from "./qa-transport.js";
export type { QaCliBackendAuthMode } from "./providers/env.js";
const QA_GATEWAY_CHILD_STARTUP_MAX_ATTEMPTS = 5;
export type QaGatewayChildStateMutationContext = {
configPath: string;
runtimeEnv: NodeJS.ProcessEnv;
stateDir: string;
tempRoot: string;
};
async function getFreePort() {
return await new Promise<number>((resolve, reject) => {
const server = net.createServer();
@@ -694,10 +702,95 @@ export async function startQaGatewayChild(params: {
if (!child || !cfg || !baseUrl || !wsUrl || !rpcClient || !env) {
throw new Error("qa gateway child failed to start");
}
const runningChild = child;
const runningRpcClient = rpcClient;
let activeChild = child;
let activeRpcClient = rpcClient;
const runningEnv = env;
const spawnReplacementGatewayChild = async () => {
const nextChild = spawn(
nodeExecPath,
[
distEntryPath,
"gateway",
"run",
"--port",
String(gatewayPort),
"--bind",
"loopback",
"--allow-unconfigured",
],
{
cwd: runtimeCwd,
env: runningEnv,
detached: process.platform !== "win32",
stdio: ["ignore", "pipe", "pipe"],
},
);
nextChild.stdout.on("data", (chunk) => {
const buffer = Buffer.from(chunk);
stdout.push(buffer);
stdoutLog.write(buffer);
});
nextChild.stderr.on("data", (chunk) => {
const buffer = Buffer.from(chunk);
stderr.push(buffer);
stderrLog.write(buffer);
});
try {
await waitForGatewayReady({
baseUrl,
logs,
child: nextChild,
timeoutMs: 120_000,
});
const nextRpcClient = await startQaGatewayRpcClient({
wsUrl,
token: gatewayToken,
logs,
});
try {
let rpcReady = false;
let lastRpcStartupError: unknown = null;
for (let rpcAttempt = 1; rpcAttempt <= 4; rpcAttempt += 1) {
try {
await nextRpcClient.request("config.get", {}, { timeoutMs: 10_000 });
rpcReady = true;
break;
} catch (error) {
lastRpcStartupError = error;
if (rpcAttempt >= 4 || !isRetryableRpcStartupError(error)) {
throw error;
}
await sleep(500 * rpcAttempt);
await waitForGatewayReady({
baseUrl,
logs,
child: nextChild,
timeoutMs: 15_000,
});
}
}
if (!rpcReady) {
throw lastRpcStartupError ?? new Error("qa gateway rpc client failed to start");
}
} catch (error) {
await nextRpcClient.stop().catch(() => {});
throw error;
}
return {
child: nextChild,
rpcClient: nextRpcClient,
};
} catch (error) {
await stopQaGatewayChildProcessTree(nextChild, {
gracefulTimeoutMs: 1_500,
forceTimeoutMs: 1_500,
});
throw error;
}
};
return {
cfg,
baseUrl,
@@ -710,10 +803,27 @@ export async function startQaGatewayChild(params: {
runtimeEnv: runningEnv,
logs,
async restart(signal: NodeJS.Signals = "SIGUSR1") {
if (!runningChild.pid) {
if (!activeChild.pid) {
throw new Error("qa gateway child has no pid");
}
process.kill(runningChild.pid, signal);
process.kill(activeChild.pid, signal);
},
async restartAfterStateMutation(
mutateState: (context: QaGatewayChildStateMutationContext) => Promise<void>,
) {
await activeRpcClient.stop().catch(() => {});
await stopQaGatewayChildProcessTree(activeChild);
await mutateState({
configPath,
runtimeEnv: runningEnv,
stateDir,
tempRoot,
});
const restarted = await spawnReplacementGatewayChild();
activeChild = restarted.child;
activeRpcClient = restarted.rpcClient;
child = activeChild;
rpcClient = activeRpcClient;
},
async call(
method: string,
@@ -724,7 +834,7 @@ export async function startQaGatewayChild(params: {
let lastDetails = "";
for (let attempt = 1; attempt <= 3; attempt += 1) {
try {
return await runningRpcClient.request(method, rpcParams, {
return await activeRpcClient.request(method, rpcParams, {
...opts,
timeoutMs,
});
@@ -737,7 +847,7 @@ export async function startQaGatewayChild(params: {
await waitForGatewayReady({
baseUrl,
logs,
child: runningChild,
child: activeChild,
timeoutMs: Math.max(10_000, timeoutMs),
});
}
@@ -745,8 +855,8 @@ export async function startQaGatewayChild(params: {
throw new Error(`${lastDetails}${formatQaGatewayLogsForError(logs())}`);
},
async stop(opts?: { keepTemp?: boolean; preserveToDir?: string }) {
await runningRpcClient.stop().catch(() => {});
await stopQaGatewayChildProcessTree(runningChild);
await activeRpcClient.stop().catch(() => {});
await stopQaGatewayChildProcessTree(activeChild);
await closeWriteStream(stdoutLog);
await closeWriteStream(stderrLog);
if (opts?.preserveToDir && !(opts?.keepTemp ?? keepTemp)) {

View File

@@ -42,7 +42,11 @@ type MatrixQaGatewayChild = {
params: Record<string, unknown>,
options?: { timeoutMs?: number },
): Promise<unknown>;
restartAfterStateMutation?: (
mutateState: (context: { stateDir: string }) => Promise<void>,
) => Promise<void>;
restart(): Promise<void>;
runtimeEnv?: NodeJS.ProcessEnv;
};
type MatrixQaLiveLaneGatewayHarness = {
@@ -653,6 +657,7 @@ export async function runMatrixQaLive(params: {
observerDeviceId: provisioning.observer.deviceId,
observerPassword: provisioning.observer.password,
observerUserId: provisioning.observer.userId,
gatewayStateDir: scenarioGateway.harness.gateway.runtimeEnv?.OPENCLAW_STATE_DIR,
outputDir,
restartGateway: async () => {
if (!gatewayHarness) {
@@ -669,6 +674,30 @@ export async function runMatrixQaLive(params: {
`gateway restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
restartGatewayAfterStateMutation: async (mutateState) => {
if (!gatewayHarness) {
throw new Error(
"Matrix persisted-state restart scenario requires a live gateway",
);
}
const restartAfterStateMutation =
scenarioGateway.harness.gateway.restartAfterStateMutation;
if (!restartAfterStateMutation) {
throw new Error(
"Matrix persisted-state restart scenario requires a hard restart callback",
);
}
writeMatrixQaProgress(`gateway hard restart start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await restartAfterStateMutation(mutateState);
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway hard restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
restartGatewayWithQueuedMessage: async (queueMessage) => {
if (!gatewayHarness) {
throw new Error("Matrix restart catchup scenario requires a live gateway");

View File

@@ -41,6 +41,8 @@ export type MatrixQaScenarioId =
| "matrix-restart-resume"
| "matrix-post-restart-room-continue"
| "matrix-initial-catchup-then-incremental"
| "matrix-restart-replay-dedupe"
| "matrix-stale-sync-replay-dedupe"
| "matrix-room-membership-loss"
| "matrix-homeserver-restart-resume"
| "matrix-mention-gating"
@@ -433,6 +435,18 @@ export const MATRIX_QA_SCENARIOS: MatrixQaScenarioDefinition[] = [
title: "Matrix initial catchup is followed by incremental replies",
topology: MATRIX_QA_RESTART_ROOM_TOPOLOGY,
},
{
id: "matrix-restart-replay-dedupe",
timeoutMs: 90_000,
title: "Matrix restart does not redeliver a handled event",
topology: MATRIX_QA_RESTART_ROOM_TOPOLOGY,
},
{
id: "matrix-stale-sync-replay-dedupe",
timeoutMs: 90_000,
title: "Matrix stale sync replay is absorbed by inbound dedupe",
topology: MATRIX_QA_RESTART_ROOM_TOPOLOGY,
},
{
id: "matrix-room-membership-loss",
timeoutMs: 75_000,

View File

@@ -11,10 +11,16 @@ import {
isMatrixQaExactMarkerReply,
assertTopLevelReplyArtifact,
advanceMatrixQaActorCursor,
NO_REPLY_WINDOW_MS,
primeMatrixQaDriverScenarioClient,
runAssertedDriverTopLevelScenario,
type MatrixQaScenarioContext,
} from "./scenario-runtime-shared.js";
import {
rewriteMatrixSyncStoreCursor,
waitForMatrixInboundDedupeEntry,
waitForMatrixSyncStoreWithCursor,
} from "./scenario-runtime-state-files.js";
import type { MatrixQaScenarioExecution } from "./scenario-types.js";
export async function runHomeserverRestartResumeScenario(context: MatrixQaScenarioContext) {
@@ -184,3 +190,231 @@ export async function runInitialCatchupThenIncrementalScenario(context: MatrixQa
].join("\n"),
} satisfies MatrixQaScenarioExecution;
}
export async function runRestartReplayDedupeScenario(context: MatrixQaScenarioContext) {
if (!context.restartGateway) {
throw new Error("Matrix restart replay dedupe scenario requires a gateway restart callback");
}
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY);
const { client, startSince } = await primeMatrixQaDriverScenarioClient(context);
const replayToken = buildMatrixQaToken("MATRIX_QA_REPLAY_DEDUPE");
const replayBody = buildMentionPrompt(context.sutUserId, replayToken);
const replayDriverEventId = await client.sendTextMessage({
body: replayBody,
mentionUserIds: [context.sutUserId],
roomId,
});
const firstMatched = await client.waitForRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
token: replayToken,
}) && event.relatesTo === undefined,
roomId,
since: startSince,
timeoutMs: context.timeoutMs,
});
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
nextSince: firstMatched.since,
startSince,
});
const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken);
assertTopLevelReplyArtifact("first replay-dedupe reply", firstReply);
await context.restartGateway();
const duplicate = await client.waitForOptionalRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
event.eventId !== firstReply.eventId &&
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
token: replayToken,
}),
roomId,
timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs),
});
if (duplicate.matched) {
throw new Error(
[
"Matrix restart replayed an already handled event",
`original driver event: ${replayDriverEventId}`,
...buildMatrixReplyDetails("first reply", firstReply),
...buildMatrixReplyDetails(
"duplicate reply",
buildMatrixReplyArtifact(duplicate.event, replayToken),
),
].join("\n"),
);
}
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
nextSince: duplicate.since,
startSince: firstMatched.since ?? startSince,
});
const postRestart = await runAssertedDriverTopLevelScenario({
context,
label: "fresh post-restart reply",
roomId,
tokenPrefix: "MATRIX_QA_REPLAY_DEDUPE_FRESH",
});
return {
artifacts: {
duplicateWindowMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs),
firstDriverEventId: replayDriverEventId,
firstReply,
firstToken: replayToken,
freshDriverEventId: postRestart.driverEventId,
freshReply: postRestart.reply,
freshToken: postRestart.token,
restartSignal: "SIGUSR1",
roomId,
},
details: [
`room id: ${roomId}`,
"restart signal: SIGUSR1",
`first driver event: ${replayDriverEventId}`,
...buildMatrixReplyDetails("first reply", firstReply),
`duplicate replay window: ${Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs)}ms`,
`fresh post-restart driver event: ${postRestart.driverEventId}`,
...buildMatrixReplyDetails("fresh reply", postRestart.reply),
].join("\n"),
} satisfies MatrixQaScenarioExecution;
}
export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenarioContext) {
if (!context.restartGatewayAfterStateMutation) {
throw new Error(
"Matrix stale sync replay dedupe scenario requires a persisted-state restart callback",
);
}
if (!context.gatewayStateDir) {
throw new Error("Matrix stale sync replay dedupe scenario requires a gateway state directory");
}
const stateDir = context.gatewayStateDir;
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY);
const syncStore = await waitForMatrixSyncStoreWithCursor({
context,
stateDir,
timeoutMs: Math.min(5_000, context.timeoutMs),
});
const staleCursor = syncStore.cursor;
const { client, startSince } = await primeMatrixQaDriverScenarioClient(context);
const replayToken = buildMatrixQaToken("MATRIX_QA_STALE_SYNC_DEDUPE");
const replayBody = buildMentionPrompt(context.sutUserId, replayToken);
const replayDriverEventId = await client.sendTextMessage({
body: replayBody,
mentionUserIds: [context.sutUserId],
roomId,
});
const firstMatched = await client.waitForRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
token: replayToken,
}) && event.relatesTo === undefined,
roomId,
since: startSince,
timeoutMs: context.timeoutMs,
});
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
nextSince: firstMatched.since,
startSince,
});
const firstReply = buildMatrixReplyArtifact(firstMatched.event, replayToken);
assertTopLevelReplyArtifact("first stale-sync replay-dedupe reply", firstReply);
await waitForMatrixInboundDedupeEntry({
context,
eventId: replayDriverEventId,
roomId,
stateDir,
timeoutMs: Math.min(5_000, context.timeoutMs),
});
await context.restartGatewayAfterStateMutation(async () => {
await rewriteMatrixSyncStoreCursor({
cursor: staleCursor,
pathname: syncStore.pathname,
});
});
const duplicate = await client.waitForOptionalRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
event.eventId !== firstReply.eventId &&
isMatrixQaExactMarkerReply(event, {
roomId,
sutUserId: context.sutUserId,
token: replayToken,
}),
roomId,
timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs),
});
if (duplicate.matched) {
throw new Error(
[
"Matrix stale sync cursor replayed an already handled event",
`original driver event: ${replayDriverEventId}`,
`stale sync cursor: ${staleCursor}`,
...buildMatrixReplyDetails("first reply", firstReply),
...buildMatrixReplyDetails(
"duplicate reply",
buildMatrixReplyArtifact(duplicate.event, replayToken),
),
].join("\n"),
);
}
advanceMatrixQaActorCursor({
actorId: "driver",
syncState: context.syncState,
nextSince: duplicate.since,
startSince: firstMatched.since ?? startSince,
});
const postRestart = await runAssertedDriverTopLevelScenario({
context,
label: "fresh post-stale-sync-restart reply",
roomId,
tokenPrefix: "MATRIX_QA_STALE_SYNC_DEDUPE_FRESH",
});
return {
artifacts: {
dedupeCommitObserved: true,
duplicateWindowMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs),
firstDriverEventId: replayDriverEventId,
firstReply,
firstToken: replayToken,
freshDriverEventId: postRestart.driverEventId,
freshReply: postRestart.reply,
freshToken: postRestart.token,
restartSignal: "hard-restart",
roomId,
staleSyncCursor: staleCursor,
},
details: [
`room id: ${roomId}`,
"restart signal: hard-restart",
`stale sync cursor: ${staleCursor}`,
`first driver event: ${replayDriverEventId}`,
...buildMatrixReplyDetails("first reply", firstReply),
`duplicate replay window: ${Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs)}ms`,
`fresh post-restart driver event: ${postRestart.driverEventId}`,
...buildMatrixReplyDetails("fresh reply", postRestart.reply),
].join("\n"),
} satisfies MatrixQaScenarioExecution;
}

View File

@@ -27,8 +27,12 @@ export type MatrixQaScenarioContext = {
observerDeviceId?: string;
observerPassword?: string;
observerUserId: string;
gatewayStateDir?: string;
outputDir?: string;
restartGateway?: () => Promise<void>;
restartGatewayAfterStateMutation?: (
mutateState: (context: { stateDir: string }) => Promise<void>,
) => Promise<void>;
restartGatewayWithQueuedMessage?: (queueMessage: () => Promise<void>) => Promise<void>;
roomId: string;
interruptTransport?: () => Promise<void>;

View File

@@ -0,0 +1,216 @@
import fs from "node:fs/promises";
import path from "node:path";
import { setTimeout as sleep } from "node:timers/promises";
import type { MatrixQaScenarioContext } from "./scenario-runtime-shared.js";
const MATRIX_SYNC_STORE_FILENAME = "bot-storage.json";
const MATRIX_INBOUND_DEDUPE_FILENAME = "inbound-dedupe.json";
const MATRIX_STATE_POLL_INTERVAL_MS = 100;
function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
}
async function readJsonFile(pathname: string): Promise<unknown> {
return JSON.parse(await fs.readFile(pathname, "utf8")) as unknown;
}
async function writeJsonFile(pathname: string, value: unknown) {
await fs.writeFile(pathname, `${JSON.stringify(value, null, 2)}\n`, "utf8");
}
async function findFilesByName(params: {
filename: string;
rootDir: string;
maxDepth?: number;
}): Promise<string[]> {
const maxDepth = params.maxDepth ?? 8;
const matches: string[] = [];
async function visit(dir: string, depth: number): Promise<void> {
if (depth > maxDepth) {
return;
}
let entries: Array<{ isDirectory(): boolean; isFile(): boolean; name: string }>;
try {
entries = await fs.readdir(dir, { withFileTypes: true });
} catch {
return;
}
for (const entry of entries) {
const entryPath = path.join(dir, entry.name);
if (entry.isFile() && entry.name === params.filename) {
matches.push(entryPath);
continue;
}
if (entry.isDirectory()) {
await visit(entryPath, depth + 1);
}
}
}
await visit(params.rootDir, 0);
return matches.toSorted();
}
function readPersistedMatrixSyncCursor(parsed: unknown): string | null {
if (!isRecord(parsed)) {
return null;
}
const savedSync = parsed.savedSync;
if (isRecord(savedSync) && typeof savedSync.nextBatch === "string") {
return savedSync.nextBatch;
}
if (typeof parsed.next_batch === "string") {
return parsed.next_batch;
}
return null;
}
function writePersistedMatrixSyncCursor(parsed: unknown, cursor: string): unknown {
if (!isRecord(parsed)) {
throw new Error("Matrix sync store was not a JSON object");
}
const savedSync = parsed.savedSync;
if (isRecord(savedSync) && typeof savedSync.nextBatch === "string") {
return {
...parsed,
savedSync: {
...savedSync,
nextBatch: cursor,
},
};
}
if (typeof parsed.next_batch === "string") {
return {
...parsed,
next_batch: cursor,
};
}
throw new Error("Matrix sync store did not contain a persisted sync cursor");
}
async function readMatrixSyncStoreCursor(pathname: string): Promise<string | null> {
return readPersistedMatrixSyncCursor(await readJsonFile(pathname));
}
export async function rewriteMatrixSyncStoreCursor(params: { cursor: string; pathname: string }) {
const parsed = await readJsonFile(params.pathname);
await writeJsonFile(params.pathname, writePersistedMatrixSyncCursor(parsed, params.cursor));
}
async function scoreMatrixStateFile(params: {
context: MatrixQaScenarioContext;
pathname: string;
}) {
let score = params.pathname.includes(`${path.sep}matrix${path.sep}`) ? 4 : 0;
try {
const metadata = await readJsonFile(
path.join(path.dirname(params.pathname), "storage-meta.json"),
);
if (isRecord(metadata) && metadata.userId === params.context.sutUserId) {
score += 16;
}
if (isRecord(metadata) && metadata.accountId === params.context.sutAccountId) {
score += 8;
}
} catch {
// Missing metadata is allowed; the Matrix client may not have flushed it yet.
}
return score;
}
async function resolveBestMatrixStateFile(params: {
context: MatrixQaScenarioContext;
filename: string;
stateDir: string;
}) {
const candidates = await findFilesByName({
filename: params.filename,
rootDir: params.stateDir,
});
if (candidates.length === 0) {
return null;
}
const scored = await Promise.all(
candidates.map(async (pathname) => ({
pathname,
score: await scoreMatrixStateFile({
context: params.context,
pathname,
}),
})),
);
scored.sort((a, b) => b.score - a.score || a.pathname.localeCompare(b.pathname));
return scored[0]?.pathname ?? null;
}
export async function waitForMatrixSyncStoreWithCursor(params: {
context: MatrixQaScenarioContext;
stateDir: string;
timeoutMs: number;
}) {
const startedAt = Date.now();
let lastPath: string | null = null;
while (Date.now() - startedAt < params.timeoutMs) {
const pathname = await resolveBestMatrixStateFile({
context: params.context,
filename: MATRIX_SYNC_STORE_FILENAME,
stateDir: params.stateDir,
});
lastPath = pathname;
if (pathname) {
const cursor = await readMatrixSyncStoreCursor(pathname);
if (cursor) {
return { cursor, pathname };
}
}
await sleep(MATRIX_STATE_POLL_INTERVAL_MS);
}
throw new Error(
`timed out waiting for Matrix sync store cursor under ${params.stateDir}; last path ${lastPath ?? "<none>"}`,
);
}
function hasPersistedMatrixDedupeEntry(params: {
parsed: unknown;
roomId: string;
eventId: string;
}) {
if (!isRecord(params.parsed) || !Array.isArray(params.parsed.entries)) {
return false;
}
const expectedKey = `${params.roomId}|${params.eventId}`;
return params.parsed.entries.some((entry) => isRecord(entry) && entry.key === expectedKey);
}
export async function waitForMatrixInboundDedupeEntry(params: {
context: MatrixQaScenarioContext;
eventId: string;
roomId: string;
stateDir: string;
timeoutMs: number;
}) {
const startedAt = Date.now();
while (Date.now() - startedAt < params.timeoutMs) {
const pathname = await resolveBestMatrixStateFile({
context: params.context,
filename: MATRIX_INBOUND_DEDUPE_FILENAME,
stateDir: params.stateDir,
});
if (pathname) {
const parsed = await readJsonFile(pathname);
if (
hasPersistedMatrixDedupeEntry({
parsed,
roomId: params.roomId,
eventId: params.eventId,
})
) {
return pathname;
}
}
await sleep(MATRIX_STATE_POLL_INTERVAL_MS);
}
throw new Error(
`timed out waiting for Matrix inbound dedupe commit for ${params.roomId}|${params.eventId}`,
);
}

View File

@@ -43,7 +43,9 @@ import {
runHomeserverRestartResumeScenario,
runInitialCatchupThenIncrementalScenario,
runPostRestartRoomContinueScenario,
runRestartReplayDedupeScenario,
runRestartResumeScenario,
runStaleSyncReplayDedupeScenario,
} from "./scenario-runtime-restart.js";
import {
runAllowlistHotReloadScenario,
@@ -233,6 +235,10 @@ export async function runMatrixQaScenario(
return await runPostRestartRoomContinueScenario(context);
case "matrix-initial-catchup-then-incremental":
return await runInitialCatchupThenIncrementalScenario(context);
case "matrix-restart-replay-dedupe":
return await runRestartReplayDedupeScenario(context);
case "matrix-stale-sync-replay-dedupe":
return await runStaleSyncReplayDedupeScenario(context);
case "matrix-room-membership-loss":
return await runMembershipLossScenario(context);
case "matrix-homeserver-restart-resume":

View File

@@ -35,6 +35,8 @@ export type MatrixQaScenarioArtifacts = {
catchupDriverEventId?: string;
catchupReply?: MatrixQaReplyArtifact;
catchupToken?: string;
dedupeCommitObserved?: boolean;
duplicateWindowMs?: number;
driverEventId?: string;
editEventId?: string;
editedToken?: string;
@@ -42,6 +44,9 @@ export type MatrixQaScenarioArtifacts = {
firstDriverEventId?: string;
firstReply?: MatrixQaReplyArtifact;
firstToken?: string;
freshDriverEventId?: string;
freshReply?: MatrixQaReplyArtifact;
freshToken?: string;
incrementalDriverEventId?: string;
incrementalReply?: MatrixQaReplyArtifact;
incrementalToken?: string;
@@ -68,6 +73,7 @@ export type MatrixQaScenarioArtifacts = {
secondDriverEventId?: string;
secondReply?: MatrixQaReplyArtifact;
secondToken?: string;
staleSyncCursor?: string;
subagentCompletion?: MatrixQaReplyArtifact;
subagentIntro?: MatrixQaReplyArtifact;
threadDriverEventId?: string;

View File

@@ -1,3 +1,6 @@
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, beforeEach, vi } from "vitest";
const { createMatrixQaClient } = vi.hoisted(() => ({
createMatrixQaClient: vi.fn(),
@@ -60,6 +63,27 @@ function matrixQaScenarioContext(): MatrixQaScenarioContext {
};
}
async function writeTestJsonFile(pathname: string, value: unknown) {
await writeFile(pathname, `${JSON.stringify(value, null, 2)}\n`);
}
function matrixSyncStoreFixture(nextBatch: string) {
return {
version: 1,
cleanShutdown: true,
savedSync: {
nextBatch,
accountData: [],
roomsData: {
join: {},
invite: {},
leave: {},
knock: {},
},
},
};
}
function matrixQaE2eeRoomKey(
scenarioId: Parameters<typeof scenarioTesting.buildMatrixQaE2eeScenarioRoomKey>[0],
) {
@@ -104,6 +128,8 @@ describe("matrix live qa scenarios", () => {
"matrix-restart-resume",
"matrix-post-restart-room-continue",
"matrix-initial-catchup-then-incremental",
"matrix-restart-replay-dedupe",
"matrix-stale-sync-replay-dedupe",
"matrix-room-membership-loss",
"matrix-homeserver-restart-resume",
"matrix-mention-gating",
@@ -777,6 +803,253 @@ describe("matrix live qa scenarios", () => {
]);
});
it("fails if a handled Matrix event is redelivered after gateway restart", async () => {
const callOrder: string[] = [];
const primeRoom = vi.fn().mockResolvedValue("driver-sync-start");
const sendTextMessage = vi.fn().mockImplementation(async (params) => {
const body = String(params.body);
const kind = body.includes("REPLAY_DEDUPE_FRESH") ? "fresh" : "first";
callOrder.push(`send:${kind}`);
return kind === "fresh" ? "$fresh-trigger" : "$first-trigger";
});
const waitForRoomEvent = vi.fn().mockImplementation(async () => {
const sentBody = String(sendTextMessage.mock.calls.at(-1)?.[0]?.body ?? "");
const token = sentBody.replace("@sut:matrix-qa.test reply with only this exact marker: ", "");
const kind = token.includes("REPLAY_DEDUPE_FRESH") ? "fresh" : "first";
callOrder.push(`wait:${kind}`);
return {
event: {
kind: "message",
roomId: "!restart:matrix-qa.test",
eventId: kind === "fresh" ? "$fresh-reply" : "$first-reply",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
body: token,
},
since: kind === "fresh" ? "driver-sync-after-fresh" : "driver-sync-after-first",
};
});
const waitForOptionalRoomEvent = vi.fn().mockImplementation(async () => {
callOrder.push("wait:no-duplicate");
return {
matched: false,
since: "driver-sync-after-no-duplicate-window",
};
});
createMatrixQaClient.mockReturnValue({
primeRoom,
sendTextMessage,
waitForOptionalRoomEvent,
waitForRoomEvent,
});
const scenario = MATRIX_QA_SCENARIOS.find(
(entry) => entry.id === "matrix-restart-replay-dedupe",
);
expect(scenario).toBeDefined();
await expect(
runMatrixQaScenario(scenario!, {
...matrixQaScenarioContext(),
restartGateway: async () => {
callOrder.push("restart");
},
roomId: "!room:matrix-qa.test",
topology: {
defaultRoomId: "!room:matrix-qa.test",
defaultRoomKey: "main",
rooms: [
{
key: "restart",
kind: "group",
memberRoles: ["driver", "observer", "sut"],
memberUserIds: [
"@driver:matrix-qa.test",
"@observer:matrix-qa.test",
"@sut:matrix-qa.test",
],
name: "Restart room",
requireMention: true,
roomId: "!restart:matrix-qa.test",
},
],
},
}),
).resolves.toMatchObject({
artifacts: {
duplicateWindowMs: 8000,
firstDriverEventId: "$first-trigger",
firstReply: {
eventId: "$first-reply",
tokenMatched: true,
},
freshDriverEventId: "$fresh-trigger",
freshReply: {
eventId: "$fresh-reply",
tokenMatched: true,
},
},
});
expect(callOrder).toEqual([
"send:first",
"wait:first",
"restart",
"wait:no-duplicate",
"send:fresh",
"wait:fresh",
]);
expect(waitForOptionalRoomEvent).toHaveBeenCalledWith(
expect.objectContaining({
roomId: "!restart:matrix-qa.test",
timeoutMs: 8000,
}),
);
});
it("forces a stale persisted Matrix sync cursor and expects inbound dedupe to absorb replay", async () => {
const stateRoot = await mkdtemp(path.join(os.tmpdir(), "matrix-stale-sync-"));
try {
const accountDir = path.join(stateRoot, "matrix", "accounts", "sut", "server", "token");
const syncStorePath = path.join(accountDir, "bot-storage.json");
const dedupeStorePath = path.join(accountDir, "inbound-dedupe.json");
await mkdir(accountDir, { recursive: true });
await writeTestJsonFile(path.join(accountDir, "storage-meta.json"), {
accountId: "sut",
userId: "@sut:matrix-qa.test",
});
await writeTestJsonFile(syncStorePath, matrixSyncStoreFixture("driver-sync-start"));
const callOrder: string[] = [];
const primeRoom = vi.fn().mockResolvedValue("driver-sync-start");
const sendTextMessage = vi.fn().mockImplementation(async (params) => {
const body = String(params.body);
const kind = body.includes("STALE_SYNC_DEDUPE_FRESH") ? "fresh" : "first";
callOrder.push(`send:${kind}`);
return kind === "fresh" ? "$fresh-trigger" : "$first-trigger";
});
const waitForRoomEvent = vi.fn().mockImplementation(async () => {
const sentBody = String(sendTextMessage.mock.calls.at(-1)?.[0]?.body ?? "");
const token = sentBody.replace(
"@sut:matrix-qa.test reply with only this exact marker: ",
"",
);
const kind = token.includes("STALE_SYNC_DEDUPE_FRESH") ? "fresh" : "first";
callOrder.push(`wait:${kind}`);
if (kind === "first") {
await writeTestJsonFile(dedupeStorePath, {
version: 1,
entries: [
{
key: "!restart:matrix-qa.test|$first-trigger",
ts: Date.now(),
},
],
});
}
return {
event: {
kind: "message",
roomId: "!restart:matrix-qa.test",
eventId: kind === "fresh" ? "$fresh-reply" : "$first-reply",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
body: token,
},
since: kind === "fresh" ? "driver-sync-after-fresh" : "driver-sync-after-first",
};
});
const waitForOptionalRoomEvent = vi.fn().mockImplementation(async () => {
callOrder.push("wait:no-duplicate");
return {
matched: false,
since: "driver-sync-after-no-duplicate-window",
};
});
createMatrixQaClient.mockReturnValue({
primeRoom,
sendTextMessage,
waitForOptionalRoomEvent,
waitForRoomEvent,
});
const scenario = MATRIX_QA_SCENARIOS.find(
(entry) => entry.id === "matrix-stale-sync-replay-dedupe",
);
expect(scenario).toBeDefined();
await expect(
runMatrixQaScenario(scenario!, {
...matrixQaScenarioContext(),
gatewayStateDir: stateRoot,
restartGatewayAfterStateMutation: async (mutateState) => {
callOrder.push("hard-restart");
await writeTestJsonFile(
syncStorePath,
matrixSyncStoreFixture("driver-sync-after-first"),
);
await mutateState({ stateDir: stateRoot });
const persisted = JSON.parse(await readFile(syncStorePath, "utf8")) as {
savedSync?: { nextBatch?: string };
};
expect(persisted.savedSync?.nextBatch).toBe("driver-sync-start");
},
roomId: "!room:matrix-qa.test",
sutAccountId: "sut",
topology: {
defaultRoomId: "!room:matrix-qa.test",
defaultRoomKey: "main",
rooms: [
{
key: "restart",
kind: "group",
memberRoles: ["driver", "observer", "sut"],
memberUserIds: [
"@driver:matrix-qa.test",
"@observer:matrix-qa.test",
"@sut:matrix-qa.test",
],
name: "Restart room",
requireMention: true,
roomId: "!restart:matrix-qa.test",
},
],
},
}),
).resolves.toMatchObject({
artifacts: {
dedupeCommitObserved: true,
duplicateWindowMs: 8000,
firstDriverEventId: "$first-trigger",
firstReply: {
eventId: "$first-reply",
tokenMatched: true,
},
freshDriverEventId: "$fresh-trigger",
freshReply: {
eventId: "$fresh-reply",
tokenMatched: true,
},
restartSignal: "hard-restart",
staleSyncCursor: "driver-sync-start",
},
});
expect(callOrder).toEqual([
"send:first",
"wait:first",
"hard-restart",
"wait:no-duplicate",
"send:fresh",
"wait:fresh",
]);
} finally {
await rm(stateRoot, { recursive: true, force: true });
}
});
it("runs the DM scenario against the provisioned DM room without a mention", async () => {
const primeRoom = vi.fn().mockResolvedValue("driver-sync-start");
const sendTextMessage = vi.fn().mockResolvedValue("$dm-trigger");