QA channel: stabilize qa bus polling and tests

This commit is contained in:
Mason Huang
2026-04-15 13:27:21 +08:00
parent 8dd1abedec
commit e2d96f13af
5 changed files with 120 additions and 23 deletions

View File

@@ -1,3 +1,5 @@
import http from "node:http";
import https from "node:https";
import type {
QaBusConversation,
QaBusEvent,
@@ -38,21 +40,61 @@ async function postJson<T>(
body: unknown,
signal?: AbortSignal,
): JsonResult<T> {
const response = await fetch(`${baseUrl}${path}`, {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify(body),
signal,
const url = new URL(path, baseUrl);
const payload = JSON.stringify(body);
const client = url.protocol === "https:" ? https : http;
return await new Promise<T>((resolve, reject) => {
const abortError = () =>
Object.assign(new Error("The operation was aborted"), { name: "AbortError" });
if (signal?.aborted) {
reject(abortError());
return;
}
const request = client.request(
url,
{
method: "POST",
headers: {
"content-type": "application/json",
"content-length": Buffer.byteLength(payload),
connection: "close",
},
},
(response) => {
const chunks: Buffer[] = [];
response.on("data", (chunk) => {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
});
response.on("end", () => {
const text = Buffer.concat(chunks).toString("utf8");
const parsed = text ? (JSON.parse(text) as T | { error?: string }) : ({} as T);
if ((response.statusCode ?? 500) < 200 || (response.statusCode ?? 500) >= 300) {
const error =
typeof parsed === "object" && parsed && "error" in parsed ? parsed.error : undefined;
reject(new Error(error || `qa-bus request failed: ${response.statusCode ?? 500}`));
return;
}
resolve(parsed as T);
});
response.on("error", reject);
},
);
const onAbort = () => {
request.destroy(abortError());
};
signal?.addEventListener("abort", onAbort, { once: true });
request.on("error", (error) => {
signal?.removeEventListener("abort", onAbort);
reject(error);
});
request.on("close", () => {
signal?.removeEventListener("abort", onAbort);
});
request.end(payload);
});
const payload = (await response.json()) as T | { error?: string };
if (!response.ok) {
const error =
typeof payload === "object" && payload && "error" in payload ? payload.error : undefined;
throw new Error(error || `qa-bus request failed: ${response.status}`);
}
return payload as T;
}
export function normalizeQaTarget(raw: string): string | undefined {

View File

@@ -1,10 +1,24 @@
import type { PluginRuntime } from "openclaw/plugin-sdk/core";
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it } from "vitest";
import { extractToolPayload } from "../../../src/infra/outbound/tool-payload.js";
import {
resetPluginRuntimeStateForTest,
setActivePluginRegistry,
} from "../../../src/plugins/runtime.js";
import { createTestRegistry } from "../../../src/test-utils/channel-plugins.js";
import { createStartAccountContext } from "../../../test/helpers/plugins/start-account-context.js";
import { createQaBusState, startQaBusServer } from "../../qa-lab/api.js";
import { qaChannelPlugin } from "../api.js";
import { setQaChannelRuntime } from "../api.js";
import { qaChannelPlugin, setQaChannelRuntime } from "../api.js";
afterEach(() => {
resetPluginRuntimeStateForTest();
});
function installQaChannelTestRegistry() {
setActivePluginRegistry(
createTestRegistry([{ pluginId: "qa-channel", plugin: qaChannelPlugin, source: "test" }]),
);
}
function createMockQaRuntime(params?: {
onDispatch?: (ctx: Record<string, unknown>) => void;
@@ -71,6 +85,7 @@ function createMockQaRuntime(params?: {
describe("qa-channel plugin", () => {
it("roundtrips inbound DM traffic through the qa bus", { timeout: 20_000 }, async () => {
installQaChannelTestRegistry();
const state = createQaBusState();
const bus = await startQaBusServer({ state });
setQaChannelRuntime(createMockQaRuntime());
@@ -120,6 +135,7 @@ describe("qa-channel plugin", () => {
});
it("stages inbound image attachments into agent media payload", { timeout: 20_000 }, async () => {
installQaChannelTestRegistry();
const state = createQaBusState();
const bus = await startQaBusServer({ state });
let dispatchedCtx: Record<string, unknown> | null = null;
@@ -200,6 +216,7 @@ describe("qa-channel plugin", () => {
});
it("exposes thread and message actions against the qa bus", async () => {
installQaChannelTestRegistry();
const state = createQaBusState();
const bus = await startQaBusServer({ state });
@@ -306,6 +323,7 @@ describe("qa-channel plugin", () => {
});
it("routes the advertised send action to the qa bus", async () => {
installQaChannelTestRegistry();
const state = createQaBusState();
const bus = await startQaBusServer({ state });

View File

@@ -140,11 +140,7 @@ export async function handleQaBusRequest(params: {
return true;
}
try {
await params.state.waitFor({
kind: "event-kind",
eventKind: "inbound-message",
timeoutMs,
});
await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs);
} catch {
// timeout ok for long-poll
}

View File

@@ -282,6 +282,9 @@ export function createQaBusState() {
async waitFor(input: QaBusWaitForInput) {
return await waiters.waitFor(input);
},
async waitForCursorAdvance(afterCursor: number, timeoutMs: number) {
return await waiters.waitForCursorAdvance(afterCursor, timeoutMs);
},
};
}

View File

@@ -17,6 +17,13 @@ type Waiter = {
matcher: (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null;
};
type CursorWaiter = {
resolve: () => void;
reject: (error: Error) => void;
timer: NodeJS.Timeout;
afterCursor: number;
};
function createQaBusMatcher(
input: QaBusWaitForInput,
): (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null {
@@ -39,6 +46,7 @@ function createQaBusMatcher(
export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
const waiters = new Set<Waiter>();
const cursorWaiters = new Set<CursorWaiter>();
return {
reset(reason = "qa-bus reset") {
@@ -47,9 +55,14 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
waiter.reject(new Error(reason));
}
waiters.clear();
for (const waiter of cursorWaiters) {
clearTimeout(waiter.timer);
waiter.reject(new Error(reason));
}
cursorWaiters.clear();
},
settle() {
if (waiters.size === 0) {
if (waiters.size === 0 && cursorWaiters.size === 0) {
return;
}
const snapshot = getSnapshot();
@@ -62,6 +75,14 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
waiters.delete(waiter);
waiter.resolve(match);
}
for (const waiter of Array.from(cursorWaiters)) {
if (snapshot.cursor <= waiter.afterCursor) {
continue;
}
clearTimeout(waiter.timer);
cursorWaiters.delete(waiter);
waiter.resolve();
}
},
async waitFor(input: QaBusWaitForInput) {
const matcher = createQaBusMatcher(input);
@@ -83,5 +104,22 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
waiters.add(waiter);
});
},
async waitForCursorAdvance(afterCursor: number, timeoutMs: number) {
if (getSnapshot().cursor > afterCursor) {
return;
}
return await new Promise<void>((resolve, reject) => {
const waiter: CursorWaiter = {
resolve,
reject,
afterCursor,
timer: setTimeout(() => {
cursorWaiters.delete(waiter);
reject(new Error(`qa-bus wait timeout after ${timeoutMs}ms`));
}, timeoutMs),
};
cursorWaiters.add(waiter);
});
},
};
}