fix: tighten qa bus transport edges

This commit is contained in:
Mason Huang
2026-04-15 18:13:28 +08:00
parent f7e56ca815
commit 82d7dd904e
4 changed files with 53 additions and 7 deletions

View File

@@ -1,10 +1,12 @@
import { createServer } from "node:http";
import { afterEach, describe, expect, it } from "vitest";
import { pollQaBus } from "./bus-client.js";
import { getQaBusState, pollQaBus } from "./bus-client.js";
async function startJsonServer(handler: () => { statusCode?: number; body: string }) {
const server = createServer((_req, res) => {
const response = handler();
async function startJsonServer(
handler: (req: { url?: string | undefined }) => { statusCode?: number; body: string },
) {
const server = createServer((req, res) => {
const response = handler({ url: req.url });
res.writeHead(response.statusCode ?? 200, {
"content-type": "application/json; charset=utf-8",
});
@@ -53,4 +55,26 @@ describe("qa-bus client", () => {
}),
).rejects.toThrow(SyntaxError);
});
it("preserves baseUrl path prefixes when composing bus URLs", async () => {
const server = await startJsonServer((req) => ({
statusCode: req.url === "/qa-bus/v1/state" ? 200 : 404,
body:
req.url === "/qa-bus/v1/state"
? JSON.stringify({
cursor: 1,
conversations: [],
threads: [],
messages: [],
events: [],
})
: JSON.stringify({ error: `unexpected path: ${req.url}` }),
}));
stops.push(server.stop);
await expect(getQaBusState(`${server.baseUrl}/qa-bus`)).resolves.toMatchObject({
cursor: 1,
events: [],
});
});
});

View File

@@ -34,13 +34,18 @@ export type {
type JsonResult<T> = Promise<T>;
function buildQaBusUrl(baseUrl: string, path: string): URL {
const normalizedBaseUrl = baseUrl.endsWith("/") ? baseUrl : `${baseUrl}/`;
return new URL(path.replace(/^\/+/, ""), normalizedBaseUrl);
}
async function postJson<T>(
baseUrl: string,
path: string,
body: unknown,
signal?: AbortSignal,
): JsonResult<T> {
const url = new URL(path, baseUrl);
const url = buildQaBusUrl(baseUrl, path);
const payload = JSON.stringify(body);
const client = url.protocol === "https:" ? https : http;
@@ -266,7 +271,7 @@ export async function injectQaBusInboundMessage(params: {
}
export async function getQaBusState(baseUrl: string): Promise<QaBusStateSnapshot> {
const response = await fetch(`${baseUrl}/v1/state`);
const response = await fetch(buildQaBusUrl(baseUrl, "/v1/state"));
if (!response.ok) {
throw new Error(`qa-bus request failed: ${response.status}`);
}

View File

@@ -1,5 +1,6 @@
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { normalizeAccountId } from "./bus-queries.js";
import type { QaBusState } from "./bus-state.js";
import type {
QaBusCreateThreadInput,
@@ -134,6 +135,7 @@ export async function handleQaBusRequest(params: {
case "/v1/poll": {
const input = body as unknown as QaBusPollInput;
const timeoutMs = Math.max(0, Math.min(input.timeoutMs ?? 0, 30_000));
const accountId = normalizeAccountId(input.accountId);
const initial = params.state.poll(input);
if (initial.events.length > 0 || timeoutMs === 0) {
writeJson(params.res, 200, initial);
@@ -142,7 +144,7 @@ export async function handleQaBusRequest(params: {
try {
await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs, (snapshot) => {
return snapshot.events.some(
(event) => event.accountId === input.accountId && event.cursor > (input.cursor ?? 0),
(event) => event.accountId === accountId && event.cursor > (input.cursor ?? 0),
);
});
} catch {

View File

@@ -121,6 +121,21 @@ describe("qa-bus state", () => {
await expect(pending).resolves.toBeUndefined();
});
it("wakes default-account cursor waits when accountId is omitted", async () => {
const state = createQaBusState();
const pending = state.waitForCursorAdvance(0, 500, (snapshot) => {
return snapshot.events.some((event) => event.accountId === "default" && event.cursor > 0);
});
state.addInboundMessage({
conversation: { id: "target", kind: "direct" },
senderId: "default-user",
text: "matched",
});
await expect(pending).resolves.toBeUndefined();
});
it("preserves inline attachments and lets search match attachment metadata", () => {
const state = createQaBusState();