Files
openclaw/extensions/qa-lab/src/bus-server.ts
2026-04-05 23:21:56 +01:00

180 lines
5.6 KiB
TypeScript

import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
import type { QaBusState } from "./bus-state.js";
import type {
QaBusCreateThreadInput,
QaBusDeleteMessageInput,
QaBusEditMessageInput,
QaBusInboundMessageInput,
QaBusOutboundMessageInput,
QaBusPollInput,
QaBusReactToMessageInput,
QaBusReadMessageInput,
QaBusSearchMessagesInput,
QaBusWaitForInput,
} from "./runtime-api.js";
async function readJson(req: IncomingMessage): Promise<unknown> {
const chunks: Buffer[] = [];
for await (const chunk of req) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
const text = Buffer.concat(chunks).toString("utf8").trim();
return text ? (JSON.parse(text) as unknown) : {};
}
export function writeJson(res: ServerResponse, statusCode: number, body: unknown) {
const payload = JSON.stringify(body);
res.writeHead(statusCode, {
"content-type": "application/json; charset=utf-8",
"content-length": Buffer.byteLength(payload),
});
res.end(payload);
}
export function writeError(res: ServerResponse, statusCode: number, error: unknown) {
writeJson(res, statusCode, {
error: error instanceof Error ? error.message : String(error),
});
}
export async function handleQaBusRequest(params: {
req: IncomingMessage;
res: ServerResponse;
state: QaBusState;
}): Promise<boolean> {
const method = params.req.method ?? "GET";
const url = new URL(params.req.url ?? "/", "http://127.0.0.1");
if (method === "GET" && url.pathname === "/health") {
writeJson(params.res, 200, { ok: true });
return true;
}
if (method === "GET" && url.pathname === "/v1/state") {
writeJson(params.res, 200, params.state.getSnapshot());
return true;
}
if (!url.pathname.startsWith("/v1/")) {
return false;
}
if (method !== "POST") {
writeError(params.res, 405, "method not allowed");
return true;
}
const body = (await readJson(params.req)) as Record<string, unknown>;
try {
switch (url.pathname) {
case "/v1/reset":
params.state.reset();
writeJson(params.res, 200, { ok: true });
return true;
case "/v1/inbound/message":
writeJson(params.res, 200, {
message: params.state.addInboundMessage(body as unknown as QaBusInboundMessageInput),
});
return true;
case "/v1/outbound/message":
writeJson(params.res, 200, {
message: params.state.addOutboundMessage(body as unknown as QaBusOutboundMessageInput),
});
return true;
case "/v1/actions/thread-create":
writeJson(params.res, 200, {
thread: params.state.createThread(body as unknown as QaBusCreateThreadInput),
});
return true;
case "/v1/actions/react":
writeJson(params.res, 200, {
message: params.state.reactToMessage(body as unknown as QaBusReactToMessageInput),
});
return true;
case "/v1/actions/edit":
writeJson(params.res, 200, {
message: params.state.editMessage(body as unknown as QaBusEditMessageInput),
});
return true;
case "/v1/actions/delete":
writeJson(params.res, 200, {
message: params.state.deleteMessage(body as unknown as QaBusDeleteMessageInput),
});
return true;
case "/v1/actions/read":
writeJson(params.res, 200, {
message: params.state.readMessage(body as unknown as QaBusReadMessageInput),
});
return true;
case "/v1/actions/search":
writeJson(params.res, 200, {
messages: params.state.searchMessages(body as unknown as QaBusSearchMessagesInput),
});
return true;
case "/v1/poll": {
const input = body as unknown as QaBusPollInput;
const timeoutMs = Math.max(0, Math.min(input.timeoutMs ?? 0, 30_000));
const initial = params.state.poll(input);
if (initial.events.length > 0 || timeoutMs === 0) {
writeJson(params.res, 200, initial);
return true;
}
try {
await params.state.waitFor({
kind: "event-kind",
eventKind: "inbound-message",
timeoutMs,
});
} catch {
// timeout ok for long-poll
}
writeJson(params.res, 200, params.state.poll(input));
return true;
}
case "/v1/wait":
writeJson(params.res, 200, {
match: await params.state.waitFor(body as unknown as QaBusWaitForInput),
});
return true;
default:
writeError(params.res, 404, "not found");
return true;
}
} catch (error) {
writeError(params.res, 400, error);
return true;
}
}
export function createQaBusServer(state: QaBusState): Server {
return createServer(async (req, res) => {
const handled = await handleQaBusRequest({ req, res, state });
if (!handled) {
writeError(res, 404, "not found");
}
});
}
export async function startQaBusServer(params: { state: QaBusState; port?: number }) {
const server = createQaBusServer(params.state);
await new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.listen(params.port ?? 0, "127.0.0.1", () => resolve());
});
const address = server.address();
if (!address || typeof address === "string") {
throw new Error("qa-bus failed to bind");
}
return {
server,
port: address.port,
baseUrl: `http://127.0.0.1:${address.port}`,
async stop() {
await new Promise<void>((resolve, reject) =>
server.close((error) => (error ? reject(error) : resolve())),
);
},
};
}