mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:40:44 +00:00
QA: extract Matrix event modules
This commit is contained in:
@@ -12,12 +12,10 @@ import {
|
||||
appendLiveLaneIssue,
|
||||
buildLiveLaneArtifactsError,
|
||||
} from "../../shared/live-lane-helpers.js";
|
||||
import {
|
||||
provisionMatrixQaRoom,
|
||||
type MatrixQaObservedEvent,
|
||||
type MatrixQaProvisionResult,
|
||||
} from "../../substrate/client.js";
|
||||
import { buildMatrixQaObservedEventsArtifact } from "../../substrate/artifacts.js";
|
||||
import { provisionMatrixQaRoom, type MatrixQaProvisionResult } from "../../substrate/client.js";
|
||||
import { buildMatrixQaConfig, type MatrixQaConfigOverrides } from "../../substrate/config.js";
|
||||
import type { MatrixQaObservedEvent } from "../../substrate/events.js";
|
||||
import { startMatrixQaHarness } from "../../substrate/harness.runtime.js";
|
||||
import { resolveMatrixQaModels } from "./model-selection.js";
|
||||
import {
|
||||
@@ -140,29 +138,6 @@ function buildMatrixQaSummary(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function buildObservedEventsArtifact(params: {
|
||||
includeContent: boolean;
|
||||
observedEvents: MatrixQaObservedEvent[];
|
||||
}) {
|
||||
return params.observedEvents.map((event) =>
|
||||
params.includeContent
|
||||
? event
|
||||
: {
|
||||
roomId: event.roomId,
|
||||
eventId: event.eventId,
|
||||
sender: event.sender,
|
||||
stateKey: event.stateKey,
|
||||
type: event.type,
|
||||
originServerTs: event.originServerTs,
|
||||
msgtype: event.msgtype,
|
||||
membership: event.membership,
|
||||
relatesTo: event.relatesTo,
|
||||
mentions: event.mentions,
|
||||
reaction: event.reaction,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function isMatrixAccountReady(entry?: {
|
||||
connected?: boolean;
|
||||
healthState?: string;
|
||||
@@ -531,7 +506,7 @@ export async function runMatrixQaLive(params: {
|
||||
await fs.writeFile(
|
||||
observedEventsPath,
|
||||
`${JSON.stringify(
|
||||
buildObservedEventsArtifact({
|
||||
buildMatrixQaObservedEventsArtifact({
|
||||
includeContent: includeObservedEventContent,
|
||||
observedEvents,
|
||||
}),
|
||||
@@ -581,7 +556,6 @@ export const __testing = {
|
||||
buildMatrixQaSummary,
|
||||
MATRIX_QA_SCENARIOS,
|
||||
buildMatrixQaConfig,
|
||||
buildObservedEventsArtifact,
|
||||
isMatrixAccountReady,
|
||||
resolveMatrixQaModels,
|
||||
waitForMatrixChannelReady,
|
||||
|
||||
@@ -4,8 +4,9 @@ import {
|
||||
selectLiveTransportScenarios,
|
||||
type LiveTransportScenarioDefinition,
|
||||
} from "../../shared/live-transport-scenarios.js";
|
||||
import { createMatrixQaClient, type MatrixQaObservedEvent } from "../../substrate/client.js";
|
||||
import { createMatrixQaClient } from "../../substrate/client.js";
|
||||
import { type MatrixQaConfigOverrides } from "../../substrate/config.js";
|
||||
import type { MatrixQaObservedEvent } from "../../substrate/events.js";
|
||||
import {
|
||||
buildDefaultMatrixQaTopologySpec,
|
||||
findMatrixQaProvisionedRoom,
|
||||
|
||||
25
extensions/qa-matrix/src/substrate/artifacts.ts
Normal file
25
extensions/qa-matrix/src/substrate/artifacts.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import type { MatrixQaObservedEvent } from "./events.js";
|
||||
|
||||
export function buildMatrixQaObservedEventsArtifact(params: {
|
||||
includeContent: boolean;
|
||||
observedEvents: MatrixQaObservedEvent[];
|
||||
}) {
|
||||
return params.observedEvents.map((event) =>
|
||||
params.includeContent
|
||||
? event
|
||||
: {
|
||||
kind: event.kind,
|
||||
roomId: event.roomId,
|
||||
eventId: event.eventId,
|
||||
sender: event.sender,
|
||||
stateKey: event.stateKey,
|
||||
type: event.type,
|
||||
originServerTs: event.originServerTs,
|
||||
msgtype: event.msgtype,
|
||||
membership: event.membership,
|
||||
relatesTo: event.relatesTo,
|
||||
mentions: event.mentions,
|
||||
reaction: event.reaction,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -1,5 +1,13 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import type { MatrixQaObservedEvent } from "./events.js";
|
||||
import { requestMatrixJson, type MatrixQaFetchLike } from "./request.js";
|
||||
import {
|
||||
primeMatrixQaRoom,
|
||||
waitForMatrixQaRoomEvent,
|
||||
waitForOptionalMatrixQaRoomEvent,
|
||||
type MatrixQaRoomEventWaitResult,
|
||||
} from "./sync.js";
|
||||
import {
|
||||
findMatrixQaProvisionedRoom,
|
||||
type MatrixQaParticipantRole,
|
||||
@@ -8,15 +16,11 @@ import {
|
||||
type MatrixQaTopologySpec,
|
||||
} from "./topology.js";
|
||||
|
||||
type FetchLike = typeof fetch;
|
||||
export type { MatrixQaObservedEvent } from "./events.js";
|
||||
export type { MatrixQaRoomEventWaitResult } from "./sync.js";
|
||||
|
||||
type MatrixQaAuthStage = "m.login.dummy" | "m.login.registration_token";
|
||||
|
||||
type MatrixQaRequestResult<T> = {
|
||||
status: number;
|
||||
body: T;
|
||||
};
|
||||
|
||||
type MatrixQaRegisterResponse = {
|
||||
access_token?: string;
|
||||
device_id?: string;
|
||||
@@ -53,62 +57,12 @@ type MatrixQaSendReactionContent = {
|
||||
};
|
||||
};
|
||||
|
||||
type MatrixQaSyncResponse = {
|
||||
next_batch?: string;
|
||||
rooms?: {
|
||||
join?: Record<
|
||||
string,
|
||||
{
|
||||
timeline?: {
|
||||
events?: MatrixQaRoomEvent[];
|
||||
};
|
||||
}
|
||||
>;
|
||||
};
|
||||
};
|
||||
|
||||
type MatrixQaUiaaResponse = {
|
||||
completed?: string[];
|
||||
flows?: Array<{ stages?: string[] }>;
|
||||
session?: string;
|
||||
};
|
||||
|
||||
type MatrixQaRoomEvent = {
|
||||
content?: Record<string, unknown>;
|
||||
event_id?: string;
|
||||
origin_server_ts?: number;
|
||||
sender?: string;
|
||||
state_key?: string;
|
||||
type?: string;
|
||||
};
|
||||
|
||||
export type MatrixQaObservedEvent = {
|
||||
roomId: string;
|
||||
eventId: string;
|
||||
sender?: string;
|
||||
stateKey?: string;
|
||||
type: string;
|
||||
originServerTs?: number;
|
||||
body?: string;
|
||||
formattedBody?: string;
|
||||
msgtype?: string;
|
||||
membership?: string;
|
||||
relatesTo?: {
|
||||
eventId?: string;
|
||||
inReplyToId?: string;
|
||||
isFallingBack?: boolean;
|
||||
relType?: string;
|
||||
};
|
||||
mentions?: {
|
||||
room?: boolean;
|
||||
userIds?: string[];
|
||||
};
|
||||
reaction?: {
|
||||
eventId?: string;
|
||||
key?: string;
|
||||
};
|
||||
};
|
||||
|
||||
export type MatrixQaRegisteredAccount = {
|
||||
accessToken: string;
|
||||
deviceId?: string;
|
||||
@@ -125,17 +79,6 @@ export type MatrixQaProvisionResult = {
|
||||
topology: MatrixQaProvisionedTopology;
|
||||
};
|
||||
|
||||
export type MatrixQaRoomEventWaitResult =
|
||||
| {
|
||||
event: MatrixQaObservedEvent;
|
||||
matched: true;
|
||||
since?: string;
|
||||
}
|
||||
| {
|
||||
matched: false;
|
||||
since?: string;
|
||||
};
|
||||
|
||||
function buildMatrixThreadRelation(threadRootEventId: string, replyToEventId?: string) {
|
||||
return {
|
||||
"m.relates_to": {
|
||||
@@ -243,89 +186,6 @@ function buildMatrixQaMessageContent(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeMentionUserIds(value: unknown) {
|
||||
return Array.isArray(value)
|
||||
? value.filter((entry): entry is string => typeof entry === "string" && entry.trim().length > 0)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
export function normalizeMatrixQaObservedEvent(
|
||||
roomId: string,
|
||||
event: MatrixQaRoomEvent,
|
||||
): MatrixQaObservedEvent | null {
|
||||
const eventId = event.event_id?.trim();
|
||||
const type = event.type?.trim();
|
||||
if (!eventId || !type) {
|
||||
return null;
|
||||
}
|
||||
const content = event.content ?? {};
|
||||
const relatesToRaw = content["m.relates_to"];
|
||||
const relatesTo =
|
||||
typeof relatesToRaw === "object" && relatesToRaw !== null
|
||||
? (relatesToRaw as Record<string, unknown>)
|
||||
: null;
|
||||
const inReplyToRaw = relatesTo?.["m.in_reply_to"];
|
||||
const inReplyTo =
|
||||
typeof inReplyToRaw === "object" && inReplyToRaw !== null
|
||||
? (inReplyToRaw as Record<string, unknown>)
|
||||
: null;
|
||||
const mentionsRaw = content["m.mentions"];
|
||||
const mentions =
|
||||
typeof mentionsRaw === "object" && mentionsRaw !== null
|
||||
? (mentionsRaw as Record<string, unknown>)
|
||||
: null;
|
||||
const mentionUserIds = normalizeMentionUserIds(mentions?.user_ids);
|
||||
const reactionKey =
|
||||
type === "m.reaction" && typeof relatesTo?.key === "string" ? relatesTo.key : undefined;
|
||||
const reactionEventId =
|
||||
type === "m.reaction" && typeof relatesTo?.event_id === "string"
|
||||
? relatesTo.event_id
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
roomId,
|
||||
eventId,
|
||||
sender: typeof event.sender === "string" ? event.sender : undefined,
|
||||
stateKey: typeof event.state_key === "string" ? event.state_key : undefined,
|
||||
type,
|
||||
originServerTs:
|
||||
typeof event.origin_server_ts === "number" ? Math.floor(event.origin_server_ts) : undefined,
|
||||
body: typeof content.body === "string" ? content.body : undefined,
|
||||
formattedBody: typeof content.formatted_body === "string" ? content.formatted_body : undefined,
|
||||
msgtype: typeof content.msgtype === "string" ? content.msgtype : undefined,
|
||||
membership: typeof content.membership === "string" ? content.membership : undefined,
|
||||
...(relatesTo
|
||||
? {
|
||||
relatesTo: {
|
||||
eventId: typeof relatesTo.event_id === "string" ? relatesTo.event_id : undefined,
|
||||
inReplyToId: typeof inReplyTo?.event_id === "string" ? inReplyTo.event_id : undefined,
|
||||
isFallingBack:
|
||||
typeof relatesTo.is_falling_back === "boolean"
|
||||
? relatesTo.is_falling_back
|
||||
: undefined,
|
||||
relType: typeof relatesTo.rel_type === "string" ? relatesTo.rel_type : undefined,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
...(mentions
|
||||
? {
|
||||
mentions: {
|
||||
...(mentions.room === true ? { room: true } : {}),
|
||||
...(mentionUserIds ? { userIds: mentionUserIds } : {}),
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
...(reactionEventId || reactionKey
|
||||
? {
|
||||
reaction: {
|
||||
...(reactionEventId ? { eventId: reactionEventId } : {}),
|
||||
...(reactionKey ? { key: reactionKey } : {}),
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveNextRegistrationAuth(params: {
|
||||
registrationToken: string;
|
||||
response: MatrixQaUiaaResponse;
|
||||
@@ -377,55 +237,6 @@ export function resolveNextRegistrationAuth(params: {
|
||||
);
|
||||
}
|
||||
|
||||
async function requestMatrixJson<T>(params: {
|
||||
accessToken?: string;
|
||||
baseUrl: string;
|
||||
body?: unknown;
|
||||
endpoint: string;
|
||||
fetchImpl: FetchLike;
|
||||
method: "GET" | "POST" | "PUT";
|
||||
okStatuses?: number[];
|
||||
query?: Record<string, string | number | undefined>;
|
||||
timeoutMs?: number;
|
||||
}) {
|
||||
const url = new URL(params.endpoint, params.baseUrl);
|
||||
for (const [key, value] of Object.entries(params.query ?? {})) {
|
||||
if (value !== undefined) {
|
||||
url.searchParams.set(key, String(value));
|
||||
}
|
||||
}
|
||||
const response = await params.fetchImpl(url, {
|
||||
method: params.method,
|
||||
headers: {
|
||||
accept: "application/json",
|
||||
...(params.body !== undefined ? { "content-type": "application/json" } : {}),
|
||||
...(params.accessToken ? { authorization: `Bearer ${params.accessToken}` } : {}),
|
||||
},
|
||||
...(params.body !== undefined ? { body: JSON.stringify(params.body) } : {}),
|
||||
signal: AbortSignal.timeout(params.timeoutMs ?? 20_000),
|
||||
});
|
||||
let body: unknown = {};
|
||||
try {
|
||||
body = (await response.json()) as unknown;
|
||||
} catch {
|
||||
body = {};
|
||||
}
|
||||
const okStatuses = params.okStatuses ?? [200];
|
||||
if (!okStatuses.includes(response.status)) {
|
||||
const details =
|
||||
typeof body === "object" &&
|
||||
body !== null &&
|
||||
typeof (body as { error?: unknown }).error === "string"
|
||||
? (body as { error: string }).error
|
||||
: `${params.method} ${params.endpoint} failed with status ${response.status}`;
|
||||
throw new Error(details);
|
||||
}
|
||||
return {
|
||||
status: response.status,
|
||||
body: body as T,
|
||||
} satisfies MatrixQaRequestResult<T>;
|
||||
}
|
||||
|
||||
function buildRegisteredAccount(params: {
|
||||
localpart: string;
|
||||
password: string;
|
||||
@@ -448,53 +259,10 @@ function buildRegisteredAccount(params: {
|
||||
export function createMatrixQaClient(params: {
|
||||
accessToken?: string;
|
||||
baseUrl: string;
|
||||
fetchImpl?: FetchLike;
|
||||
fetchImpl?: MatrixQaFetchLike;
|
||||
}) {
|
||||
const fetchImpl = params.fetchImpl ?? fetch;
|
||||
|
||||
async function waitForOptionalRoomEvent(opts: {
|
||||
observedEvents: MatrixQaObservedEvent[];
|
||||
predicate: (event: MatrixQaObservedEvent) => boolean;
|
||||
roomId: string;
|
||||
since?: string;
|
||||
timeoutMs: number;
|
||||
}): Promise<MatrixQaRoomEventWaitResult> {
|
||||
const startedAt = Date.now();
|
||||
let since = opts.since;
|
||||
while (Date.now() - startedAt < opts.timeoutMs) {
|
||||
const remainingMs = Math.max(1_000, opts.timeoutMs - (Date.now() - startedAt));
|
||||
const response = await requestMatrixJson<MatrixQaSyncResponse>({
|
||||
accessToken: params.accessToken,
|
||||
baseUrl: params.baseUrl,
|
||||
endpoint: "/_matrix/client/v3/sync",
|
||||
fetchImpl,
|
||||
method: "GET",
|
||||
query: {
|
||||
...(since ? { since } : {}),
|
||||
timeout: Math.min(10_000, remainingMs),
|
||||
},
|
||||
timeoutMs: Math.min(15_000, remainingMs + 5_000),
|
||||
});
|
||||
since = response.body.next_batch?.trim() || since;
|
||||
const roomEvents = response.body.rooms?.join?.[opts.roomId]?.timeline?.events ?? [];
|
||||
let matchedEvent: MatrixQaObservedEvent | null = null;
|
||||
for (const event of roomEvents) {
|
||||
const normalized = normalizeMatrixQaObservedEvent(opts.roomId, event);
|
||||
if (!normalized) {
|
||||
continue;
|
||||
}
|
||||
opts.observedEvents.push(normalized);
|
||||
if (matchedEvent === null && opts.predicate(normalized)) {
|
||||
matchedEvent = normalized;
|
||||
}
|
||||
}
|
||||
if (matchedEvent) {
|
||||
return { event: matchedEvent, matched: true, since };
|
||||
}
|
||||
}
|
||||
return { matched: false, since };
|
||||
}
|
||||
|
||||
return {
|
||||
async createPrivateRoom(opts: { inviteUserIds: string[]; isDirect?: boolean; name: string }) {
|
||||
const result = await requestMatrixJson<MatrixQaRoomCreateResponse>({
|
||||
@@ -525,15 +293,11 @@ export function createMatrixQaClient(params: {
|
||||
return roomId;
|
||||
},
|
||||
async primeRoom() {
|
||||
const response = await requestMatrixJson<MatrixQaSyncResponse>({
|
||||
return await primeMatrixQaRoom({
|
||||
accessToken: params.accessToken,
|
||||
baseUrl: params.baseUrl,
|
||||
endpoint: "/_matrix/client/v3/sync",
|
||||
fetchImpl,
|
||||
method: "GET",
|
||||
query: { timeout: 0 },
|
||||
});
|
||||
return response.body.next_batch?.trim() || undefined;
|
||||
},
|
||||
async registerWithToken(opts: {
|
||||
deviceName: string;
|
||||
@@ -661,7 +425,20 @@ export function createMatrixQaClient(params: {
|
||||
method: "POST",
|
||||
});
|
||||
},
|
||||
waitForOptionalRoomEvent,
|
||||
waitForOptionalRoomEvent(opts: {
|
||||
observedEvents: MatrixQaObservedEvent[];
|
||||
predicate: (event: MatrixQaObservedEvent) => boolean;
|
||||
roomId: string;
|
||||
since?: string;
|
||||
timeoutMs: number;
|
||||
}) {
|
||||
return waitForOptionalMatrixQaRoomEvent({
|
||||
accessToken: params.accessToken,
|
||||
baseUrl: params.baseUrl,
|
||||
fetchImpl,
|
||||
...opts,
|
||||
});
|
||||
},
|
||||
async waitForRoomEvent(opts: {
|
||||
observedEvents: MatrixQaObservedEvent[];
|
||||
predicate: (event: MatrixQaObservedEvent) => boolean;
|
||||
@@ -669,11 +446,12 @@ export function createMatrixQaClient(params: {
|
||||
since?: string;
|
||||
timeoutMs: number;
|
||||
}) {
|
||||
const result = await waitForOptionalRoomEvent(opts);
|
||||
if (result.matched) {
|
||||
return { event: result.event, since: result.since };
|
||||
}
|
||||
throw new Error(`timed out after ${opts.timeoutMs}ms waiting for Matrix room event`);
|
||||
return await waitForMatrixQaRoomEvent({
|
||||
accessToken: params.accessToken,
|
||||
baseUrl: params.baseUrl,
|
||||
fetchImpl,
|
||||
...opts,
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -681,7 +459,7 @@ export function createMatrixQaClient(params: {
|
||||
async function joinRoomWithRetry(params: {
|
||||
accessToken: string;
|
||||
baseUrl: string;
|
||||
fetchImpl?: FetchLike;
|
||||
fetchImpl?: MatrixQaFetchLike;
|
||||
roomId: string;
|
||||
}) {
|
||||
const client = createMatrixQaClient({
|
||||
@@ -723,7 +501,7 @@ function resolveTopologyMemberAccounts(
|
||||
async function provisionMatrixQaTopology(params: {
|
||||
accounts: Record<MatrixQaParticipantRole, MatrixQaRegisteredAccount>;
|
||||
baseUrl: string;
|
||||
fetchImpl?: FetchLike;
|
||||
fetchImpl?: MatrixQaFetchLike;
|
||||
spec: MatrixQaTopologySpec;
|
||||
}): Promise<MatrixQaProvisionedTopology> {
|
||||
const rooms = [];
|
||||
@@ -779,7 +557,7 @@ async function provisionMatrixQaTopology(params: {
|
||||
|
||||
export async function provisionMatrixQaRoom(params: {
|
||||
baseUrl: string;
|
||||
fetchImpl?: FetchLike;
|
||||
fetchImpl?: MatrixQaFetchLike;
|
||||
topology?: MatrixQaTopologySpec;
|
||||
roomName: string;
|
||||
driverLocalpart: string;
|
||||
@@ -845,6 +623,5 @@ export const __testing = {
|
||||
buildMatrixQaMessageContent,
|
||||
buildMatrixReactionRelation,
|
||||
buildMatrixThreadRelation,
|
||||
normalizeMatrixQaObservedEvent,
|
||||
resolveNextRegistrationAuth,
|
||||
};
|
||||
|
||||
145
extensions/qa-matrix/src/substrate/events.ts
Normal file
145
extensions/qa-matrix/src/substrate/events.ts
Normal file
@@ -0,0 +1,145 @@
|
||||
export type MatrixQaRoomEvent = {
|
||||
content?: Record<string, unknown>;
|
||||
event_id?: string;
|
||||
origin_server_ts?: number;
|
||||
sender?: string;
|
||||
state_key?: string;
|
||||
type?: string;
|
||||
};
|
||||
|
||||
export type MatrixQaObservedEventKind =
|
||||
| "membership"
|
||||
| "message"
|
||||
| "notice"
|
||||
| "redaction"
|
||||
| "reaction"
|
||||
| "room-event";
|
||||
|
||||
export type MatrixQaObservedEvent = {
|
||||
kind: MatrixQaObservedEventKind;
|
||||
roomId: string;
|
||||
eventId: string;
|
||||
sender?: string;
|
||||
stateKey?: string;
|
||||
type: string;
|
||||
originServerTs?: number;
|
||||
body?: string;
|
||||
formattedBody?: string;
|
||||
msgtype?: string;
|
||||
membership?: string;
|
||||
relatesTo?: {
|
||||
eventId?: string;
|
||||
inReplyToId?: string;
|
||||
isFallingBack?: boolean;
|
||||
relType?: string;
|
||||
};
|
||||
mentions?: {
|
||||
room?: boolean;
|
||||
userIds?: string[];
|
||||
};
|
||||
reaction?: {
|
||||
eventId?: string;
|
||||
key?: string;
|
||||
};
|
||||
};
|
||||
|
||||
function normalizeMentionUserIds(value: unknown) {
|
||||
return Array.isArray(value)
|
||||
? value.filter((entry): entry is string => typeof entry === "string" && entry.trim().length > 0)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function resolveMatrixQaObservedEventKind(params: { msgtype?: string; type: string }) {
|
||||
if (params.type === "m.reaction") {
|
||||
return "reaction" as const;
|
||||
}
|
||||
if (params.type === "m.room.redaction") {
|
||||
return "redaction" as const;
|
||||
}
|
||||
if (params.type === "m.room.member") {
|
||||
return "membership" as const;
|
||||
}
|
||||
if (params.type === "m.room.message") {
|
||||
return params.msgtype === "m.notice" ? ("notice" as const) : ("message" as const);
|
||||
}
|
||||
return "room-event" as const;
|
||||
}
|
||||
|
||||
export function normalizeMatrixQaObservedEvent(
|
||||
roomId: string,
|
||||
event: MatrixQaRoomEvent,
|
||||
): MatrixQaObservedEvent | null {
|
||||
const eventId = event.event_id?.trim();
|
||||
const type = event.type?.trim();
|
||||
if (!eventId || !type) {
|
||||
return null;
|
||||
}
|
||||
const content = event.content ?? {};
|
||||
const msgtype = typeof content.msgtype === "string" ? content.msgtype : undefined;
|
||||
const relatesToRaw = content["m.relates_to"];
|
||||
const relatesTo =
|
||||
typeof relatesToRaw === "object" && relatesToRaw !== null
|
||||
? (relatesToRaw as Record<string, unknown>)
|
||||
: null;
|
||||
const inReplyToRaw = relatesTo?.["m.in_reply_to"];
|
||||
const inReplyTo =
|
||||
typeof inReplyToRaw === "object" && inReplyToRaw !== null
|
||||
? (inReplyToRaw as Record<string, unknown>)
|
||||
: null;
|
||||
const mentionsRaw = content["m.mentions"];
|
||||
const mentions =
|
||||
typeof mentionsRaw === "object" && mentionsRaw !== null
|
||||
? (mentionsRaw as Record<string, unknown>)
|
||||
: null;
|
||||
const mentionUserIds = normalizeMentionUserIds(mentions?.user_ids);
|
||||
const reactionKey =
|
||||
type === "m.reaction" && typeof relatesTo?.key === "string" ? relatesTo.key : undefined;
|
||||
const reactionEventId =
|
||||
type === "m.reaction" && typeof relatesTo?.event_id === "string"
|
||||
? relatesTo.event_id
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
kind: resolveMatrixQaObservedEventKind({ msgtype, type }),
|
||||
roomId,
|
||||
eventId,
|
||||
sender: typeof event.sender === "string" ? event.sender : undefined,
|
||||
stateKey: typeof event.state_key === "string" ? event.state_key : undefined,
|
||||
type,
|
||||
originServerTs:
|
||||
typeof event.origin_server_ts === "number" ? Math.floor(event.origin_server_ts) : undefined,
|
||||
body: typeof content.body === "string" ? content.body : undefined,
|
||||
formattedBody: typeof content.formatted_body === "string" ? content.formatted_body : undefined,
|
||||
msgtype,
|
||||
membership: typeof content.membership === "string" ? content.membership : undefined,
|
||||
...(relatesTo
|
||||
? {
|
||||
relatesTo: {
|
||||
eventId: typeof relatesTo.event_id === "string" ? relatesTo.event_id : undefined,
|
||||
inReplyToId: typeof inReplyTo?.event_id === "string" ? inReplyTo.event_id : undefined,
|
||||
isFallingBack:
|
||||
typeof relatesTo.is_falling_back === "boolean"
|
||||
? relatesTo.is_falling_back
|
||||
: undefined,
|
||||
relType: typeof relatesTo.rel_type === "string" ? relatesTo.rel_type : undefined,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
...(mentions
|
||||
? {
|
||||
mentions: {
|
||||
...(mentions.room === true ? { room: true } : {}),
|
||||
...(mentionUserIds ? { userIds: mentionUserIds } : {}),
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
...(reactionEventId || reactionKey
|
||||
? {
|
||||
reaction: {
|
||||
...(reactionEventId ? { eventId: reactionEventId } : {}),
|
||||
...(reactionKey ? { key: reactionKey } : {}),
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
}
|
||||
55
extensions/qa-matrix/src/substrate/request.ts
Normal file
55
extensions/qa-matrix/src/substrate/request.ts
Normal file
@@ -0,0 +1,55 @@
|
||||
export type MatrixQaFetchLike = typeof fetch;
|
||||
|
||||
type MatrixQaRequestResult<T> = {
|
||||
status: number;
|
||||
body: T;
|
||||
};
|
||||
|
||||
export async function requestMatrixJson<T>(params: {
|
||||
accessToken?: string;
|
||||
baseUrl: string;
|
||||
body?: unknown;
|
||||
endpoint: string;
|
||||
fetchImpl: MatrixQaFetchLike;
|
||||
method: "GET" | "POST" | "PUT";
|
||||
okStatuses?: number[];
|
||||
query?: Record<string, string | number | undefined>;
|
||||
timeoutMs?: number;
|
||||
}): Promise<MatrixQaRequestResult<T>> {
|
||||
const url = new URL(params.endpoint, params.baseUrl);
|
||||
for (const [key, value] of Object.entries(params.query ?? {})) {
|
||||
if (value !== undefined) {
|
||||
url.searchParams.set(key, String(value));
|
||||
}
|
||||
}
|
||||
const response = await params.fetchImpl(url, {
|
||||
method: params.method,
|
||||
headers: {
|
||||
accept: "application/json",
|
||||
...(params.body !== undefined ? { "content-type": "application/json" } : {}),
|
||||
...(params.accessToken ? { authorization: `Bearer ${params.accessToken}` } : {}),
|
||||
},
|
||||
...(params.body !== undefined ? { body: JSON.stringify(params.body) } : {}),
|
||||
signal: AbortSignal.timeout(params.timeoutMs ?? 20_000),
|
||||
});
|
||||
let body: unknown = {};
|
||||
try {
|
||||
body = (await response.json()) as unknown;
|
||||
} catch {
|
||||
body = {};
|
||||
}
|
||||
const okStatuses = params.okStatuses ?? [200];
|
||||
if (!okStatuses.includes(response.status)) {
|
||||
const details =
|
||||
typeof body === "object" &&
|
||||
body !== null &&
|
||||
typeof (body as { error?: unknown }).error === "string"
|
||||
? (body as { error: string }).error
|
||||
: `${params.method} ${params.endpoint} failed with status ${response.status}`;
|
||||
throw new Error(details);
|
||||
}
|
||||
return {
|
||||
status: response.status,
|
||||
body: body as T,
|
||||
};
|
||||
}
|
||||
107
extensions/qa-matrix/src/substrate/sync.ts
Normal file
107
extensions/qa-matrix/src/substrate/sync.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import type { MatrixQaObservedEvent } from "./events.js";
|
||||
import { normalizeMatrixQaObservedEvent, type MatrixQaRoomEvent } from "./events.js";
|
||||
import { requestMatrixJson, type MatrixQaFetchLike } from "./request.js";
|
||||
|
||||
type MatrixQaSyncResponse = {
|
||||
next_batch?: string;
|
||||
rooms?: {
|
||||
join?: Record<
|
||||
string,
|
||||
{
|
||||
timeline?: {
|
||||
events?: MatrixQaRoomEvent[];
|
||||
};
|
||||
}
|
||||
>;
|
||||
};
|
||||
};
|
||||
|
||||
export type MatrixQaRoomEventWaitResult =
|
||||
| {
|
||||
event: MatrixQaObservedEvent;
|
||||
matched: true;
|
||||
since?: string;
|
||||
}
|
||||
| {
|
||||
matched: false;
|
||||
since?: string;
|
||||
};
|
||||
|
||||
type MatrixQaSyncParams = {
|
||||
accessToken?: string;
|
||||
baseUrl: string;
|
||||
fetchImpl: MatrixQaFetchLike;
|
||||
};
|
||||
|
||||
export async function primeMatrixQaRoom(params: MatrixQaSyncParams) {
|
||||
const response = await requestMatrixJson<MatrixQaSyncResponse>({
|
||||
accessToken: params.accessToken,
|
||||
baseUrl: params.baseUrl,
|
||||
endpoint: "/_matrix/client/v3/sync",
|
||||
fetchImpl: params.fetchImpl,
|
||||
method: "GET",
|
||||
query: { timeout: 0 },
|
||||
});
|
||||
return response.body.next_batch?.trim() || undefined;
|
||||
}
|
||||
|
||||
export async function waitForOptionalMatrixQaRoomEvent(
|
||||
params: MatrixQaSyncParams & {
|
||||
observedEvents: MatrixQaObservedEvent[];
|
||||
predicate: (event: MatrixQaObservedEvent) => boolean;
|
||||
roomId: string;
|
||||
since?: string;
|
||||
timeoutMs: number;
|
||||
},
|
||||
): Promise<MatrixQaRoomEventWaitResult> {
|
||||
const startedAt = Date.now();
|
||||
let since = params.since;
|
||||
while (Date.now() - startedAt < params.timeoutMs) {
|
||||
const remainingMs = Math.max(1_000, params.timeoutMs - (Date.now() - startedAt));
|
||||
const response = await requestMatrixJson<MatrixQaSyncResponse>({
|
||||
accessToken: params.accessToken,
|
||||
baseUrl: params.baseUrl,
|
||||
endpoint: "/_matrix/client/v3/sync",
|
||||
fetchImpl: params.fetchImpl,
|
||||
method: "GET",
|
||||
query: {
|
||||
...(since ? { since } : {}),
|
||||
timeout: Math.min(10_000, remainingMs),
|
||||
},
|
||||
timeoutMs: Math.min(15_000, remainingMs + 5_000),
|
||||
});
|
||||
since = response.body.next_batch?.trim() || since;
|
||||
const roomEvents = response.body.rooms?.join?.[params.roomId]?.timeline?.events ?? [];
|
||||
let matchedEvent: MatrixQaObservedEvent | null = null;
|
||||
for (const event of roomEvents) {
|
||||
const normalized = normalizeMatrixQaObservedEvent(params.roomId, event);
|
||||
if (!normalized) {
|
||||
continue;
|
||||
}
|
||||
params.observedEvents.push(normalized);
|
||||
if (matchedEvent === null && params.predicate(normalized)) {
|
||||
matchedEvent = normalized;
|
||||
}
|
||||
}
|
||||
if (matchedEvent) {
|
||||
return { event: matchedEvent, matched: true, since };
|
||||
}
|
||||
}
|
||||
return { matched: false, since };
|
||||
}
|
||||
|
||||
export async function waitForMatrixQaRoomEvent(
|
||||
params: MatrixQaSyncParams & {
|
||||
observedEvents: MatrixQaObservedEvent[];
|
||||
predicate: (event: MatrixQaObservedEvent) => boolean;
|
||||
roomId: string;
|
||||
since?: string;
|
||||
timeoutMs: number;
|
||||
},
|
||||
) {
|
||||
const result = await waitForOptionalMatrixQaRoomEvent(params);
|
||||
if (result.matched) {
|
||||
return { event: result.event, since: result.since };
|
||||
}
|
||||
throw new Error(`timed out after ${params.timeoutMs}ms waiting for Matrix room event`);
|
||||
}
|
||||
Reference in New Issue
Block a user