mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 21:00:44 +00:00
Tests: align pnpm test expectations with main (#67001)
Merged via squash.
Prepared head SHA: 29c8068053
Co-authored-by: hxy91819 <8814856+hxy91819@users.noreply.github.com>
Co-authored-by: hxy91819 <8814856+hxy91819@users.noreply.github.com>
Reviewed-by: @hxy91819
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { normalizeAccountId } from "./bus-queries.js";
|
||||
import type { QaBusState } from "./bus-state.js";
|
||||
import type {
|
||||
QaBusCreateThreadInput,
|
||||
@@ -134,16 +135,17 @@ export async function handleQaBusRequest(params: {
|
||||
case "/v1/poll": {
|
||||
const input = body as unknown as QaBusPollInput;
|
||||
const timeoutMs = Math.max(0, Math.min(input.timeoutMs ?? 0, 30_000));
|
||||
const accountId = normalizeAccountId(input.accountId);
|
||||
const initial = params.state.poll(input);
|
||||
if (initial.events.length > 0 || timeoutMs === 0) {
|
||||
writeJson(params.res, 200, initial);
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
await params.state.waitFor({
|
||||
kind: "event-kind",
|
||||
eventKind: "inbound-message",
|
||||
timeoutMs,
|
||||
await params.state.waitForCursorAdvance(input.cursor ?? 0, timeoutMs, (snapshot) => {
|
||||
return snapshot.events.some(
|
||||
(event) => event.accountId === accountId && event.cursor > (input.cursor ?? 0),
|
||||
);
|
||||
});
|
||||
} catch {
|
||||
// timeout ok for long-poll
|
||||
|
||||
@@ -92,6 +92,50 @@ describe("qa-bus state", () => {
|
||||
).rejects.toThrow("qa-bus wait timeout");
|
||||
});
|
||||
|
||||
it("keeps account-scoped cursor waits blocked on unrelated account traffic", async () => {
|
||||
const state = createQaBusState();
|
||||
const pending = state.waitForCursorAdvance(0, 500, (snapshot) => {
|
||||
return snapshot.events.some((event) => event.accountId === "acct-a" && event.cursor > 0);
|
||||
});
|
||||
|
||||
state.addInboundMessage({
|
||||
accountId: "acct-b",
|
||||
conversation: { id: "other", kind: "direct" },
|
||||
senderId: "acct-b-user",
|
||||
text: "unrelated",
|
||||
});
|
||||
|
||||
const beforeMatch = await Promise.race([
|
||||
pending.then(() => "resolved"),
|
||||
new Promise((resolve) => setTimeout(() => resolve("still-waiting"), 20)),
|
||||
]);
|
||||
expect(beforeMatch).toBe("still-waiting");
|
||||
|
||||
state.addInboundMessage({
|
||||
accountId: "acct-a",
|
||||
conversation: { id: "target", kind: "direct" },
|
||||
senderId: "acct-a-user",
|
||||
text: "matched",
|
||||
});
|
||||
|
||||
await expect(pending).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("wakes default-account cursor waits when accountId is omitted", async () => {
|
||||
const state = createQaBusState();
|
||||
const pending = state.waitForCursorAdvance(0, 500, (snapshot) => {
|
||||
return snapshot.events.some((event) => event.accountId === "default" && event.cursor > 0);
|
||||
});
|
||||
|
||||
state.addInboundMessage({
|
||||
conversation: { id: "target", kind: "direct" },
|
||||
senderId: "default-user",
|
||||
text: "matched",
|
||||
});
|
||||
|
||||
await expect(pending).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves inline attachments and lets search match attachment metadata", () => {
|
||||
const state = createQaBusState();
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import type {
|
||||
QaBusReadMessageInput,
|
||||
QaBusReactToMessageInput,
|
||||
QaBusSearchMessagesInput,
|
||||
QaBusStateSnapshot,
|
||||
QaBusThread,
|
||||
QaBusWaitForInput,
|
||||
} from "./runtime-api.js";
|
||||
@@ -282,6 +283,13 @@ export function createQaBusState() {
|
||||
async waitFor(input: QaBusWaitForInput) {
|
||||
return await waiters.waitFor(input);
|
||||
},
|
||||
async waitForCursorAdvance(
|
||||
afterCursor: number,
|
||||
timeoutMs: number,
|
||||
shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean,
|
||||
) {
|
||||
return await waiters.waitForCursorAdvance(afterCursor, timeoutMs, shouldResolve);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,14 @@ type Waiter = {
|
||||
matcher: (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null;
|
||||
};
|
||||
|
||||
type CursorWaiter = {
|
||||
resolve: () => void;
|
||||
reject: (error: Error) => void;
|
||||
timer: NodeJS.Timeout;
|
||||
afterCursor: number;
|
||||
shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean;
|
||||
};
|
||||
|
||||
function createQaBusMatcher(
|
||||
input: QaBusWaitForInput,
|
||||
): (snapshot: QaBusStateSnapshot) => QaBusWaitMatch | null {
|
||||
@@ -39,6 +47,7 @@ function createQaBusMatcher(
|
||||
|
||||
export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
const waiters = new Set<Waiter>();
|
||||
const cursorWaiters = new Set<CursorWaiter>();
|
||||
|
||||
return {
|
||||
reset(reason = "qa-bus reset") {
|
||||
@@ -47,9 +56,14 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
waiter.reject(new Error(reason));
|
||||
}
|
||||
waiters.clear();
|
||||
for (const waiter of cursorWaiters) {
|
||||
clearTimeout(waiter.timer);
|
||||
waiter.reject(new Error(reason));
|
||||
}
|
||||
cursorWaiters.clear();
|
||||
},
|
||||
settle() {
|
||||
if (waiters.size === 0) {
|
||||
if (waiters.size === 0 && cursorWaiters.size === 0) {
|
||||
return;
|
||||
}
|
||||
const snapshot = getSnapshot();
|
||||
@@ -62,6 +76,17 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
waiters.delete(waiter);
|
||||
waiter.resolve(match);
|
||||
}
|
||||
for (const waiter of Array.from(cursorWaiters)) {
|
||||
if (snapshot.cursor <= waiter.afterCursor) {
|
||||
continue;
|
||||
}
|
||||
if (waiter.shouldResolve && !waiter.shouldResolve(snapshot)) {
|
||||
continue;
|
||||
}
|
||||
clearTimeout(waiter.timer);
|
||||
cursorWaiters.delete(waiter);
|
||||
waiter.resolve();
|
||||
}
|
||||
},
|
||||
async waitFor(input: QaBusWaitForInput) {
|
||||
const matcher = createQaBusMatcher(input);
|
||||
@@ -83,5 +108,28 @@ export function createQaBusWaiterStore(getSnapshot: () => QaBusStateSnapshot) {
|
||||
waiters.add(waiter);
|
||||
});
|
||||
},
|
||||
async waitForCursorAdvance(
|
||||
afterCursor: number,
|
||||
timeoutMs: number,
|
||||
shouldResolve?: (snapshot: QaBusStateSnapshot) => boolean,
|
||||
) {
|
||||
const snapshot = getSnapshot();
|
||||
if (snapshot.cursor > afterCursor && (!shouldResolve || shouldResolve(snapshot))) {
|
||||
return;
|
||||
}
|
||||
return await new Promise<void>((resolve, reject) => {
|
||||
const waiter: CursorWaiter = {
|
||||
resolve,
|
||||
reject,
|
||||
afterCursor,
|
||||
shouldResolve,
|
||||
timer: setTimeout(() => {
|
||||
cursorWaiters.delete(waiter);
|
||||
reject(new Error(`qa-bus wait timeout after ${timeoutMs}ms`));
|
||||
}, timeoutMs),
|
||||
};
|
||||
cursorWaiters.add(waiter);
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -181,6 +181,7 @@ describe("qa multipass runtime", () => {
|
||||
const fakeCodexHome = path.join(fakeHome, ".codex");
|
||||
fs.mkdirSync(fakeCodexHome, { recursive: true });
|
||||
vi.stubEnv("HOME", "");
|
||||
vi.stubEnv("CODEX_HOME", "");
|
||||
vi.spyOn(os, "homedir").mockReturnValue(fakeHome);
|
||||
|
||||
try {
|
||||
|
||||
@@ -280,8 +280,9 @@ function resolveQaLiveCliAuthEnv(baseEnv: NodeJS.ProcessEnv) {
|
||||
const codexHome = resolveUserPath(configuredCodexHome, baseEnv);
|
||||
return fs.existsSync(codexHome) ? { CODEX_HOME: codexHome } : {};
|
||||
}
|
||||
const hostHome = baseEnv.HOME?.trim() || os.homedir();
|
||||
const codexHome = path.join(hostHome, ".codex");
|
||||
const hostHome = baseEnv.HOME?.trim();
|
||||
const effectiveHome = hostHome || os.homedir();
|
||||
const codexHome = path.join(effectiveHome, ".codex");
|
||||
return fs.existsSync(codexHome) ? { CODEX_HOME: codexHome } : {};
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user