Files
openclaw/extensions/qa-lab/src/bus-server.ts
Mason Huang edfa074e0f Tests: align pnpm test expectations with main (#67001)
Merged via squash.

Prepared head SHA: 29c8068053
Co-authored-by: hxy91819 <8814856+hxy91819@users.noreply.github.com>
Co-authored-by: hxy91819 <8814856+hxy91819@users.noreply.github.com>
Reviewed-by: @hxy91819
2026-04-15 18:31:23 +08:00

199 lines
6.2 KiB
TypeScript

import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { normalizeAccountId } from "./bus-queries.js";
import type { QaBusState } from "./bus-state.js";
import type {
QaBusCreateThreadInput,
QaBusDeleteMessageInput,
QaBusEditMessageInput,
QaBusInboundMessageInput,
QaBusOutboundMessageInput,
QaBusPollInput,
QaBusReactToMessageInput,
QaBusReadMessageInput,
QaBusSearchMessagesInput,
QaBusWaitForInput,
} from "./runtime-api.js";
/**
 * Drains the request stream and parses it as UTF-8 JSON.
 *
 * @param req - Incoming request whose body is consumed in full.
 * @returns The parsed JSON value, or an empty object when the body is empty
 *   or contains only whitespace.
 * @throws SyntaxError (from `JSON.parse`) when the body is not valid JSON.
 */
async function readJson(req: IncomingMessage): Promise<unknown> {
  const parts: Buffer[] = [];
  for await (const piece of req) {
    // Streams may yield strings when an encoding is set; normalize to Buffer.
    parts.push(Buffer.isBuffer(piece) ? piece : Buffer.from(piece));
  }
  const raw = Buffer.concat(parts).toString("utf8").trim();
  if (raw.length === 0) {
    return {};
  }
  return JSON.parse(raw) as unknown;
}
/**
 * Sends `body` as a complete JSON response.
 *
 * @param res - Response to write to; it is ended by this call.
 * @param statusCode - HTTP status code for the response.
 * @param body - Value serialized with `JSON.stringify`.
 */
export function writeJson(res: ServerResponse, statusCode: number, body: unknown) {
  const serialized = JSON.stringify(body);
  const headers = {
    "content-type": "application/json; charset=utf-8",
    // Byte length, not string length: multi-byte characters must be counted.
    "content-length": Buffer.byteLength(serialized),
  };
  res.writeHead(statusCode, headers);
  res.end(serialized);
}
/**
 * Sends a JSON error response of the form `{ error: <message> }`.
 *
 * @param res - Response to write to; it is ended by this call.
 * @param statusCode - HTTP status code for the response.
 * @param error - Value passed to `formatErrorMessage` to produce the message.
 */
export function writeError(res: ServerResponse, statusCode: number, error: unknown) {
  const message = formatErrorMessage(error);
  writeJson(res, statusCode, { error: message });
}
/**
 * Closes an HTTP server and waits for the close callback.
 *
 * Idle connections are closed right away (when the runtime supports it). If
 * the server has still not closed after 250 ms, all remaining connections are
 * closed as well. The timer is unref'd and is always cleared before returning.
 *
 * @param server - Server to shut down.
 * @throws Whatever error `server.close` reports through its callback.
 */
export async function closeQaHttpServer(server: Server): Promise<void> {
  let forceCloseTimer: NodeJS.Timeout | undefined;
  const closed = new Promise<void>((resolve, reject) => {
    server.close((error) => {
      if (error) {
        reject(error);
        return;
      }
      resolve();
    });
    // Optional calls: these methods only exist on newer Node versions.
    server.closeIdleConnections?.();
    forceCloseTimer = setTimeout(() => server.closeAllConnections?.(), 250);
    forceCloseTimer.unref();
  });
  try {
    await closed;
  } finally {
    if (forceCloseTimer !== undefined) {
      clearTimeout(forceCloseTimer);
    }
  }
}
/**
 * Routes a single HTTP request to the QA bus API.
 *
 * `GET /health` and `GET /v1/state` are answered directly. Every other path
 * under `/v1/` must be a `POST` with a JSON object body, which is forwarded to
 * the matching `QaBusState` operation.
 *
 * Failures while reading or parsing the request body, and errors thrown by the
 * state operations, are reported to the client as a 400 JSON error instead of
 * rejecting the returned promise.
 *
 * @param params.req - Incoming request; its body is consumed for `/v1/` POSTs.
 * @param params.res - Response the result or error is written to.
 * @param params.state - Bus state the request operates on.
 * @returns `true` when a response was written (including error responses),
 *   `false` when the path is outside this handler and nothing was written.
 */
export async function handleQaBusRequest(params: {
  req: IncomingMessage;
  res: ServerResponse;
  state: QaBusState;
}): Promise<boolean> {
  const { req, res, state } = params;
  const method = req.method ?? "GET";

  // `new URL` throws on request targets it cannot resolve against the base
  // (for example "//"); answer those with 400 rather than rejecting.
  let url: URL;
  try {
    url = new URL(req.url ?? "/", "http://127.0.0.1");
  } catch (error) {
    writeError(res, 400, error);
    return true;
  }

  if (method === "GET" && url.pathname === "/health") {
    writeJson(res, 200, { ok: true });
    return true;
  }
  if (method === "GET" && url.pathname === "/v1/state") {
    writeJson(res, 200, state.getSnapshot());
    return true;
  }
  if (!url.pathname.startsWith("/v1/")) {
    return false;
  }
  if (method !== "POST") {
    writeError(res, 405, "method not allowed");
    return true;
  }

  try {
    // Reading the body happens inside the try so that malformed JSON or a
    // broken request stream produces a 400 instead of an unhandled rejection.
    const body = await readJson(req);
    if (typeof body !== "object" || body === null || Array.isArray(body)) {
      writeError(res, 400, "request body must be a JSON object");
      return true;
    }

    // NOTE(review): the body is only checked to be a JSON object; field-level
    // validation is left to the QaBusState methods, hence the casts below.
    switch (url.pathname) {
      case "/v1/reset":
        state.reset();
        writeJson(res, 200, { ok: true });
        return true;
      case "/v1/inbound/message":
        writeJson(res, 200, {
          message: state.addInboundMessage(body as unknown as QaBusInboundMessageInput),
        });
        return true;
      case "/v1/outbound/message":
        writeJson(res, 200, {
          message: state.addOutboundMessage(body as unknown as QaBusOutboundMessageInput),
        });
        return true;
      case "/v1/actions/thread-create":
        writeJson(res, 200, {
          thread: state.createThread(body as unknown as QaBusCreateThreadInput),
        });
        return true;
      case "/v1/actions/react":
        writeJson(res, 200, {
          message: state.reactToMessage(body as unknown as QaBusReactToMessageInput),
        });
        return true;
      case "/v1/actions/edit":
        writeJson(res, 200, {
          message: state.editMessage(body as unknown as QaBusEditMessageInput),
        });
        return true;
      case "/v1/actions/delete":
        writeJson(res, 200, {
          message: state.deleteMessage(body as unknown as QaBusDeleteMessageInput),
        });
        return true;
      case "/v1/actions/read":
        writeJson(res, 200, {
          message: state.readMessage(body as unknown as QaBusReadMessageInput),
        });
        return true;
      case "/v1/actions/search":
        writeJson(res, 200, {
          messages: state.searchMessages(body as unknown as QaBusSearchMessagesInput),
        });
        return true;
      case "/v1/poll": {
        const input = body as unknown as QaBusPollInput;
        // Long-poll wait is clamped to the range [0, 30s].
        const timeoutMs = Math.max(0, Math.min(input.timeoutMs ?? 0, 30_000));
        const accountId = normalizeAccountId(input.accountId);
        const sinceCursor = input.cursor ?? 0;
        const initial = state.poll(input);
        if (initial.events.length > 0 || timeoutMs === 0) {
          writeJson(res, 200, initial);
          return true;
        }
        try {
          // Wait until an event for this account appears past the cursor.
          await state.waitForCursorAdvance(sinceCursor, timeoutMs, (snapshot) => {
            return snapshot.events.some(
              (event) => event.accountId === accountId && event.cursor > sinceCursor,
            );
          });
        } catch {
          // timeout ok for long-poll
        }
        // Poll again so the response reflects whatever arrived while waiting.
        writeJson(res, 200, state.poll(input));
        return true;
      }
      case "/v1/wait":
        writeJson(res, 200, {
          match: await state.waitFor(body as unknown as QaBusWaitForInput),
        });
        return true;
      default:
        writeError(res, 404, "not found");
        return true;
    }
  } catch (error) {
    if (!res.headersSent) {
      writeError(res, 400, error);
    } else if (!res.writableEnded) {
      // A response was already started; a second status line cannot be sent,
      // so tear the connection down instead of leaving it hanging.
      res.destroy();
    }
    return true;
  }
}
/**
 * Creates (but does not start) an HTTP server that serves the QA bus API.
 *
 * Requests the bus handler does not claim get a 404 JSON error. Any error that
 * escapes the handler is caught here and reported as a 500, so a failing
 * request cannot surface as an unhandled promise rejection.
 *
 * @param state - Bus state shared by all requests on this server.
 * @returns The unstarted server; the caller is responsible for `listen`.
 */
export function createQaBusServer(state: QaBusState): Server {
  return createServer(async (req, res) => {
    try {
      const handled = await handleQaBusRequest({ req, res, state });
      if (!handled) {
        writeError(res, 404, "not found");
      }
    } catch (error) {
      try {
        if (!res.headersSent) {
          writeError(res, 500, error);
        } else if (!res.writableEnded) {
          // Response already started: it cannot be replaced, only aborted.
          res.destroy();
        }
      } catch {
        // Reporting the failure itself failed; drop the connection.
        res.destroy();
      }
    }
  });
}
/**
 * Starts a QA bus server bound to the loopback interface.
 *
 * @param params.state - Bus state served by the new server.
 * @param params.port - Port to listen on; when omitted, port 0 is passed to
 *   `listen` and the actually bound port is reported in the result.
 * @returns The server, the bound port, its base URL, and a `stop` function
 *   that shuts the server down.
 * @throws If listening fails, or if the bound address is not a TCP address.
 */
export async function startQaBusServer(params: { state: QaBusState; port?: number }) {
  const server = createQaBusServer(params.state);
  const requestedPort = params.port ?? 0;

  await new Promise<void>((resolve, reject) => {
    server.once("error", reject);
    server.listen(requestedPort, "127.0.0.1", () => {
      resolve();
    });
  });

  const bound = server.address();
  // `address()` yields null when not listening and a string for pipe/socket
  // paths; only the object form carries a port.
  if (typeof bound !== "object" || bound === null) {
    throw new Error("qa-bus failed to bind");
  }

  const { port } = bound;
  return {
    server,
    port,
    baseUrl: `http://127.0.0.1:${port}`,
    async stop() {
      await closeQaHttpServer(server);
    },
  };
}