mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 19:00:45 +00:00
fix: harden qa bus polling
This commit is contained in:
56
extensions/qa-channel/src/bus-client.test.ts
Normal file
56
extensions/qa-channel/src/bus-client.test.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
import { createServer } from "node:http";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { pollQaBus } from "./bus-client.js";
|
||||
|
||||
async function startJsonServer(handler: () => { statusCode?: number; body: string }) {
|
||||
const server = createServer((_req, res) => {
|
||||
const response = handler();
|
||||
res.writeHead(response.statusCode ?? 200, {
|
||||
"content-type": "application/json; charset=utf-8",
|
||||
});
|
||||
res.end(response.body);
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.once("error", reject);
|
||||
server.listen(0, "127.0.0.1", () => resolve());
|
||||
});
|
||||
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("test server failed to bind");
|
||||
}
|
||||
|
||||
return {
|
||||
baseUrl: `http://127.0.0.1:${address.port}`,
|
||||
async stop() {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((error) => (error ? reject(error) : resolve()));
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe("qa-bus client", () => {
|
||||
const stops: Array<() => Promise<void>> = [];
|
||||
|
||||
afterEach(async () => {
|
||||
await Promise.all(stops.splice(0).map((stop) => stop()));
|
||||
});
|
||||
|
||||
it("rejects malformed JSON responses instead of throwing from the stream callback", async () => {
|
||||
const server = await startJsonServer(() => ({
|
||||
body: '{"cursor":1,"events":[',
|
||||
}));
|
||||
stops.push(server.stop);
|
||||
|
||||
await expect(
|
||||
pollQaBus({
|
||||
baseUrl: server.baseUrl,
|
||||
accountId: "acct-a",
|
||||
cursor: 0,
|
||||
timeoutMs: 0,
|
||||
}),
|
||||
).rejects.toThrow(SyntaxError);
|
||||
});
|
||||
});
|
||||
@@ -69,7 +69,13 @@ async function postJson<T>(
|
||||
});
|
||||
response.on("end", () => {
|
||||
const text = Buffer.concat(chunks).toString("utf8");
|
||||
const parsed = text ? (JSON.parse(text) as T | { error?: string }) : ({} as T);
|
||||
let parsed: T | { error?: string };
|
||||
try {
|
||||
parsed = text ? (JSON.parse(text) as T | { error?: string }) : ({} as T);
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
if ((response.statusCode ?? 500) < 200 || (response.statusCode ?? 500) >= 300) {
|
||||
const error =
|
||||
typeof parsed === "object" && parsed && "error" in parsed ? parsed.error : undefined;
|
||||
|
||||
@@ -140,7 +140,11 @@ export async function handleQaBusRequest(params: {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs);
|
||||
await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs, (snapshot) => {
|
||||
return snapshot.events.some(
|
||||
(event) => event.accountId === input.accountId && event.cursor > (input.cursor ?? 0),
|
||||
);
|
||||
});
|
||||
} catch {
|
||||
// timeout ok for long-poll
|
||||
}
|
||||
|
||||
@@ -92,6 +92,35 @@ describe("qa-bus state", () => {
|
||||
).rejects.toThrow("qa-bus wait timeout");
|
||||
});
|
||||
|
||||
it("keeps account-scoped cursor waits blocked on unrelated account traffic", async () => {
|
||||
const state = createQaBusState();
|
||||
const pending = state.waitForCursorAdvance(0, 500, (snapshot) => {
|
||||
return snapshot.events.some((event) => event.accountId === "acct-a" && event.cursor > 0);
|
||||
});
|
||||
|
||||
state.addInboundMessage({
|
||||
accountId: "acct-b",
|
||||
conversation: { id: "other", kind: "direct" },
|
||||
senderId: "acct-b-user",
|
||||
text: "unrelated",
|
||||
});
|
||||
|
||||
const beforeMatch = await Promise.race([
|
||||
pending.then(() => "resolved"),
|
||||
new Promise((resolve) => setTimeout(() => resolve("still-waiting"), 20)),
|
||||
]);
|
||||
expect(beforeMatch).toBe("still-waiting");
|
||||
|
||||
state.addInboundMessage({
|
||||
accountId: "acct-a",
|
||||
conversation: { id: "target", kind: "direct" },
|
||||
senderId: "acct-a-user",
|
||||
text: "matched",
|
||||
});
|
||||
|
||||
await expect(pending).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves inline attachments and lets search match attachment metadata", () => {
|
||||
const state = createQaBusState();
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import type {
|
||||
QaBusReadMessageInput,
|
||||
QaBusReactToMessageInput,
|
||||
QaBusSearchMessagesInput,
|
||||
QaBusStateSnapshot,
|
||||
QaBusThread,
|
||||
QaBusWaitForInput,
|
||||
} from "./runtime-api.js";
|
||||
@@ -282,8 +283,12 @@ export function createQaBusState() {
|
||||
async waitFor(input: QaBusWaitForInput) {
|
||||
return await waiters.waitFor(input);
|
||||
},
|
||||
async waitForCursorAdvance(afterCursor: number, timeoutMs: number) {
|
||||
return await waiters.waitForCursorAdvance(afterCursor, timeoutMs);
|
||||
async waitForCursorAdvance(
|
||||
afterCursor: number,
|
||||
timeoutMs: number,
|
||||
shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean,
|
||||
) {
|
||||
return await waiters.waitForCursorAdvance(afterCursor, timeoutMs, shouldResolve);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ type CursorWaiter = {
|
||||
reject: (error: Error) => void;
|
||||
timer: NodeJS.Timeout;
|
||||
afterCursor: number;
|
||||
shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean;
|
||||
};
|
||||
|
||||
function createQaBusMatcher(
|
||||
@@ -79,6 +80,9 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
if (snapshot.cursor <= waiter.afterCursor) {
|
||||
continue;
|
||||
}
|
||||
if (waiter.shouldResolve && !waiter.shouldResolve(snapshot)) {
|
||||
continue;
|
||||
}
|
||||
clearTimeout(waiter.timer);
|
||||
cursorWaiters.delete(waiter);
|
||||
waiter.resolve();
|
||||
@@ -104,8 +108,13 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
waiters.add(waiter);
|
||||
});
|
||||
},
|
||||
async waitForCursorAdvance(afterCursor: number, timeoutMs: number) {
|
||||
if (getSnapshot().cursor > afterCursor) {
|
||||
async waitForCursorAdvance(
|
||||
afterCursor: number,
|
||||
timeoutMs: number,
|
||||
shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean,
|
||||
) {
|
||||
const snapshot = getSnapshot();
|
||||
if (snapshot.cursor > afterCursor && (!shouldResolve || shouldResolve(snapshot))) {
|
||||
return;
|
||||
}
|
||||
return await new Promise<void>((resolve, reject) => {
|
||||
@@ -113,6 +122,7 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
resolve,
|
||||
reject,
|
||||
afterCursor,
|
||||
shouldResolve,
|
||||
timer: setTimeout(() => {
|
||||
cursorWaiters.delete(waiter);
|
||||
reject(new Error(`qa-bus wait timeout after ${timeoutMs}ms`));
|
||||
|
||||
Reference in New Issue
Block a user