From e2d96f13af0d5885318d3551855fb597b6bccc5b Mon Sep 17 00:00:00 2001 From: Mason Huang Date: Wed, 15 Apr 2026 13:27:21 +0800 Subject: [PATCH] QA channel: stabilize qa bus polling and tests --- extensions/qa-channel/src/bus-client.ts | 70 ++++++++++++++++++----- extensions/qa-channel/src/channel.test.ts | 24 +++++++- extensions/qa-lab/src/bus-server.ts | 6 +- extensions/qa-lab/src/bus-state.ts | 3 + extensions/qa-lab/src/bus-waiters.ts | 40 ++++++++++++- 5 files changed, 120 insertions(+), 23 deletions(-) diff --git a/extensions/qa-channel/src/bus-client.ts b/extensions/qa-channel/src/bus-client.ts index bd79351962e..7229e749c97 100644 --- a/extensions/qa-channel/src/bus-client.ts +++ b/extensions/qa-channel/src/bus-client.ts @@ -1,3 +1,5 @@ +import http from "node:http"; +import https from "node:https"; import type { QaBusConversation, QaBusEvent, @@ -38,21 +40,61 @@ async function postJson( body: unknown, signal?: AbortSignal, ): JsonResult { - const response = await fetch(`${baseUrl}${path}`, { - method: "POST", - headers: { - "content-type": "application/json", - }, - body: JSON.stringify(body), - signal, + const url = new URL(path, baseUrl); + const payload = JSON.stringify(body); + const client = url.protocol === "https:" ? https : http; + + return await new Promise((resolve, reject) => { + const abortError = () => + Object.assign(new Error("The operation was aborted"), { name: "AbortError" }); + if (signal?.aborted) { + reject(abortError()); + return; + } + + const request = client.request( + url, + { + method: "POST", + headers: { + "content-type": "application/json", + "content-length": Buffer.byteLength(payload), + connection: "close", + }, + }, + (response) => { + const chunks: Buffer[] = []; + response.on("data", (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + response.on("end", () => { + const text = Buffer.concat(chunks).toString("utf8"); + const parsed = text ? (JSON.parse(text) as T | { error?: string }) : ({} as T); + if ((response.statusCode ?? 500) < 200 || (response.statusCode ?? 500) >= 300) { + const error = + typeof parsed === "object" && parsed && "error" in parsed ? parsed.error : undefined; + reject(new Error(error || `qa-bus request failed: ${response.statusCode ?? 500}`)); + return; + } + resolve(parsed as T); + }); + response.on("error", reject); + }, + ); + + const onAbort = () => { + request.destroy(abortError()); + }; + signal?.addEventListener("abort", onAbort, { once: true }); + request.on("error", (error) => { + signal?.removeEventListener("abort", onAbort); + reject(error); + }); + request.on("close", () => { + signal?.removeEventListener("abort", onAbort); + }); + request.end(payload); }); - const payload = (await response.json()) as T | { error?: string }; - if (!response.ok) { - const error = - typeof payload === "object" && payload && "error" in payload ? payload.error : undefined; - throw new Error(error || `qa-bus request failed: ${response.status}`); - } - return payload as T; } export function normalizeQaTarget(raw: string): string | undefined { diff --git a/extensions/qa-channel/src/channel.test.ts b/extensions/qa-channel/src/channel.test.ts index e72f7ed8d6c..1471b01a8d6 100644 --- a/extensions/qa-channel/src/channel.test.ts +++ b/extensions/qa-channel/src/channel.test.ts @@ -1,10 +1,24 @@ import type { PluginRuntime } from "openclaw/plugin-sdk/core"; -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it } from "vitest"; import { extractToolPayload } from "../../../src/infra/outbound/tool-payload.js"; +import { + resetPluginRuntimeStateForTest, + setActivePluginRegistry, +} from "../../../src/plugins/runtime.js"; +import { createTestRegistry } from "../../../src/test-utils/channel-plugins.js"; import { createStartAccountContext } from "../../../test/helpers/plugins/start-account-context.js"; import { createQaBusState, startQaBusServer } from "../../qa-lab/api.js"; -import { qaChannelPlugin } from "../api.js"; -import { setQaChannelRuntime } from "../api.js"; +import { qaChannelPlugin, setQaChannelRuntime } from "../api.js"; + +afterEach(() => { + resetPluginRuntimeStateForTest(); +}); + +function installQaChannelTestRegistry() { + setActivePluginRegistry( + createTestRegistry([{ pluginId: "qa-channel", plugin: qaChannelPlugin, source: "test" }]), + ); +} function createMockQaRuntime(params?: { onDispatch?: (ctx: Record) => void; @@ -71,6 +85,7 @@ function createMockQaRuntime(params?: { describe("qa-channel plugin", () => { it("roundtrips inbound DM traffic through the qa bus", { timeout: 20_000 }, async () => { + installQaChannelTestRegistry(); const state = createQaBusState(); const bus = await startQaBusServer({ state }); setQaChannelRuntime(createMockQaRuntime()); @@ -120,6 +135,7 @@ describe("qa-channel plugin", () => { }); it("stages inbound image attachments into agent media payload", { timeout: 20_000 }, async () => { + installQaChannelTestRegistry(); const state = createQaBusState(); const bus = await startQaBusServer({ state }); let dispatchedCtx: Record | null = null; @@ -200,6 +216,7 @@ describe("qa-channel plugin", () => { }); it("exposes thread and message actions against the qa bus", async () => { + installQaChannelTestRegistry(); const state = createQaBusState(); const bus = await startQaBusServer({ state }); @@ -306,6 +323,7 @@ describe("qa-channel plugin", () => { }); it("routes the advertised send action to the qa bus", async () => { + installQaChannelTestRegistry(); const state = createQaBusState(); const bus = await startQaBusServer({ state }); diff --git a/extensions/qa-lab/src/bus-server.ts b/extensions/qa-lab/src/bus-server.ts index f5bc37ca459..6a35d864890 100644 --- a/extensions/qa-lab/src/bus-server.ts +++ b/extensions/qa-lab/src/bus-server.ts @@ -140,11 +140,7 @@ export async function handleQaBusRequest(params: { return true; } try { - await params.state.waitFor({ - kind: "event-kind", - eventKind: "inbound-message", - timeoutMs, - }); + await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs); } catch { // timeout ok for long-poll } diff --git a/extensions/qa-lab/src/bus-state.ts b/extensions/qa-lab/src/bus-state.ts index bb5aef491ef..3ed8724fe69 100644 --- a/extensions/qa-lab/src/bus-state.ts +++ b/extensions/qa-lab/src/bus-state.ts @@ -282,6 +282,9 @@ export function createQaBusState() { async waitFor(input: QaBusWaitForInput) { return await waiters.waitFor(input); }, + async waitForCursorAdvance(afterCursor: number, timeoutMs: number) { + return await waiters.waitForCursorAdvance(afterCursor, timeoutMs); + }, }; } diff --git a/extensions/qa-lab/src/bus-waiters.ts b/extensions/qa-lab/src/bus-waiters.ts index 1d6c15671f6..7a37222aa07 100644 --- a/extensions/qa-lab/src/bus-waiters.ts +++ b/extensions/qa-lab/src/bus-waiters.ts @@ -17,6 +17,13 @@ type Waiter = { matcher: (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null; }; +type CursorWaiter = { + resolve: () => void; + reject: (error: Error) => void; + timer: NodeJS.Timeout; + afterCursor: number; +}; + function createQaBusMatcher( input: QaBusWaitForInput, ): (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null { @@ -39,6 +46,7 @@ function createQaBusMatcher( export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) { const waiters = new Set(); + const cursorWaiters = new Set(); return { reset(reason = "qa-bus reset") { @@ -47,9 +55,14 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) { waiter.reject(new Error(reason)); } waiters.clear(); + for (const waiter of cursorWaiters) { + clearTimeout(waiter.timer); + waiter.reject(new Error(reason)); + } + cursorWaiters.clear(); }, settle() { - if (waiters.size === 0) { + if (waiters.size === 0 && cursorWaiters.size === 0) { return; } const snapshot = getSnapshot(); @@ -62,6 +75,14 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) { waiters.delete(waiter); waiter.resolve(match); } + for (const waiter of Array.from(cursorWaiters)) { + if (snapshot.cursor <= waiter.afterCursor) { + continue; + } + clearTimeout(waiter.timer); + cursorWaiters.delete(waiter); + waiter.resolve(); + } }, async waitFor(input: QaBusWaitForInput) { const matcher = createQaBusMatcher(input); @@ -83,5 +104,22 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) { waiters.add(waiter); }); }, + async waitForCursorAdvance(afterCursor: number, timeoutMs: number) { + if (getSnapshot().cursor > afterCursor) { + return; + } + return await new Promise((resolve, reject) => { + const waiter: CursorWaiter = { + resolve, + reject, + afterCursor, + timer: setTimeout(() => { + cursorWaiters.delete(waiter); + reject(new Error(`qa-bus wait timeout after ${timeoutMs}ms`)); + }, timeoutMs), + }; + cursorWaiters.add(waiter); + }); + }, }; }