test(gateway): stabilize agent wait lifecycle test

This commit is contained in:
Peter Steinberger
2026-04-26 05:33:42 +01:00
parent 5f2273e81e
commit 6893e8f5f4
2 changed files with 43 additions and 8 deletions

View File

@@ -20,6 +20,7 @@ const agentRunCache = new Map<string, AgentRunSnapshot>();
const agentRunStarts = new Map<string, number>();
const pendingAgentRunErrors = new Map<string, PendingAgentRunError>();
const pendingAgentRunTimeouts = new Map<string, PendingAgentRunTerminal>();
const agentRunWaiterCounts = new Map<string, number>();
let agentRunListenerStarted = false;
type AgentRunSnapshot = {
@@ -195,6 +196,23 @@ function getCachedAgentRun(runId: string) {
return agentRunCache.get(runId);
}
function addAgentRunWaiter(runId: string): () => void {
agentRunWaiterCounts.set(runId, (agentRunWaiterCounts.get(runId) ?? 0) + 1);
let removed = false;
return () => {
if (removed) {
return;
}
removed = true;
const nextCount = (agentRunWaiterCounts.get(runId) ?? 1) - 1;
if (nextCount <= 0) {
agentRunWaiterCounts.delete(runId);
return;
}
agentRunWaiterCounts.set(runId, nextCount);
};
}
export async function waitForAgentJob(params: {
runId: string;
timeoutMs: number;
@@ -216,6 +234,7 @@ export async function waitForAgentJob(params: {
let pendingErrorTimer: NodeJS.Timeout | undefined;
let pendingTimeoutTimer: NodeJS.Timeout | undefined;
let onAbort: (() => void) | undefined;
let removeWaiter = () => {};
const clearPendingErrorTimer = () => {
if (!pendingErrorTimer) {
@@ -242,6 +261,7 @@ export async function waitForAgentJob(params: {
clearPendingErrorTimer();
clearPendingTimeoutTimer();
unsubscribe();
removeWaiter();
if (onAbort) {
signal?.removeEventListener("abort", onAbort);
}
@@ -334,6 +354,7 @@ export async function waitForAgentJob(params: {
recordAgentRunSnapshot(snapshot);
finish(snapshot);
});
removeWaiter = addAgentRunWaiter(runId);
const timer = setSafeTimeout(() => finish(null), timeoutMs);
onAbort = () => finish(null);
@@ -342,3 +363,19 @@ export async function waitForAgentJob(params: {
}
ensureAgentRunListener();
export const __testing = {
getWaiterCount(runId?: string): number {
if (runId) {
return agentRunWaiterCounts.get(runId) ?? 0;
}
let total = 0;
for (const count of agentRunWaiterCounts.values()) {
total += count;
}
return total;
},
resetWaiters(): void {
agentRunWaiterCounts.clear();
},
};

View File

@@ -6,6 +6,7 @@ import { WebSocket } from "ws";
import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
import { extractFirstTextBlock } from "../shared/chat-message-content.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { __testing as agentJobTesting } from "./server-methods/agent-job.js";
import {
connectOk,
dispatchInboundMessageMock,
@@ -157,6 +158,10 @@ describe("gateway server chat", () => {
return res;
};
const waitForLifecycleWaiter = async (runId: string) => {
await vi.waitFor(() => expect(agentJobTesting.getWaiterCount(runId)).toBeGreaterThan(0));
};
const abortChatRun = async (runId: string) => {
const res = await rpcReq(ws, "chat.abort", {
sessionKey: "main",
@@ -1102,14 +1107,7 @@ describe("gateway server chat", () => {
timeoutMs: 1_000,
});
vi.useFakeTimers();
try {
const settle = new Promise((resolve) => setTimeout(resolve, 20));
await vi.advanceTimersByTimeAsync(20);
await settle;
} finally {
vi.useRealTimers();
}
await waitForLifecycleWaiter(runId);
emitAgentEvent({
runId,
stream: "lifecycle",