From 874eebe539cc5b76450e8060b6eded24f0566f16 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Tue, 14 Apr 2026 20:05:26 -0400 Subject: [PATCH] QA: extract Matrix event modules --- .../qa-matrix/src/runners/contract/runtime.ts | 34 +- .../src/runners/contract/scenarios.ts | 3 +- .../qa-matrix/src/substrate/artifacts.ts | 25 ++ extensions/qa-matrix/src/substrate/client.ts | 293 +++--------------- extensions/qa-matrix/src/substrate/events.ts | 145 +++++++++ extensions/qa-matrix/src/substrate/request.ts | 55 ++++ extensions/qa-matrix/src/substrate/sync.ts | 107 +++++++ 7 files changed, 373 insertions(+), 289 deletions(-) create mode 100644 extensions/qa-matrix/src/substrate/artifacts.ts create mode 100644 extensions/qa-matrix/src/substrate/events.ts create mode 100644 extensions/qa-matrix/src/substrate/request.ts create mode 100644 extensions/qa-matrix/src/substrate/sync.ts diff --git a/extensions/qa-matrix/src/runners/contract/runtime.ts b/extensions/qa-matrix/src/runners/contract/runtime.ts index 14d41eb8664..da9d560bec8 100644 --- a/extensions/qa-matrix/src/runners/contract/runtime.ts +++ b/extensions/qa-matrix/src/runners/contract/runtime.ts @@ -12,12 +12,10 @@ import { appendLiveLaneIssue, buildLiveLaneArtifactsError, } from "../../shared/live-lane-helpers.js"; -import { - provisionMatrixQaRoom, - type MatrixQaObservedEvent, - type MatrixQaProvisionResult, -} from "../../substrate/client.js"; +import { buildMatrixQaObservedEventsArtifact } from "../../substrate/artifacts.js"; +import { provisionMatrixQaRoom, type MatrixQaProvisionResult } from "../../substrate/client.js"; import { buildMatrixQaConfig, type MatrixQaConfigOverrides } from "../../substrate/config.js"; +import type { MatrixQaObservedEvent } from "../../substrate/events.js"; import { startMatrixQaHarness } from "../../substrate/harness.runtime.js"; import { resolveMatrixQaModels } from "./model-selection.js"; import { @@ -140,29 +138,6 @@ function buildMatrixQaSummary(params: { }; } -function buildObservedEventsArtifact(params: { - includeContent: boolean; - observedEvents: MatrixQaObservedEvent[]; -}) { - return params.observedEvents.map((event) => - params.includeContent - ? event - : { - roomId: event.roomId, - eventId: event.eventId, - sender: event.sender, - stateKey: event.stateKey, - type: event.type, - originServerTs: event.originServerTs, - msgtype: event.msgtype, - membership: event.membership, - relatesTo: event.relatesTo, - mentions: event.mentions, - reaction: event.reaction, - }, - ); -} - function isMatrixAccountReady(entry?: { connected?: boolean; healthState?: string; @@ -531,7 +506,7 @@ export async function runMatrixQaLive(params: { await fs.writeFile( observedEventsPath, `${JSON.stringify( - buildObservedEventsArtifact({ + buildMatrixQaObservedEventsArtifact({ includeContent: includeObservedEventContent, observedEvents, }), @@ -581,7 +556,6 @@ export const __testing = { buildMatrixQaSummary, MATRIX_QA_SCENARIOS, buildMatrixQaConfig, - buildObservedEventsArtifact, isMatrixAccountReady, resolveMatrixQaModels, waitForMatrixChannelReady, diff --git a/extensions/qa-matrix/src/runners/contract/scenarios.ts b/extensions/qa-matrix/src/runners/contract/scenarios.ts index 5c07c9c25f6..8a54ab5e15b 100644 --- a/extensions/qa-matrix/src/runners/contract/scenarios.ts +++ b/extensions/qa-matrix/src/runners/contract/scenarios.ts @@ -4,8 +4,9 @@ import { selectLiveTransportScenarios, type LiveTransportScenarioDefinition, } from "../../shared/live-transport-scenarios.js"; -import { createMatrixQaClient, type MatrixQaObservedEvent } from "../../substrate/client.js"; +import { createMatrixQaClient } from "../../substrate/client.js"; import { type MatrixQaConfigOverrides } from "../../substrate/config.js"; +import type { MatrixQaObservedEvent } from "../../substrate/events.js"; import { buildDefaultMatrixQaTopologySpec, findMatrixQaProvisionedRoom, diff --git a/extensions/qa-matrix/src/substrate/artifacts.ts b/extensions/qa-matrix/src/substrate/artifacts.ts new file mode 100644 index 00000000000..a8da02d598d --- /dev/null +++ b/extensions/qa-matrix/src/substrate/artifacts.ts @@ -0,0 +1,25 @@ +import type { MatrixQaObservedEvent } from "./events.js"; + +export function buildMatrixQaObservedEventsArtifact(params: { + includeContent: boolean; + observedEvents: MatrixQaObservedEvent[]; +}) { + return params.observedEvents.map((event) => + params.includeContent + ? event + : { + kind: event.kind, + roomId: event.roomId, + eventId: event.eventId, + sender: event.sender, + stateKey: event.stateKey, + type: event.type, + originServerTs: event.originServerTs, + msgtype: event.msgtype, + membership: event.membership, + relatesTo: event.relatesTo, + mentions: event.mentions, + reaction: event.reaction, + }, + ); +} diff --git a/extensions/qa-matrix/src/substrate/client.ts b/extensions/qa-matrix/src/substrate/client.ts index ef5158eebfc..f7e34c08267 100644 --- a/extensions/qa-matrix/src/substrate/client.ts +++ b/extensions/qa-matrix/src/substrate/client.ts @@ -1,5 +1,13 @@ import { randomUUID } from "node:crypto"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import type { MatrixQaObservedEvent } from "./events.js"; +import { requestMatrixJson, type MatrixQaFetchLike } from "./request.js"; +import { + primeMatrixQaRoom, + waitForMatrixQaRoomEvent, + waitForOptionalMatrixQaRoomEvent, + type MatrixQaRoomEventWaitResult, +} from "./sync.js"; import { findMatrixQaProvisionedRoom, type MatrixQaParticipantRole, @@ -8,15 +16,11 @@ import { type MatrixQaTopologySpec, } from "./topology.js"; -type FetchLike = typeof fetch; +export type { MatrixQaObservedEvent } from "./events.js"; +export type { MatrixQaRoomEventWaitResult } from "./sync.js"; type MatrixQaAuthStage = "m.login.dummy" | "m.login.registration_token"; -type MatrixQaRequestResult = { - status: number; - body: T; -}; - type MatrixQaRegisterResponse = { access_token?: string; device_id?: string; @@ -53,62 +57,12 @@ type MatrixQaSendReactionContent = { }; }; -type MatrixQaSyncResponse = { - next_batch?: string; - rooms?: { - join?: Record< - string, - { - timeline?: { - events?: MatrixQaRoomEvent[]; - }; - } - >; - }; -}; - type MatrixQaUiaaResponse = { completed?: string[]; flows?: Array<{ stages?: string[] }>; session?: string; }; -type MatrixQaRoomEvent = { - content?: Record; - event_id?: string; - origin_server_ts?: number; - sender?: string; - state_key?: string; - type?: string; -}; - -export type MatrixQaObservedEvent = { - roomId: string; - eventId: string; - sender?: string; - stateKey?: string; - type: string; - originServerTs?: number; - body?: string; - formattedBody?: string; - msgtype?: string; - membership?: string; - relatesTo?: { - eventId?: string; - inReplyToId?: string; - isFallingBack?: boolean; - relType?: string; - }; - mentions?: { - room?: boolean; - userIds?: string[]; - }; - reaction?: { - eventId?: string; - key?: string; - }; -}; - export type MatrixQaRegisteredAccount = { accessToken: string; deviceId?: string; @@ -125,17 +79,6 @@ export type MatrixQaProvisionResult = { topology: MatrixQaProvisionedTopology; }; -export type MatrixQaRoomEventWaitResult = - | { - event: MatrixQaObservedEvent; - matched: true; - since?: string; - } - | { - matched: false; - since?: string; - }; - function buildMatrixThreadRelation(threadRootEventId: string, replyToEventId?: string) { return { "m.relates_to": { @@ -243,89 +186,6 @@ function buildMatrixQaMessageContent(params: { }; } -function normalizeMentionUserIds(value: unknown) { - return Array.isArray(value) - ? value.filter((entry): entry is string => typeof entry === "string" && entry.trim().length > 0) - : undefined; -} - -export function normalizeMatrixQaObservedEvent( - roomId: string, - event: MatrixQaRoomEvent, -): MatrixQaObservedEvent | null { - const eventId = event.event_id?.trim(); - const type = event.type?.trim(); - if (!eventId || !type) { - return null; - } - const content = event.content ?? {}; - const relatesToRaw = content["m.relates_to"]; - const relatesTo = - typeof relatesToRaw === "object" && relatesToRaw !== null - ? (relatesToRaw as Record) - : null; - const inReplyToRaw = relatesTo?.["m.in_reply_to"]; - const inReplyTo = - typeof inReplyToRaw === "object" && inReplyToRaw !== null - ? (inReplyToRaw as Record) - : null; - const mentionsRaw = content["m.mentions"]; - const mentions = - typeof mentionsRaw === "object" && mentionsRaw !== null - ? (mentionsRaw as Record) - : null; - const mentionUserIds = normalizeMentionUserIds(mentions?.user_ids); - const reactionKey = - type === "m.reaction" && typeof relatesTo?.key === "string" ? relatesTo.key : undefined; - const reactionEventId = - type === "m.reaction" && typeof relatesTo?.event_id === "string" - ? relatesTo.event_id - : undefined; - - return { - roomId, - eventId, - sender: typeof event.sender === "string" ? event.sender : undefined, - stateKey: typeof event.state_key === "string" ? event.state_key : undefined, - type, - originServerTs: - typeof event.origin_server_ts === "number" ? Math.floor(event.origin_server_ts) : undefined, - body: typeof content.body === "string" ? content.body : undefined, - formattedBody: typeof content.formatted_body === "string" ? content.formatted_body : undefined, - msgtype: typeof content.msgtype === "string" ? content.msgtype : undefined, - membership: typeof content.membership === "string" ? content.membership : undefined, - ...(relatesTo - ? { - relatesTo: { - eventId: typeof relatesTo.event_id === "string" ? relatesTo.event_id : undefined, - inReplyToId: typeof inReplyTo?.event_id === "string" ? inReplyTo.event_id : undefined, - isFallingBack: - typeof relatesTo.is_falling_back === "boolean" - ? relatesTo.is_falling_back - : undefined, - relType: typeof relatesTo.rel_type === "string" ? relatesTo.rel_type : undefined, - }, - } - : {}), - ...(mentions - ? { - mentions: { - ...(mentions.room === true ? { room: true } : {}), - ...(mentionUserIds ? { userIds: mentionUserIds } : {}), - }, - } - : {}), - ...(reactionEventId || reactionKey - ? { - reaction: { - ...(reactionEventId ? { eventId: reactionEventId } : {}), - ...(reactionKey ? { key: reactionKey } : {}), - }, - } - : {}), - }; -} - export function resolveNextRegistrationAuth(params: { registrationToken: string; response: MatrixQaUiaaResponse; @@ -377,55 +237,6 @@ export function resolveNextRegistrationAuth(params: { ); } -async function requestMatrixJson(params: { - accessToken?: string; - baseUrl: string; - body?: unknown; - endpoint: string; - fetchImpl: FetchLike; - method: "GET" | "POST" | "PUT"; - okStatuses?: number[]; - query?: Record; - timeoutMs?: number; -}) { - const url = new URL(params.endpoint, params.baseUrl); - for (const [key, value] of Object.entries(params.query ?? {})) { - if (value !== undefined) { - url.searchParams.set(key, String(value)); - } - } - const response = await params.fetchImpl(url, { - method: params.method, - headers: { - accept: "application/json", - ...(params.body !== undefined ? { "content-type": "application/json" } : {}), - ...(params.accessToken ? { authorization: `Bearer ${params.accessToken}` } : {}), - }, - ...(params.body !== undefined ? { body: JSON.stringify(params.body) } : {}), - signal: AbortSignal.timeout(params.timeoutMs ?? 20_000), - }); - let body: unknown = {}; - try { - body = (await response.json()) as unknown; - } catch { - body = {}; - } - const okStatuses = params.okStatuses ?? [200]; - if (!okStatuses.includes(response.status)) { - const details = - typeof body === "object" && - body !== null && - typeof (body as { error?: unknown }).error === "string" - ? (body as { error: string }).error - : `${params.method} ${params.endpoint} failed with status ${response.status}`; - throw new Error(details); - } - return { - status: response.status, - body: body as T, - } satisfies MatrixQaRequestResult; -} - function buildRegisteredAccount(params: { localpart: string; password: string; @@ -448,53 +259,10 @@ function buildRegisteredAccount(params: { export function createMatrixQaClient(params: { accessToken?: string; baseUrl: string; - fetchImpl?: FetchLike; + fetchImpl?: MatrixQaFetchLike; }) { const fetchImpl = params.fetchImpl ?? fetch; - async function waitForOptionalRoomEvent(opts: { - observedEvents: MatrixQaObservedEvent[]; - predicate: (event: MatrixQaObservedEvent) => boolean; - roomId: string; - since?: string; - timeoutMs: number; - }): Promise { - const startedAt = Date.now(); - let since = opts.since; - while (Date.now() - startedAt < opts.timeoutMs) { - const remainingMs = Math.max(1_000, opts.timeoutMs - (Date.now() - startedAt)); - const response = await requestMatrixJson({ - accessToken: params.accessToken, - baseUrl: params.baseUrl, - endpoint: "/_matrix/client/v3/sync", - fetchImpl, - method: "GET", - query: { - ...(since ? { since } : {}), - timeout: Math.min(10_000, remainingMs), - }, - timeoutMs: Math.min(15_000, remainingMs + 5_000), - }); - since = response.body.next_batch?.trim() || since; - const roomEvents = response.body.rooms?.join?.[opts.roomId]?.timeline?.events ?? []; - let matchedEvent: MatrixQaObservedEvent | null = null; - for (const event of roomEvents) { - const normalized = normalizeMatrixQaObservedEvent(opts.roomId, event); - if (!normalized) { - continue; - } - opts.observedEvents.push(normalized); - if (matchedEvent === null && opts.predicate(normalized)) { - matchedEvent = normalized; - } - } - if (matchedEvent) { - return { event: matchedEvent, matched: true, since }; - } - } - return { matched: false, since }; - } - return { async createPrivateRoom(opts: { inviteUserIds: string[]; isDirect?: boolean; name: string }) { const result = await requestMatrixJson({ @@ -525,15 +293,11 @@ export function createMatrixQaClient(params: { return roomId; }, async primeRoom() { - const response = await requestMatrixJson({ + return await primeMatrixQaRoom({ accessToken: params.accessToken, baseUrl: params.baseUrl, - endpoint: "/_matrix/client/v3/sync", fetchImpl, - method: "GET", - query: { timeout: 0 }, }); - return response.body.next_batch?.trim() || undefined; }, async registerWithToken(opts: { deviceName: string; @@ -661,7 +425,20 @@ export function createMatrixQaClient(params: { method: "POST", }); }, - waitForOptionalRoomEvent, + waitForOptionalRoomEvent(opts: { + observedEvents: MatrixQaObservedEvent[]; + predicate: (event: MatrixQaObservedEvent) => boolean; + roomId: string; + since?: string; + timeoutMs: number; + }) { + return waitForOptionalMatrixQaRoomEvent({ + accessToken: params.accessToken, + baseUrl: params.baseUrl, + fetchImpl, + ...opts, + }); + }, async waitForRoomEvent(opts: { observedEvents: MatrixQaObservedEvent[]; predicate: (event: MatrixQaObservedEvent) => boolean; @@ -669,11 +446,12 @@ export function createMatrixQaClient(params: { since?: string; timeoutMs: number; }) { - const result = await waitForOptionalRoomEvent(opts); - if (result.matched) { - return { event: result.event, since: result.since }; - } - throw new Error(`timed out after ${opts.timeoutMs}ms waiting for Matrix room event`); + return await waitForMatrixQaRoomEvent({ + accessToken: params.accessToken, + baseUrl: params.baseUrl, + fetchImpl, + ...opts, + }); }, }; } @@ -681,7 +459,7 @@ export function createMatrixQaClient(params: { async function joinRoomWithRetry(params: { accessToken: string; baseUrl: string; - fetchImpl?: FetchLike; + fetchImpl?: MatrixQaFetchLike; roomId: string; }) { const client = createMatrixQaClient({ @@ -723,7 +501,7 @@ function resolveTopologyMemberAccounts( async function provisionMatrixQaTopology(params: { accounts: Record; baseUrl: string; - fetchImpl?: FetchLike; + fetchImpl?: MatrixQaFetchLike; spec: MatrixQaTopologySpec; }): Promise { const rooms = []; @@ -779,7 +557,7 @@ async function provisionMatrixQaTopology(params: { export async function provisionMatrixQaRoom(params: { baseUrl: string; - fetchImpl?: FetchLike; + fetchImpl?: MatrixQaFetchLike; topology?: MatrixQaTopologySpec; roomName: string; driverLocalpart: string; @@ -845,6 +623,5 @@ export const __testing = { buildMatrixQaMessageContent, buildMatrixReactionRelation, buildMatrixThreadRelation, - normalizeMatrixQaObservedEvent, resolveNextRegistrationAuth, }; diff --git a/extensions/qa-matrix/src/substrate/events.ts b/extensions/qa-matrix/src/substrate/events.ts new file mode 100644 index 00000000000..0313057baed --- /dev/null +++ b/extensions/qa-matrix/src/substrate/events.ts @@ -0,0 +1,145 @@ +export type MatrixQaRoomEvent = { + content?: Record; + event_id?: string; + origin_server_ts?: number; + sender?: string; + state_key?: string; + type?: string; +}; + +export type MatrixQaObservedEventKind = + | "membership" + | "message" + | "notice" + | "redaction" + | "reaction" + | "room-event"; + +export type MatrixQaObservedEvent = { + kind: MatrixQaObservedEventKind; + roomId: string; + eventId: string; + sender?: string; + stateKey?: string; + type: string; + originServerTs?: number; + body?: string; + formattedBody?: string; + msgtype?: string; + membership?: string; + relatesTo?: { + eventId?: string; + inReplyToId?: string; + isFallingBack?: boolean; + relType?: string; + }; + mentions?: { + room?: boolean; + userIds?: string[]; + }; + reaction?: { + eventId?: string; + key?: string; + }; +}; + +function normalizeMentionUserIds(value: unknown) { + return Array.isArray(value) + ? value.filter((entry): entry is string => typeof entry === "string" && entry.trim().length > 0) + : undefined; +} + +function resolveMatrixQaObservedEventKind(params: { msgtype?: string; type: string }) { + if (params.type === "m.reaction") { + return "reaction" as const; + } + if (params.type === "m.room.redaction") { + return "redaction" as const; + } + if (params.type === "m.room.member") { + return "membership" as const; + } + if (params.type === "m.room.message") { + return params.msgtype === "m.notice" ? ("notice" as const) : ("message" as const); + } + return "room-event" as const; +} + +export function normalizeMatrixQaObservedEvent( + roomId: string, + event: MatrixQaRoomEvent, +): MatrixQaObservedEvent | null { + const eventId = event.event_id?.trim(); + const type = event.type?.trim(); + if (!eventId || !type) { + return null; + } + const content = event.content ?? {}; + const msgtype = typeof content.msgtype === "string" ? content.msgtype : undefined; + const relatesToRaw = content["m.relates_to"]; + const relatesTo = + typeof relatesToRaw === "object" && relatesToRaw !== null + ? (relatesToRaw as Record) + : null; + const inReplyToRaw = relatesTo?.["m.in_reply_to"]; + const inReplyTo = + typeof inReplyToRaw === "object" && inReplyToRaw !== null + ? (inReplyToRaw as Record) + : null; + const mentionsRaw = content["m.mentions"]; + const mentions = + typeof mentionsRaw === "object" && mentionsRaw !== null + ? (mentionsRaw as Record) + : null; + const mentionUserIds = normalizeMentionUserIds(mentions?.user_ids); + const reactionKey = + type === "m.reaction" && typeof relatesTo?.key === "string" ? relatesTo.key : undefined; + const reactionEventId = + type === "m.reaction" && typeof relatesTo?.event_id === "string" + ? relatesTo.event_id + : undefined; + + return { + kind: resolveMatrixQaObservedEventKind({ msgtype, type }), + roomId, + eventId, + sender: typeof event.sender === "string" ? event.sender : undefined, + stateKey: typeof event.state_key === "string" ? event.state_key : undefined, + type, + originServerTs: + typeof event.origin_server_ts === "number" ? Math.floor(event.origin_server_ts) : undefined, + body: typeof content.body === "string" ? content.body : undefined, + formattedBody: typeof content.formatted_body === "string" ? content.formatted_body : undefined, + msgtype, + membership: typeof content.membership === "string" ? content.membership : undefined, + ...(relatesTo + ? { + relatesTo: { + eventId: typeof relatesTo.event_id === "string" ? relatesTo.event_id : undefined, + inReplyToId: typeof inReplyTo?.event_id === "string" ? inReplyTo.event_id : undefined, + isFallingBack: + typeof relatesTo.is_falling_back === "boolean" + ? relatesTo.is_falling_back + : undefined, + relType: typeof relatesTo.rel_type === "string" ? relatesTo.rel_type : undefined, + }, + } + : {}), + ...(mentions + ? { + mentions: { + ...(mentions.room === true ? { room: true } : {}), + ...(mentionUserIds ? { userIds: mentionUserIds } : {}), + }, + } + : {}), + ...(reactionEventId || reactionKey + ? { + reaction: { + ...(reactionEventId ? { eventId: reactionEventId } : {}), + ...(reactionKey ? { key: reactionKey } : {}), + }, + } + : {}), + }; +} diff --git a/extensions/qa-matrix/src/substrate/request.ts b/extensions/qa-matrix/src/substrate/request.ts new file mode 100644 index 00000000000..8c09505d066 --- /dev/null +++ b/extensions/qa-matrix/src/substrate/request.ts @@ -0,0 +1,55 @@ +export type MatrixQaFetchLike = typeof fetch; + +type MatrixQaRequestResult = { + status: number; + body: T; +}; + +export async function requestMatrixJson(params: { + accessToken?: string; + baseUrl: string; + body?: unknown; + endpoint: string; + fetchImpl: MatrixQaFetchLike; + method: "GET" | "POST" | "PUT"; + okStatuses?: number[]; + query?: Record; + timeoutMs?: number; +}): Promise> { + const url = new URL(params.endpoint, params.baseUrl); + for (const [key, value] of Object.entries(params.query ?? {})) { + if (value !== undefined) { + url.searchParams.set(key, String(value)); + } + } + const response = await params.fetchImpl(url, { + method: params.method, + headers: { + accept: "application/json", + ...(params.body !== undefined ? { "content-type": "application/json" } : {}), + ...(params.accessToken ? { authorization: `Bearer ${params.accessToken}` } : {}), + }, + ...(params.body !== undefined ? { body: JSON.stringify(params.body) } : {}), + signal: AbortSignal.timeout(params.timeoutMs ?? 20_000), + }); + let body: unknown = {}; + try { + body = (await response.json()) as unknown; + } catch { + body = {}; + } + const okStatuses = params.okStatuses ?? [200]; + if (!okStatuses.includes(response.status)) { + const details = + typeof body === "object" && + body !== null && + typeof (body as { error?: unknown }).error === "string" + ? (body as { error: string }).error + : `${params.method} ${params.endpoint} failed with status ${response.status}`; + throw new Error(details); + } + return { + status: response.status, + body: body as T, + }; +} diff --git a/extensions/qa-matrix/src/substrate/sync.ts b/extensions/qa-matrix/src/substrate/sync.ts new file mode 100644 index 00000000000..ee3f3678807 --- /dev/null +++ b/extensions/qa-matrix/src/substrate/sync.ts @@ -0,0 +1,107 @@ +import type { MatrixQaObservedEvent } from "./events.js"; +import { normalizeMatrixQaObservedEvent, type MatrixQaRoomEvent } from "./events.js"; +import { requestMatrixJson, type MatrixQaFetchLike } from "./request.js"; + +type MatrixQaSyncResponse = { + next_batch?: string; + rooms?: { + join?: Record< + string, + { + timeline?: { + events?: MatrixQaRoomEvent[]; + }; + } + >; + }; +}; + +export type MatrixQaRoomEventWaitResult = + | { + event: MatrixQaObservedEvent; + matched: true; + since?: string; + } + | { + matched: false; + since?: string; + }; + +type MatrixQaSyncParams = { + accessToken?: string; + baseUrl: string; + fetchImpl: MatrixQaFetchLike; +}; + +export async function primeMatrixQaRoom(params: MatrixQaSyncParams) { + const response = await requestMatrixJson({ + accessToken: params.accessToken, + baseUrl: params.baseUrl, + endpoint: "/_matrix/client/v3/sync", + fetchImpl: params.fetchImpl, + method: "GET", + query: { timeout: 0 }, + }); + return response.body.next_batch?.trim() || undefined; +} + +export async function waitForOptionalMatrixQaRoomEvent( + params: MatrixQaSyncParams & { + observedEvents: MatrixQaObservedEvent[]; + predicate: (event: MatrixQaObservedEvent) => boolean; + roomId: string; + since?: string; + timeoutMs: number; + }, +): Promise { + const startedAt = Date.now(); + let since = params.since; + while (Date.now() - startedAt < params.timeoutMs) { + const remainingMs = Math.max(1_000, params.timeoutMs - (Date.now() - startedAt)); + const response = await requestMatrixJson({ + accessToken: params.accessToken, + baseUrl: params.baseUrl, + endpoint: "/_matrix/client/v3/sync", + fetchImpl: params.fetchImpl, + method: "GET", + query: { + ...(since ? { since } : {}), + timeout: Math.min(10_000, remainingMs), + }, + timeoutMs: Math.min(15_000, remainingMs + 5_000), + }); + since = response.body.next_batch?.trim() || since; + const roomEvents = response.body.rooms?.join?.[params.roomId]?.timeline?.events ?? []; + let matchedEvent: MatrixQaObservedEvent | null = null; + for (const event of roomEvents) { + const normalized = normalizeMatrixQaObservedEvent(params.roomId, event); + if (!normalized) { + continue; + } + params.observedEvents.push(normalized); + if (matchedEvent === null && params.predicate(normalized)) { + matchedEvent = normalized; + } + } + if (matchedEvent) { + return { event: matchedEvent, matched: true, since }; + } + } + return { matched: false, since }; +} + +export async function waitForMatrixQaRoomEvent( + params: MatrixQaSyncParams & { + observedEvents: MatrixQaObservedEvent[]; + predicate: (event: MatrixQaObservedEvent) => boolean; + roomId: string; + since?: string; + timeoutMs: number; + }, +) { + const result = await waitForOptionalMatrixQaRoomEvent(params); + if (result.matched) { + return { event: result.event, since: result.since }; + } + throw new Error(`timed out after ${params.timeoutMs}ms waiting for Matrix room event`); +}