mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-20 21:51:28 +00:00
feat: add qa lab extension
This commit is contained in:
@@ -1,136 +1 @@
|
||||
import type {
|
||||
QaBusConversation,
|
||||
QaBusEvent,
|
||||
QaBusMessage,
|
||||
QaBusPollInput,
|
||||
QaBusPollResult,
|
||||
QaBusReadMessageInput,
|
||||
QaBusSearchMessagesInput,
|
||||
QaBusStateSnapshot,
|
||||
QaBusThread,
|
||||
} from "../../extensions/qa-channel/test-api.js";
|
||||
|
||||
export const DEFAULT_ACCOUNT_ID = "default";
|
||||
|
||||
export function normalizeAccountId(raw?: string): string {
|
||||
const trimmed = raw?.trim();
|
||||
return trimmed || DEFAULT_ACCOUNT_ID;
|
||||
}
|
||||
|
||||
export function normalizeConversationFromTarget(target: string): {
|
||||
conversation: QaBusConversation;
|
||||
threadId?: string;
|
||||
} {
|
||||
const trimmed = target.trim();
|
||||
if (trimmed.startsWith("thread:")) {
|
||||
const rest = trimmed.slice("thread:".length);
|
||||
const slash = rest.indexOf("/");
|
||||
if (slash > 0) {
|
||||
return {
|
||||
conversation: { id: rest.slice(0, slash), kind: "channel" },
|
||||
threadId: rest.slice(slash + 1),
|
||||
};
|
||||
}
|
||||
}
|
||||
if (trimmed.startsWith("channel:")) {
|
||||
return {
|
||||
conversation: { id: trimmed.slice("channel:".length), kind: "channel" },
|
||||
};
|
||||
}
|
||||
if (trimmed.startsWith("dm:")) {
|
||||
return {
|
||||
conversation: { id: trimmed.slice("dm:".length), kind: "direct" },
|
||||
};
|
||||
}
|
||||
return {
|
||||
conversation: { id: trimmed, kind: "direct" },
|
||||
};
|
||||
}
|
||||
|
||||
export function cloneMessage(message: QaBusMessage): QaBusMessage {
|
||||
return {
|
||||
...message,
|
||||
conversation: { ...message.conversation },
|
||||
reactions: message.reactions.map((reaction) => ({ ...reaction })),
|
||||
};
|
||||
}
|
||||
|
||||
export function cloneEvent(event: QaBusEvent): QaBusEvent {
|
||||
switch (event.kind) {
|
||||
case "inbound-message":
|
||||
case "outbound-message":
|
||||
case "message-edited":
|
||||
case "message-deleted":
|
||||
case "reaction-added":
|
||||
return { ...event, message: cloneMessage(event.message) };
|
||||
case "thread-created":
|
||||
return { ...event, thread: { ...event.thread } };
|
||||
}
|
||||
}
|
||||
|
||||
export function buildQaBusSnapshot(params: {
|
||||
cursor: number;
|
||||
conversations: Map<string, QaBusConversation>;
|
||||
threads: Map<string, QaBusThread>;
|
||||
messages: Map<string, QaBusMessage>;
|
||||
events: QaBusEvent[];
|
||||
}): QaBusStateSnapshot {
|
||||
return {
|
||||
cursor: params.cursor,
|
||||
conversations: Array.from(params.conversations.values()).map((conversation) => ({
|
||||
...conversation,
|
||||
})),
|
||||
threads: Array.from(params.threads.values()).map((thread) => ({ ...thread })),
|
||||
messages: Array.from(params.messages.values()).map((message) => cloneMessage(message)),
|
||||
events: params.events.map((event) => cloneEvent(event)),
|
||||
};
|
||||
}
|
||||
|
||||
export function readQaBusMessage(params: {
|
||||
messages: Map<string, QaBusMessage>;
|
||||
input: QaBusReadMessageInput;
|
||||
}) {
|
||||
const message = params.messages.get(params.input.messageId);
|
||||
if (!message) {
|
||||
throw new Error(`qa-bus message not found: ${params.input.messageId}`);
|
||||
}
|
||||
return cloneMessage(message);
|
||||
}
|
||||
|
||||
export function searchQaBusMessages(params: {
|
||||
messages: Map<string, QaBusMessage>;
|
||||
input: QaBusSearchMessagesInput;
|
||||
}) {
|
||||
const accountId = normalizeAccountId(params.input.accountId);
|
||||
const limit = Math.max(1, Math.min(params.input.limit ?? 20, 100));
|
||||
const query = params.input.query?.trim().toLowerCase();
|
||||
return Array.from(params.messages.values())
|
||||
.filter((message) => message.accountId === accountId)
|
||||
.filter((message) =>
|
||||
params.input.conversationId ? message.conversation.id === params.input.conversationId : true,
|
||||
)
|
||||
.filter((message) =>
|
||||
params.input.threadId ? message.threadId === params.input.threadId : true,
|
||||
)
|
||||
.filter((message) => (query ? message.text.toLowerCase().includes(query) : true))
|
||||
.slice(-limit)
|
||||
.map((message) => cloneMessage(message));
|
||||
}
|
||||
|
||||
export function pollQaBusEvents(params: {
|
||||
events: QaBusEvent[];
|
||||
cursor: number;
|
||||
input?: QaBusPollInput;
|
||||
}): QaBusPollResult {
|
||||
const accountId = normalizeAccountId(params.input?.accountId);
|
||||
const startCursor = params.input?.cursor ?? 0;
|
||||
const limit = Math.max(1, Math.min(params.input?.limit ?? 100, 500));
|
||||
const matches = params.events
|
||||
.filter((event) => event.accountId === accountId && event.cursor > startCursor)
|
||||
.slice(0, limit)
|
||||
.map((event) => cloneEvent(event));
|
||||
return {
|
||||
cursor: params.cursor,
|
||||
events: matches,
|
||||
};
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -1,170 +1 @@
|
||||
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
|
||||
import type {
|
||||
QaBusCreateThreadInput,
|
||||
QaBusDeleteMessageInput,
|
||||
QaBusEditMessageInput,
|
||||
QaBusInboundMessageInput,
|
||||
QaBusOutboundMessageInput,
|
||||
QaBusPollInput,
|
||||
QaBusReactToMessageInput,
|
||||
QaBusReadMessageInput,
|
||||
QaBusSearchMessagesInput,
|
||||
QaBusWaitForInput,
|
||||
} from "../../extensions/qa-channel/test-api.js";
|
||||
import type { QaBusState } from "./bus-state.js";
|
||||
|
||||
async function readJson(req: IncomingMessage): Promise<unknown> {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req) {
|
||||
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
||||
}
|
||||
const text = Buffer.concat(chunks).toString("utf8").trim();
|
||||
return text ? (JSON.parse(text) as unknown) : {};
|
||||
}
|
||||
|
||||
function writeJson(res: ServerResponse, statusCode: number, body: unknown) {
|
||||
const payload = JSON.stringify(body);
|
||||
res.writeHead(statusCode, {
|
||||
"content-type": "application/json; charset=utf-8",
|
||||
"content-length": Buffer.byteLength(payload),
|
||||
});
|
||||
res.end(payload);
|
||||
}
|
||||
|
||||
function writeError(res: ServerResponse, statusCode: number, error: unknown) {
|
||||
writeJson(res, statusCode, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
|
||||
async function handleRequest(params: {
|
||||
req: IncomingMessage;
|
||||
res: ServerResponse;
|
||||
state: QaBusState;
|
||||
}) {
|
||||
const method = params.req.method ?? "GET";
|
||||
const url = new URL(params.req.url ?? "/", "http://127.0.0.1");
|
||||
|
||||
if (method === "GET" && url.pathname === "/health") {
|
||||
writeJson(params.res, 200, { ok: true });
|
||||
return;
|
||||
}
|
||||
|
||||
if (method === "GET" && url.pathname === "/v1/state") {
|
||||
writeJson(params.res, 200, params.state.getSnapshot());
|
||||
return;
|
||||
}
|
||||
|
||||
if (method !== "POST") {
|
||||
writeError(params.res, 405, "method not allowed");
|
||||
return;
|
||||
}
|
||||
|
||||
const body = (await readJson(params.req)) as Record<string, unknown>;
|
||||
|
||||
try {
|
||||
switch (url.pathname) {
|
||||
case "/v1/reset":
|
||||
params.state.reset();
|
||||
writeJson(params.res, 200, { ok: true });
|
||||
return;
|
||||
case "/v1/inbound/message":
|
||||
writeJson(params.res, 200, {
|
||||
message: params.state.addInboundMessage(body as unknown as QaBusInboundMessageInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/outbound/message":
|
||||
writeJson(params.res, 200, {
|
||||
message: params.state.addOutboundMessage(body as unknown as QaBusOutboundMessageInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/actions/thread-create":
|
||||
writeJson(params.res, 200, {
|
||||
thread: params.state.createThread(body as unknown as QaBusCreateThreadInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/actions/react":
|
||||
writeJson(params.res, 200, {
|
||||
message: params.state.reactToMessage(body as unknown as QaBusReactToMessageInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/actions/edit":
|
||||
writeJson(params.res, 200, {
|
||||
message: params.state.editMessage(body as unknown as QaBusEditMessageInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/actions/delete":
|
||||
writeJson(params.res, 200, {
|
||||
message: params.state.deleteMessage(body as unknown as QaBusDeleteMessageInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/actions/read":
|
||||
writeJson(params.res, 200, {
|
||||
message: params.state.readMessage(body as unknown as QaBusReadMessageInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/actions/search":
|
||||
writeJson(params.res, 200, {
|
||||
messages: params.state.searchMessages(body as unknown as QaBusSearchMessagesInput),
|
||||
});
|
||||
return;
|
||||
case "/v1/poll": {
|
||||
const input = body as unknown as QaBusPollInput;
|
||||
const timeoutMs = Math.max(0, Math.min(input.timeoutMs ?? 0, 30_000));
|
||||
const initial = params.state.poll(input);
|
||||
if (initial.events.length > 0 || timeoutMs === 0) {
|
||||
writeJson(params.res, 200, initial);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await params.state.waitFor({
|
||||
kind: "event-kind",
|
||||
eventKind: "inbound-message",
|
||||
timeoutMs,
|
||||
});
|
||||
} catch {
|
||||
// timeout is fine for long-poll.
|
||||
}
|
||||
writeJson(params.res, 200, params.state.poll(input));
|
||||
return;
|
||||
}
|
||||
case "/v1/wait":
|
||||
writeJson(params.res, 200, {
|
||||
match: await params.state.waitFor(body as unknown as QaBusWaitForInput),
|
||||
});
|
||||
return;
|
||||
default:
|
||||
writeError(params.res, 404, "not found");
|
||||
}
|
||||
} catch (error) {
|
||||
writeError(params.res, 400, error);
|
||||
}
|
||||
}
|
||||
|
||||
export function createQaBusServer(state: QaBusState): Server {
|
||||
return createServer(async (req, res) => {
|
||||
await handleRequest({ req, res, state });
|
||||
});
|
||||
}
|
||||
|
||||
export async function startQaBusServer(params: { state: QaBusState; port?: number }) {
|
||||
const server = createQaBusServer(params.state);
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.once("error", reject);
|
||||
server.listen(params.port ?? 0, "127.0.0.1", () => resolve());
|
||||
});
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("qa-bus failed to bind");
|
||||
}
|
||||
return {
|
||||
server,
|
||||
port: address.port,
|
||||
baseUrl: `http://127.0.0.1:${address.port}`,
|
||||
async stop() {
|
||||
await new Promise<void>((resolve, reject) =>
|
||||
server.close((error) => (error ? reject(error) : resolve())),
|
||||
);
|
||||
},
|
||||
};
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -62,4 +62,38 @@ describe("qa-bus state", () => {
|
||||
});
|
||||
expect("id" in waited && waited.id).toBe(thread.id);
|
||||
});
|
||||
|
||||
it("replays fresh events after a reset rewinds the cursor", () => {
|
||||
const state = createQaBusState();
|
||||
|
||||
state.addInboundMessage({
|
||||
conversation: { id: "alice", kind: "direct" },
|
||||
senderId: "alice",
|
||||
text: "before reset",
|
||||
});
|
||||
const beforeReset = state.poll({
|
||||
accountId: "default",
|
||||
cursor: 0,
|
||||
});
|
||||
expect(beforeReset.events).toHaveLength(1);
|
||||
|
||||
state.reset();
|
||||
state.addInboundMessage({
|
||||
conversation: { id: "alice", kind: "direct" },
|
||||
senderId: "alice",
|
||||
text: "after reset",
|
||||
});
|
||||
|
||||
const afterReset = state.poll({
|
||||
accountId: "default",
|
||||
cursor: beforeReset.cursor,
|
||||
});
|
||||
expect(afterReset.events).toHaveLength(1);
|
||||
expect(afterReset.events[0]?.kind).toBe("inbound-message");
|
||||
expect(
|
||||
afterReset.events[0] &&
|
||||
"message" in afterReset.events[0] &&
|
||||
afterReset.events[0].message.text,
|
||||
).toBe("after reset");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,256 +1 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import {
|
||||
type QaBusConversation,
|
||||
type QaBusCreateThreadInput,
|
||||
type QaBusDeleteMessageInput,
|
||||
type QaBusEditMessageInput,
|
||||
type QaBusEvent,
|
||||
type QaBusInboundMessageInput,
|
||||
type QaBusMessage,
|
||||
type QaBusOutboundMessageInput,
|
||||
type QaBusPollInput,
|
||||
type QaBusReadMessageInput,
|
||||
type QaBusReactToMessageInput,
|
||||
type QaBusSearchMessagesInput,
|
||||
type QaBusThread,
|
||||
type QaBusWaitForInput,
|
||||
} from "../../extensions/qa-channel/test-api.js";
|
||||
import {
|
||||
buildQaBusSnapshot,
|
||||
cloneMessage,
|
||||
normalizeAccountId,
|
||||
normalizeConversationFromTarget,
|
||||
pollQaBusEvents,
|
||||
readQaBusMessage,
|
||||
searchQaBusMessages,
|
||||
} from "./bus-queries.js";
|
||||
import { createQaBusWaiterStore } from "./bus-waiters.js";
|
||||
|
||||
const DEFAULT_BOT_ID = "openclaw";
|
||||
const DEFAULT_BOT_NAME = "OpenClaw QA";
|
||||
|
||||
type QaBusEventSeed =
|
||||
| Omit<Extract<QaBusEvent, { kind: "inbound-message" }>, "cursor">
|
||||
| Omit<Extract<QaBusEvent, { kind: "outbound-message" }>, "cursor">
|
||||
| Omit<Extract<QaBusEvent, { kind: "thread-created" }>, "cursor">
|
||||
| Omit<Extract<QaBusEvent, { kind: "message-edited" }>, "cursor">
|
||||
| Omit<Extract<QaBusEvent, { kind: "message-deleted" }>, "cursor">
|
||||
| Omit<Extract<QaBusEvent, { kind: "reaction-added" }>, "cursor">;
|
||||
|
||||
export function createQaBusState() {
|
||||
const conversations = new Map<string, QaBusConversation>();
|
||||
const threads = new Map<string, QaBusThread>();
|
||||
const messages = new Map<string, QaBusMessage>();
|
||||
const events: QaBusEvent[] = [];
|
||||
let cursor = 0;
|
||||
const waiters = createQaBusWaiterStore(() =>
|
||||
buildQaBusSnapshot({
|
||||
cursor,
|
||||
conversations,
|
||||
threads,
|
||||
messages,
|
||||
events,
|
||||
}),
|
||||
);
|
||||
|
||||
const pushEvent = (event: QaBusEventSeed | ((cursor: number) => QaBusEventSeed)): QaBusEvent => {
|
||||
cursor += 1;
|
||||
const next = typeof event === "function" ? event(cursor) : event;
|
||||
const finalized = { cursor, ...next } as QaBusEvent;
|
||||
events.push(finalized);
|
||||
waiters.settle();
|
||||
return finalized;
|
||||
};
|
||||
|
||||
const ensureConversation = (conversation: QaBusConversation): QaBusConversation => {
|
||||
const existing = conversations.get(conversation.id);
|
||||
if (existing) {
|
||||
if (!existing.title && conversation.title) {
|
||||
existing.title = conversation.title;
|
||||
}
|
||||
return existing;
|
||||
}
|
||||
const created = { ...conversation };
|
||||
conversations.set(created.id, created);
|
||||
return created;
|
||||
};
|
||||
|
||||
const createMessage = (params: {
|
||||
direction: QaBusMessage["direction"];
|
||||
accountId: string;
|
||||
conversation: QaBusConversation;
|
||||
senderId: string;
|
||||
senderName?: string;
|
||||
text: string;
|
||||
timestamp?: number;
|
||||
threadId?: string;
|
||||
threadTitle?: string;
|
||||
replyToId?: string;
|
||||
}): QaBusMessage => {
|
||||
const conversation = ensureConversation(params.conversation);
|
||||
const message: QaBusMessage = {
|
||||
id: randomUUID(),
|
||||
accountId: params.accountId,
|
||||
direction: params.direction,
|
||||
conversation,
|
||||
senderId: params.senderId,
|
||||
senderName: params.senderName,
|
||||
text: params.text,
|
||||
timestamp: params.timestamp ?? Date.now(),
|
||||
threadId: params.threadId,
|
||||
threadTitle: params.threadTitle,
|
||||
replyToId: params.replyToId,
|
||||
reactions: [],
|
||||
};
|
||||
messages.set(message.id, message);
|
||||
return message;
|
||||
};
|
||||
|
||||
return {
|
||||
reset() {
|
||||
conversations.clear();
|
||||
threads.clear();
|
||||
messages.clear();
|
||||
events.length = 0;
|
||||
cursor = 0;
|
||||
waiters.reset();
|
||||
},
|
||||
getSnapshot() {
|
||||
return buildQaBusSnapshot({
|
||||
cursor,
|
||||
conversations,
|
||||
threads,
|
||||
messages,
|
||||
events,
|
||||
});
|
||||
},
|
||||
addInboundMessage(input: QaBusInboundMessageInput) {
|
||||
const accountId = normalizeAccountId(input.accountId);
|
||||
const message = createMessage({
|
||||
direction: "inbound",
|
||||
accountId,
|
||||
conversation: input.conversation,
|
||||
senderId: input.senderId,
|
||||
senderName: input.senderName,
|
||||
text: input.text,
|
||||
timestamp: input.timestamp,
|
||||
threadId: input.threadId,
|
||||
threadTitle: input.threadTitle,
|
||||
replyToId: input.replyToId,
|
||||
});
|
||||
pushEvent({
|
||||
kind: "inbound-message",
|
||||
accountId,
|
||||
message: cloneMessage(message),
|
||||
});
|
||||
return cloneMessage(message);
|
||||
},
|
||||
addOutboundMessage(input: QaBusOutboundMessageInput) {
|
||||
const accountId = normalizeAccountId(input.accountId);
|
||||
const { conversation, threadId } = normalizeConversationFromTarget(input.to);
|
||||
const message = createMessage({
|
||||
direction: "outbound",
|
||||
accountId,
|
||||
conversation,
|
||||
senderId: input.senderId?.trim() || DEFAULT_BOT_ID,
|
||||
senderName: input.senderName?.trim() || DEFAULT_BOT_NAME,
|
||||
text: input.text,
|
||||
timestamp: input.timestamp,
|
||||
threadId: input.threadId ?? threadId,
|
||||
replyToId: input.replyToId,
|
||||
});
|
||||
pushEvent({
|
||||
kind: "outbound-message",
|
||||
accountId,
|
||||
message: cloneMessage(message),
|
||||
});
|
||||
return cloneMessage(message);
|
||||
},
|
||||
createThread(input: QaBusCreateThreadInput) {
|
||||
const accountId = normalizeAccountId(input.accountId);
|
||||
const thread: QaBusThread = {
|
||||
id: `thread-${randomUUID()}`,
|
||||
accountId,
|
||||
conversationId: input.conversationId,
|
||||
title: input.title,
|
||||
createdAt: input.timestamp ?? Date.now(),
|
||||
createdBy: input.createdBy?.trim() || DEFAULT_BOT_ID,
|
||||
};
|
||||
threads.set(thread.id, thread);
|
||||
ensureConversation({
|
||||
id: input.conversationId,
|
||||
kind: "channel",
|
||||
});
|
||||
pushEvent({
|
||||
kind: "thread-created",
|
||||
accountId,
|
||||
thread: { ...thread },
|
||||
});
|
||||
return { ...thread };
|
||||
},
|
||||
reactToMessage(input: QaBusReactToMessageInput) {
|
||||
const accountId = normalizeAccountId(input.accountId);
|
||||
const message = messages.get(input.messageId);
|
||||
if (!message) {
|
||||
throw new Error(`qa-bus message not found: ${input.messageId}`);
|
||||
}
|
||||
const reaction = {
|
||||
emoji: input.emoji,
|
||||
senderId: input.senderId?.trim() || DEFAULT_BOT_ID,
|
||||
timestamp: input.timestamp ?? Date.now(),
|
||||
};
|
||||
message.reactions.push(reaction);
|
||||
pushEvent({
|
||||
kind: "reaction-added",
|
||||
accountId,
|
||||
message: cloneMessage(message),
|
||||
emoji: reaction.emoji,
|
||||
senderId: reaction.senderId,
|
||||
});
|
||||
return cloneMessage(message);
|
||||
},
|
||||
editMessage(input: QaBusEditMessageInput) {
|
||||
const accountId = normalizeAccountId(input.accountId);
|
||||
const message = messages.get(input.messageId);
|
||||
if (!message) {
|
||||
throw new Error(`qa-bus message not found: ${input.messageId}`);
|
||||
}
|
||||
message.text = input.text;
|
||||
message.editedAt = input.timestamp ?? Date.now();
|
||||
pushEvent({
|
||||
kind: "message-edited",
|
||||
accountId,
|
||||
message: cloneMessage(message),
|
||||
});
|
||||
return cloneMessage(message);
|
||||
},
|
||||
deleteMessage(input: QaBusDeleteMessageInput) {
|
||||
const accountId = normalizeAccountId(input.accountId);
|
||||
const message = messages.get(input.messageId);
|
||||
if (!message) {
|
||||
throw new Error(`qa-bus message not found: ${input.messageId}`);
|
||||
}
|
||||
message.deleted = true;
|
||||
pushEvent({
|
||||
kind: "message-deleted",
|
||||
accountId,
|
||||
message: cloneMessage(message),
|
||||
});
|
||||
return cloneMessage(message);
|
||||
},
|
||||
readMessage(input: QaBusReadMessageInput) {
|
||||
return readQaBusMessage({ messages, input });
|
||||
},
|
||||
searchMessages(input: QaBusSearchMessagesInput) {
|
||||
return searchQaBusMessages({ messages, input });
|
||||
},
|
||||
poll(input: QaBusPollInput = {}) {
|
||||
return pollQaBusEvents({ events, cursor, input });
|
||||
},
|
||||
async waitFor(input: QaBusWaitForInput) {
|
||||
return await waiters.waitFor(input);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export type QaBusState = ReturnType<typeof createQaBusState>;
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -1,87 +1 @@
|
||||
import type {
|
||||
QaBusEvent,
|
||||
QaBusMessage,
|
||||
QaBusStateSnapshot,
|
||||
QaBusThread,
|
||||
QaBusWaitForInput,
|
||||
} from "../../extensions/qa-channel/test-api.js";
|
||||
|
||||
export const DEFAULT_WAIT_TIMEOUT_MS = 5_000;
|
||||
|
||||
export type QaBusWaitMatch = QaBusEvent | QaBusMessage | QaBusThread;
|
||||
|
||||
type Waiter = {
|
||||
resolve: (event: QaBusWaitMatch) => void;
|
||||
reject: (error: Error) => void;
|
||||
timer: NodeJS.Timeout;
|
||||
matcher: (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null;
|
||||
};
|
||||
|
||||
function createQaBusMatcher(
|
||||
input: QaBusWaitForInput,
|
||||
): (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null {
|
||||
return (snapshot) => {
|
||||
if (input.kind === "event-kind") {
|
||||
return snapshot.events.find((event) => event.kind === input.eventKind) ?? null;
|
||||
}
|
||||
if (input.kind === "thread-id") {
|
||||
return snapshot.threads.find((thread) => thread.id === input.threadId) ?? null;
|
||||
}
|
||||
return (
|
||||
snapshot.messages.find(
|
||||
(message) =>
|
||||
(!input.direction || message.direction === input.direction) &&
|
||||
message.text.includes(input.textIncludes),
|
||||
) ?? null
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
const waiters = new Set<Waiter>();
|
||||
|
||||
return {
|
||||
reset(reason = "qa-bus reset") {
|
||||
for (const waiter of waiters) {
|
||||
clearTimeout(waiter.timer);
|
||||
waiter.reject(new Error(reason));
|
||||
}
|
||||
waiters.clear();
|
||||
},
|
||||
settle() {
|
||||
if (waiters.size === 0) {
|
||||
return;
|
||||
}
|
||||
const snapshot = getSnapshot();
|
||||
for (const waiter of Array.from(waiters)) {
|
||||
const match = waiter.matcher(snapshot);
|
||||
if (!match) {
|
||||
continue;
|
||||
}
|
||||
clearTimeout(waiter.timer);
|
||||
waiters.delete(waiter);
|
||||
waiter.resolve(match);
|
||||
}
|
||||
},
|
||||
async waitFor(input: QaBusWaitForInput) {
|
||||
const matcher = createQaBusMatcher(input);
|
||||
const immediate = matcher(getSnapshot());
|
||||
if (immediate) {
|
||||
return immediate;
|
||||
}
|
||||
return await new Promise<QaBusWaitMatch>((resolve, reject) => {
|
||||
const timeoutMs = input.timeoutMs ?? DEFAULT_WAIT_TIMEOUT_MS;
|
||||
const waiter: Waiter = {
|
||||
resolve,
|
||||
reject,
|
||||
matcher,
|
||||
timer: setTimeout(() => {
|
||||
waiters.delete(waiter);
|
||||
reject(new Error(`qa-bus wait timeout after ${timeoutMs}ms`));
|
||||
}, timeoutMs),
|
||||
};
|
||||
waiters.add(waiter);
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -1,75 +1 @@
|
||||
import type { PluginRuntime } from "../plugins/runtime/types.js";
|
||||
|
||||
type SessionRecord = {
|
||||
sessionKey: string;
|
||||
body: string;
|
||||
};
|
||||
|
||||
export function createQaRunnerRuntime(): PluginRuntime {
|
||||
const sessions = new Map<string, SessionRecord>();
|
||||
return {
|
||||
channel: {
|
||||
routing: {
|
||||
resolveAgentRoute({
|
||||
accountId,
|
||||
peer,
|
||||
}: {
|
||||
accountId?: string | null;
|
||||
peer?: { kind?: string; id?: string } | null;
|
||||
}) {
|
||||
return {
|
||||
agentId: "qa-agent",
|
||||
accountId: accountId ?? "default",
|
||||
sessionKey: `qa-agent:${peer?.kind ?? "direct"}:${peer?.id ?? "default"}`,
|
||||
mainSessionKey: "qa-agent:main",
|
||||
lastRoutePolicy: "session",
|
||||
matchedBy: "default",
|
||||
channel: "qa-channel",
|
||||
};
|
||||
},
|
||||
},
|
||||
session: {
|
||||
resolveStorePath(_store: string | undefined, { agentId }: { agentId: string }) {
|
||||
return agentId;
|
||||
},
|
||||
readSessionUpdatedAt({ sessionKey }: { sessionKey: string }) {
|
||||
return sessions.has(sessionKey) ? Date.now() : undefined;
|
||||
},
|
||||
recordInboundSession({
|
||||
sessionKey,
|
||||
ctx,
|
||||
}: {
|
||||
sessionKey: string;
|
||||
ctx: { BodyForAgent?: string; Body?: string };
|
||||
}) {
|
||||
sessions.set(sessionKey, {
|
||||
sessionKey,
|
||||
body: String(ctx.BodyForAgent ?? ctx.Body ?? ""),
|
||||
});
|
||||
},
|
||||
},
|
||||
reply: {
|
||||
resolveEnvelopeFormatOptions() {
|
||||
return {};
|
||||
},
|
||||
formatAgentEnvelope({ body }: { body: string }) {
|
||||
return body;
|
||||
},
|
||||
finalizeInboundContext(ctx: Record<string, unknown>) {
|
||||
return ctx as typeof ctx & { CommandAuthorized: boolean };
|
||||
},
|
||||
async dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx,
|
||||
dispatcherOptions,
|
||||
}: {
|
||||
ctx: { BodyForAgent?: string; Body?: string };
|
||||
dispatcherOptions: { deliver: (payload: { text: string }) => Promise<void> };
|
||||
}) {
|
||||
await dispatcherOptions.deliver({
|
||||
text: `qa-echo: ${String(ctx.BodyForAgent ?? ctx.Body ?? "")}`,
|
||||
});
|
||||
},
|
||||
},
|
||||
},
|
||||
} as unknown as PluginRuntime;
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -1,91 +1 @@
|
||||
export type QaReportCheck = {
|
||||
name: string;
|
||||
status: "pass" | "fail" | "skip";
|
||||
details?: string;
|
||||
};
|
||||
|
||||
export type QaReportScenario = {
|
||||
name: string;
|
||||
status: "pass" | "fail" | "skip";
|
||||
details?: string;
|
||||
steps?: QaReportCheck[];
|
||||
};
|
||||
|
||||
export function renderQaMarkdownReport(params: {
|
||||
title: string;
|
||||
startedAt: Date;
|
||||
finishedAt: Date;
|
||||
checks?: QaReportCheck[];
|
||||
scenarios?: QaReportScenario[];
|
||||
timeline?: string[];
|
||||
notes?: string[];
|
||||
}) {
|
||||
const checks = params.checks ?? [];
|
||||
const scenarios = params.scenarios ?? [];
|
||||
const passCount =
|
||||
checks.filter((check) => check.status === "pass").length +
|
||||
scenarios.filter((scenario) => scenario.status === "pass").length;
|
||||
const failCount =
|
||||
checks.filter((check) => check.status === "fail").length +
|
||||
scenarios.filter((scenario) => scenario.status === "fail").length;
|
||||
|
||||
const lines = [
|
||||
`# ${params.title}`,
|
||||
"",
|
||||
`- Started: ${params.startedAt.toISOString()}`,
|
||||
`- Finished: ${params.finishedAt.toISOString()}`,
|
||||
`- Duration ms: ${params.finishedAt.getTime() - params.startedAt.getTime()}`,
|
||||
`- Passed: ${passCount}`,
|
||||
`- Failed: ${failCount}`,
|
||||
"",
|
||||
];
|
||||
|
||||
if (checks.length > 0) {
|
||||
lines.push("## Checks", "");
|
||||
for (const check of checks) {
|
||||
lines.push(`- [${check.status === "pass" ? "x" : " "}] ${check.name}`);
|
||||
if (check.details) {
|
||||
lines.push(` - ${check.details}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (scenarios.length > 0) {
|
||||
lines.push("", "## Scenarios", "");
|
||||
for (const scenario of scenarios) {
|
||||
lines.push(`### ${scenario.name}`);
|
||||
lines.push("");
|
||||
lines.push(`- Status: ${scenario.status}`);
|
||||
if (scenario.details) {
|
||||
lines.push(`- Details: ${scenario.details}`);
|
||||
}
|
||||
if (scenario.steps?.length) {
|
||||
lines.push("- Steps:");
|
||||
for (const step of scenario.steps) {
|
||||
lines.push(` - [${step.status === "pass" ? "x" : " "}] ${step.name}`);
|
||||
if (step.details) {
|
||||
lines.push(` - ${step.details}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
lines.push("");
|
||||
}
|
||||
}
|
||||
|
||||
if (params.timeline && params.timeline.length > 0) {
|
||||
lines.push("## Timeline", "");
|
||||
for (const item of params.timeline) {
|
||||
lines.push(`- ${item}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (params.notes && params.notes.length > 0) {
|
||||
lines.push("", "## Notes", "");
|
||||
for (const note of params.notes) {
|
||||
lines.push(`- ${note}`);
|
||||
}
|
||||
}
|
||||
|
||||
lines.push("");
|
||||
return lines.join("\n");
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -1,124 +1 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { qaChannelPlugin, setQaChannelRuntime } from "../../extensions/qa-channel/api.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { startQaBusServer } from "./bus-server.js";
|
||||
import { createQaBusState } from "./bus-state.js";
|
||||
import { createQaRunnerRuntime } from "./harness-runtime.js";
|
||||
import { renderQaMarkdownReport } from "./report.js";
|
||||
import { runQaScenario } from "./scenario.js";
|
||||
import { createQaSelfCheckScenario } from "./self-check-scenario.js";
|
||||
|
||||
export async function runQaE2eSelfCheck(params?: { outputPath?: string }) {
|
||||
const startedAt = new Date();
|
||||
const state = createQaBusState();
|
||||
const bus = await startQaBusServer({ state });
|
||||
const runtime = createQaRunnerRuntime();
|
||||
setQaChannelRuntime(runtime);
|
||||
|
||||
const cfg: OpenClawConfig = {
|
||||
channels: {
|
||||
"qa-channel": {
|
||||
enabled: true,
|
||||
baseUrl: bus.baseUrl,
|
||||
botUserId: "openclaw",
|
||||
botDisplayName: "OpenClaw QA",
|
||||
allowFrom: ["*"],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const account = qaChannelPlugin.config.resolveAccount(cfg, "default");
|
||||
const abort = new AbortController();
|
||||
|
||||
const task = qaChannelPlugin.gateway?.startAccount?.({
|
||||
accountId: account.accountId,
|
||||
account,
|
||||
cfg,
|
||||
runtime: {
|
||||
log: () => undefined,
|
||||
error: () => undefined,
|
||||
exit: () => undefined,
|
||||
},
|
||||
abortSignal: abort.signal,
|
||||
log: {
|
||||
info: () => undefined,
|
||||
warn: () => undefined,
|
||||
error: () => undefined,
|
||||
debug: () => undefined,
|
||||
},
|
||||
getStatus: () => ({
|
||||
accountId: account.accountId,
|
||||
configured: true,
|
||||
enabled: true,
|
||||
running: true,
|
||||
}),
|
||||
setStatus: () => undefined,
|
||||
});
|
||||
|
||||
const checks: Array<{ name: string; status: "pass" | "fail"; details?: string }> = [];
|
||||
let scenarioResult: Awaited<ReturnType<typeof runQaScenario>> | undefined;
|
||||
|
||||
try {
|
||||
scenarioResult = await runQaScenario(createQaSelfCheckScenario(cfg), { state });
|
||||
checks.push({
|
||||
name: "QA self-check scenario",
|
||||
status: scenarioResult.status,
|
||||
details: `${scenarioResult.steps.filter((step) => step.status === "pass").length}/${scenarioResult.steps.length} steps passed`,
|
||||
});
|
||||
} catch (error) {
|
||||
checks.push({
|
||||
name: "QA self-check",
|
||||
status: "fail",
|
||||
details: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
} finally {
|
||||
abort.abort();
|
||||
await task;
|
||||
await bus.stop();
|
||||
}
|
||||
|
||||
const finishedAt = new Date();
|
||||
const snapshot = state.getSnapshot();
|
||||
const timeline = snapshot.events.map((event) => {
|
||||
switch (event.kind) {
|
||||
case "thread-created":
|
||||
return `${event.cursor}. ${event.kind} ${event.thread.conversationId}/${event.thread.id}`;
|
||||
case "reaction-added":
|
||||
return `${event.cursor}. ${event.kind} ${event.message.id} ${event.emoji}`;
|
||||
default:
|
||||
return `${event.cursor}. ${event.kind} ${"message" in event ? event.message.id : ""}`.trim();
|
||||
}
|
||||
});
|
||||
const report = renderQaMarkdownReport({
|
||||
title: "OpenClaw QA E2E Self-Check",
|
||||
startedAt,
|
||||
finishedAt,
|
||||
checks,
|
||||
scenarios: scenarioResult
|
||||
? [
|
||||
{
|
||||
name: scenarioResult.name,
|
||||
status: scenarioResult.status,
|
||||
details: scenarioResult.details,
|
||||
steps: scenarioResult.steps,
|
||||
},
|
||||
]
|
||||
: undefined,
|
||||
timeline,
|
||||
notes: [
|
||||
"Vertical slice only: bus + bundled qa-channel + in-process runner runtime.",
|
||||
"Full Docker orchestration and model/provider matrix remain follow-up work.",
|
||||
],
|
||||
});
|
||||
|
||||
const outputPath =
|
||||
params?.outputPath ?? path.join(process.cwd(), ".artifacts", "qa-e2e", "self-check.md");
|
||||
await fs.mkdir(path.dirname(outputPath), { recursive: true });
|
||||
await fs.writeFile(outputPath, report, "utf8");
|
||||
return {
|
||||
outputPath,
|
||||
report,
|
||||
checks,
|
||||
};
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -1,65 +1 @@
|
||||
import type { QaBusState } from "./bus-state.js";
|
||||
|
||||
export type QaScenarioStepContext = {
|
||||
state: QaBusState;
|
||||
};
|
||||
|
||||
export type QaScenarioStep = {
|
||||
name: string;
|
||||
run: (ctx: QaScenarioStepContext) => Promise<string | void>;
|
||||
};
|
||||
|
||||
export type QaScenarioDefinition = {
|
||||
name: string;
|
||||
steps: QaScenarioStep[];
|
||||
};
|
||||
|
||||
export type QaScenarioStepResult = {
|
||||
name: string;
|
||||
status: "pass" | "fail";
|
||||
details?: string;
|
||||
};
|
||||
|
||||
export type QaScenarioResult = {
|
||||
name: string;
|
||||
status: "pass" | "fail";
|
||||
steps: QaScenarioStepResult[];
|
||||
details?: string;
|
||||
};
|
||||
|
||||
export async function runQaScenario(
|
||||
definition: QaScenarioDefinition,
|
||||
ctx: QaScenarioStepContext,
|
||||
): Promise<QaScenarioResult> {
|
||||
const steps: QaScenarioStepResult[] = [];
|
||||
|
||||
for (const step of definition.steps) {
|
||||
try {
|
||||
const details = await step.run(ctx);
|
||||
steps.push({
|
||||
name: step.name,
|
||||
status: "pass",
|
||||
...(details ? { details } : {}),
|
||||
});
|
||||
} catch (error) {
|
||||
const details = error instanceof Error ? error.message : String(error);
|
||||
steps.push({
|
||||
name: step.name,
|
||||
status: "fail",
|
||||
details,
|
||||
});
|
||||
return {
|
||||
name: definition.name,
|
||||
status: "fail",
|
||||
steps,
|
||||
details,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
name: definition.name,
|
||||
status: "pass",
|
||||
steps,
|
||||
};
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
@@ -1,122 +1 @@
|
||||
import { qaChannelPlugin } from "../../extensions/qa-channel/api.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { extractToolPayload } from "../infra/outbound/tool-payload.js";
|
||||
import type { QaScenarioDefinition } from "./scenario.js";
|
||||
|
||||
export function createQaSelfCheckScenario(cfg: OpenClawConfig): QaScenarioDefinition {
|
||||
return {
|
||||
name: "Synthetic Slack-class roundtrip",
|
||||
steps: [
|
||||
{
|
||||
name: "DM echo roundtrip",
|
||||
async run({ state }) {
|
||||
state.addInboundMessage({
|
||||
conversation: { id: "alice", kind: "direct" },
|
||||
senderId: "alice",
|
||||
senderName: "Alice",
|
||||
text: "hello from qa",
|
||||
});
|
||||
await state.waitFor({
|
||||
kind: "message-text",
|
||||
textIncludes: "qa-echo: hello from qa",
|
||||
direction: "outbound",
|
||||
timeoutMs: 5_000,
|
||||
});
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Thread create and threaded echo",
|
||||
async run({ state }) {
|
||||
const threadResult = await qaChannelPlugin.actions?.handleAction?.({
|
||||
channel: "qa-channel",
|
||||
action: "thread-create",
|
||||
cfg,
|
||||
accountId: "default",
|
||||
params: {
|
||||
channelId: "qa-room",
|
||||
title: "QA thread",
|
||||
},
|
||||
});
|
||||
const threadPayload = (threadResult ? extractToolPayload(threadResult) : undefined) as
|
||||
| { thread?: { id?: string } }
|
||||
| undefined;
|
||||
const threadId = threadPayload?.thread?.id;
|
||||
if (!threadId) {
|
||||
throw new Error("thread-create did not return thread id");
|
||||
}
|
||||
|
||||
state.addInboundMessage({
|
||||
conversation: { id: "qa-room", kind: "channel", title: "QA Room" },
|
||||
senderId: "alice",
|
||||
senderName: "Alice",
|
||||
text: "inside thread",
|
||||
threadId,
|
||||
threadTitle: "QA thread",
|
||||
});
|
||||
await state.waitFor({
|
||||
kind: "message-text",
|
||||
textIncludes: "qa-echo: inside thread",
|
||||
direction: "outbound",
|
||||
timeoutMs: 5_000,
|
||||
});
|
||||
return threadId;
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Reaction, edit, delete lifecycle",
|
||||
async run({ state }) {
|
||||
const outbound = state
|
||||
.searchMessages({ query: "qa-echo: inside thread", conversationId: "qa-room" })
|
||||
.at(-1);
|
||||
if (!outbound) {
|
||||
throw new Error("threaded outbound message not found");
|
||||
}
|
||||
|
||||
await qaChannelPlugin.actions?.handleAction?.({
|
||||
channel: "qa-channel",
|
||||
action: "react",
|
||||
cfg,
|
||||
accountId: "default",
|
||||
params: {
|
||||
messageId: outbound.id,
|
||||
emoji: "white_check_mark",
|
||||
},
|
||||
});
|
||||
const reacted = state.readMessage({ messageId: outbound.id });
|
||||
if (reacted.reactions.length === 0) {
|
||||
throw new Error("reaction not recorded");
|
||||
}
|
||||
|
||||
await qaChannelPlugin.actions?.handleAction?.({
|
||||
channel: "qa-channel",
|
||||
action: "edit",
|
||||
cfg,
|
||||
accountId: "default",
|
||||
params: {
|
||||
messageId: outbound.id,
|
||||
text: "qa-echo: inside thread (edited)",
|
||||
},
|
||||
});
|
||||
const edited = state.readMessage({ messageId: outbound.id });
|
||||
if (!edited.text.includes("(edited)")) {
|
||||
throw new Error("edit not recorded");
|
||||
}
|
||||
|
||||
await qaChannelPlugin.actions?.handleAction?.({
|
||||
channel: "qa-channel",
|
||||
action: "delete",
|
||||
cfg,
|
||||
accountId: "default",
|
||||
params: {
|
||||
messageId: outbound.id,
|
||||
},
|
||||
});
|
||||
const deleted = state.readMessage({ messageId: outbound.id });
|
||||
if (!deleted.deleted) {
|
||||
throw new Error("delete not recorded");
|
||||
}
|
||||
},
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
export * from "../../extensions/qa-lab/api.js";
|
||||
|
||||
Reference in New Issue
Block a user