mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-16 19:51:11 +00:00
180 lines
5.6 KiB
TypeScript
180 lines
5.6 KiB
TypeScript
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
|
|
import type { QaBusState } from "./bus-state.js";
|
|
import type {
|
|
QaBusCreateThreadInput,
|
|
QaBusDeleteMessageInput,
|
|
QaBusEditMessageInput,
|
|
QaBusInboundMessageInput,
|
|
QaBusOutboundMessageInput,
|
|
QaBusPollInput,
|
|
QaBusReactToMessageInput,
|
|
QaBusReadMessageInput,
|
|
QaBusSearchMessagesInput,
|
|
QaBusWaitForInput,
|
|
} from "./runtime-api.js";
|
|
|
|
async function readJson(req: IncomingMessage): Promise<unknown> {
|
|
const chunks: Buffer[] = [];
|
|
for await (const chunk of req) {
|
|
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
|
}
|
|
const text = Buffer.concat(chunks).toString("utf8").trim();
|
|
return text ? (JSON.parse(text) as unknown) : {};
|
|
}
|
|
|
|
export function writeJson(res: ServerResponse, statusCode: number, body: unknown) {
|
|
const payload = JSON.stringify(body);
|
|
res.writeHead(statusCode, {
|
|
"content-type": "application/json; charset=utf-8",
|
|
"content-length": Buffer.byteLength(payload),
|
|
});
|
|
res.end(payload);
|
|
}
|
|
|
|
export function writeError(res: ServerResponse, statusCode: number, error: unknown) {
|
|
writeJson(res, statusCode, {
|
|
error: error instanceof Error ? error.message : String(error),
|
|
});
|
|
}
|
|
|
|
export async function handleQaBusRequest(params: {
|
|
req: IncomingMessage;
|
|
res: ServerResponse;
|
|
state: QaBusState;
|
|
}): Promise<boolean> {
|
|
const method = params.req.method ?? "GET";
|
|
const url = new URL(params.req.url ?? "/", "http://127.0.0.1");
|
|
|
|
if (method === "GET" && url.pathname === "/health") {
|
|
writeJson(params.res, 200, { ok: true });
|
|
return true;
|
|
}
|
|
|
|
if (method === "GET" && url.pathname === "/v1/state") {
|
|
writeJson(params.res, 200, params.state.getSnapshot());
|
|
return true;
|
|
}
|
|
|
|
if (!url.pathname.startsWith("/v1/")) {
|
|
return false;
|
|
}
|
|
|
|
if (method !== "POST") {
|
|
writeError(params.res, 405, "method not allowed");
|
|
return true;
|
|
}
|
|
|
|
const body = (await readJson(params.req)) as Record<string, unknown>;
|
|
|
|
try {
|
|
switch (url.pathname) {
|
|
case "/v1/reset":
|
|
params.state.reset();
|
|
writeJson(params.res, 200, { ok: true });
|
|
return true;
|
|
case "/v1/inbound/message":
|
|
writeJson(params.res, 200, {
|
|
message: params.state.addInboundMessage(body as unknown as QaBusInboundMessageInput),
|
|
});
|
|
return true;
|
|
case "/v1/outbound/message":
|
|
writeJson(params.res, 200, {
|
|
message: params.state.addOutboundMessage(body as unknown as QaBusOutboundMessageInput),
|
|
});
|
|
return true;
|
|
case "/v1/actions/thread-create":
|
|
writeJson(params.res, 200, {
|
|
thread: params.state.createThread(body as unknown as QaBusCreateThreadInput),
|
|
});
|
|
return true;
|
|
case "/v1/actions/react":
|
|
writeJson(params.res, 200, {
|
|
message: params.state.reactToMessage(body as unknown as QaBusReactToMessageInput),
|
|
});
|
|
return true;
|
|
case "/v1/actions/edit":
|
|
writeJson(params.res, 200, {
|
|
message: params.state.editMessage(body as unknown as QaBusEditMessageInput),
|
|
});
|
|
return true;
|
|
case "/v1/actions/delete":
|
|
writeJson(params.res, 200, {
|
|
message: params.state.deleteMessage(body as unknown as QaBusDeleteMessageInput),
|
|
});
|
|
return true;
|
|
case "/v1/actions/read":
|
|
writeJson(params.res, 200, {
|
|
message: params.state.readMessage(body as unknown as QaBusReadMessageInput),
|
|
});
|
|
return true;
|
|
case "/v1/actions/search":
|
|
writeJson(params.res, 200, {
|
|
messages: params.state.searchMessages(body as unknown as QaBusSearchMessagesInput),
|
|
});
|
|
return true;
|
|
case "/v1/poll": {
|
|
const input = body as unknown as QaBusPollInput;
|
|
const timeoutMs = Math.max(0, Math.min(input.timeoutMs ?? 0, 30_000));
|
|
const initial = params.state.poll(input);
|
|
if (initial.events.length > 0 || timeoutMs === 0) {
|
|
writeJson(params.res, 200, initial);
|
|
return true;
|
|
}
|
|
try {
|
|
await params.state.waitFor({
|
|
kind: "event-kind",
|
|
eventKind: "inbound-message",
|
|
timeoutMs,
|
|
});
|
|
} catch {
|
|
// timeout ok for long-poll
|
|
}
|
|
writeJson(params.res, 200, params.state.poll(input));
|
|
return true;
|
|
}
|
|
case "/v1/wait":
|
|
writeJson(params.res, 200, {
|
|
match: await params.state.waitFor(body as unknown as QaBusWaitForInput),
|
|
});
|
|
return true;
|
|
default:
|
|
writeError(params.res, 404, "not found");
|
|
return true;
|
|
}
|
|
} catch (error) {
|
|
writeError(params.res, 400, error);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
export function createQaBusServer(state: QaBusState): Server {
|
|
return createServer(async (req, res) => {
|
|
const handled = await handleQaBusRequest({ req, res, state });
|
|
if (!handled) {
|
|
writeError(res, 404, "not found");
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function startQaBusServer(params: { state: QaBusState; port?: number }) {
|
|
const server = createQaBusServer(params.state);
|
|
await new Promise<void>((resolve, reject) => {
|
|
server.once("error", reject);
|
|
server.listen(params.port ?? 0, "127.0.0.1", () => resolve());
|
|
});
|
|
const address = server.address();
|
|
if (!address || typeof address === "string") {
|
|
throw new Error("qa-bus failed to bind");
|
|
}
|
|
return {
|
|
server,
|
|
port: address.port,
|
|
baseUrl: `http://127.0.0.1:${address.port}`,
|
|
async stop() {
|
|
await new Promise<void>((resolve, reject) =>
|
|
server.close((error) => (error ? reject(error) : resolve())),
|
|
);
|
|
},
|
|
};
|
|
}
|