diff --git a/extensions/qa-channel/src/bus-client.test.ts b/extensions/qa-channel/src/bus-client.test.ts new file mode 100644 index 00000000000..2c7237c83cf --- /dev/null +++ b/extensions/qa-channel/src/bus-client.test.ts @@ -0,0 +1,56 @@ +import { createServer } from "node:http"; +import { afterEach, describe, expect, it } from "vitest"; +import { pollQaBus } from "./bus-client.js"; + +async function startJsonServer(handler: () => { statusCode?: number; body: string }) { + const server = createServer((_req, res) => { + const response = handler(); + res.writeHead(response.statusCode ?? 200, { + "content-type": "application/json; charset=utf-8", + }); + res.end(response.body); + }); + + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => resolve()); + }); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("test server failed to bind"); + } + + return { + baseUrl: `http://127.0.0.1:${address.port}`, + async stop() { + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + }); + }, + }; +} + +describe("qa-bus client", () => { + const stops: Array<() => Promise> = []; + + afterEach(async () => { + await Promise.all(stops.splice(0).map((stop) => stop())); + }); + + it("rejects malformed JSON responses instead of throwing from the stream callback", async () => { + const server = await startJsonServer(() => ({ + body: '{"cursor":1,"events":[', + })); + stops.push(server.stop); + + await expect( + pollQaBus({ + baseUrl: server.baseUrl, + accountId: "acct-a", + cursor: 0, + timeoutMs: 0, + }), + ).rejects.toThrow(SyntaxError); + }); +}); diff --git a/extensions/qa-channel/src/bus-client.ts b/extensions/qa-channel/src/bus-client.ts index 7229e749c97..3c7c29b2126 100644 --- a/extensions/qa-channel/src/bus-client.ts +++ b/extensions/qa-channel/src/bus-client.ts @@ -69,7 +69,13 @@ async function postJson( }); response.on("end", () => { const text = Buffer.concat(chunks).toString("utf8"); - const parsed = text ? (JSON.parse(text) as T | { error?: string }) : ({} as T); + let parsed: T | { error?: string }; + try { + parsed = text ? (JSON.parse(text) as T | { error?: string }) : ({} as T); + } catch (error) { + reject(error); + return; + } if ((response.statusCode ?? 500) < 200 || (response.statusCode ?? 500) >= 300) { const error = typeof parsed === "object" && parsed && "error" in parsed ? parsed.error : undefined; diff --git a/extensions/qa-lab/src/bus-server.ts b/extensions/qa-lab/src/bus-server.ts index 6a35d864890..f4fd7da3296 100644 --- a/extensions/qa-lab/src/bus-server.ts +++ b/extensions/qa-lab/src/bus-server.ts @@ -140,7 +140,11 @@ export async function handleQaBusRequest(params: { return true; } try { - await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs); + await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs, (snapshot) => { + return snapshot.events.some( + (event) => event.accountId === input.accountId && event.cursor > (input.cursor ?? 0), + ); + }); } catch { // timeout ok for long-poll } diff --git a/extensions/qa-lab/src/bus-state.test.ts b/extensions/qa-lab/src/bus-state.test.ts index 0060cc350d0..e0c4a065a8b 100644 --- a/extensions/qa-lab/src/bus-state.test.ts +++ b/extensions/qa-lab/src/bus-state.test.ts @@ -92,6 +92,35 @@ describe("qa-bus state", () => { ).rejects.toThrow("qa-bus wait timeout"); }); + it("keeps account-scoped cursor waits blocked on unrelated account traffic", async () => { + const state = createQaBusState(); + const pending = state.waitForCursorAdvance(0, 500, (snapshot) => { + return snapshot.events.some((event) => event.accountId === "acct-a" && event.cursor > 0); + }); + + state.addInboundMessage({ + accountId: "acct-b", + conversation: { id: "other", kind: "direct" }, + senderId: "acct-b-user", + text: "unrelated", + }); + + const beforeMatch = await Promise.race([ + pending.then(() => "resolved"), + new Promise((resolve) => setTimeout(() => resolve("still-waiting"), 20)), + ]); + expect(beforeMatch).toBe("still-waiting"); + + state.addInboundMessage({ + accountId: "acct-a", + conversation: { id: "target", kind: "direct" }, + senderId: "acct-a-user", + text: "matched", + }); + + await expect(pending).resolves.toBeUndefined(); + }); + it("preserves inline attachments and lets search match attachment metadata", () => { const state = createQaBusState(); diff --git a/extensions/qa-lab/src/bus-state.ts b/extensions/qa-lab/src/bus-state.ts index 3ed8724fe69..f8c867631fb 100644 --- a/extensions/qa-lab/src/bus-state.ts +++ b/extensions/qa-lab/src/bus-state.ts @@ -23,6 +23,7 @@ import type { QaBusReadMessageInput, QaBusReactToMessageInput, QaBusSearchMessagesInput, + QaBusStateSnapshot, QaBusThread, QaBusWaitForInput, } from "./runtime-api.js"; @@ -282,8 +283,12 @@ export function createQaBusState() { async waitFor(input: QaBusWaitForInput) { return await waiters.waitFor(input); }, - async waitForCursorAdvance(afterCursor: number, timeoutMs: number) { - return await waiters.waitForCursorAdvance(afterCursor, timeoutMs); + async waitForCursorAdvance( + afterCursor: number, + timeoutMs: number, + shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean, + ) { + return await waiters.waitForCursorAdvance(afterCursor, timeoutMs, shouldResolve); }, }; } diff --git a/extensions/qa-lab/src/bus-waiters.ts b/extensions/qa-lab/src/bus-waiters.ts index 7a37222aa07..68174276c08 100644 --- a/extensions/qa-lab/src/bus-waiters.ts +++ b/extensions/qa-lab/src/bus-waiters.ts @@ -22,6 +22,7 @@ type CursorWaiter = { reject: (error: Error) => void; timer: NodeJS.Timeout; afterCursor: number; + shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean; }; function createQaBusMatcher( @@ -79,6 +80,9 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) { if (snapshot.cursor <= waiter.afterCursor) { continue; } + if (waiter.shouldResolve && !waiter.shouldResolve(snapshot)) { + continue; + } clearTimeout(waiter.timer); cursorWaiters.delete(waiter); waiter.resolve(); @@ -104,8 +108,13 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) { waiters.add(waiter); }); }, - async waitForCursorAdvance(afterCursor: number, timeoutMs: number) { - if (getSnapshot().cursor > afterCursor) { + async waitForCursorAdvance( + afterCursor: number, + timeoutMs: number, + shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean, + ) { + const snapshot = getSnapshot(); + if (snapshot.cursor > afterCursor && (!shouldResolve || shouldResolve(snapshot))) { return; } return await new Promise((resolve, reject) => { @@ -113,6 +122,7 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) { resolve, reject, afterCursor, + shouldResolve, timer: setTimeout(() => { cursorWaiters.delete(waiter); reject(new Error(`qa-bus wait timeout after ${timeoutMs}ms`));