From 6893e8f5f41f819900a2274ed06f25206813b434 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 26 Apr 2026 05:33:42 +0100 Subject: [PATCH] test(gateway): stabilize agent wait lifecycle test --- src/gateway/server-methods/agent-job.ts | 37 +++++++++++++++++++ .../server.chat.gateway-server-chat.test.ts | 14 +++---- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/gateway/server-methods/agent-job.ts b/src/gateway/server-methods/agent-job.ts index f32a1d8b0e2..9e2cda71dc5 100644 --- a/src/gateway/server-methods/agent-job.ts +++ b/src/gateway/server-methods/agent-job.ts @@ -20,6 +20,7 @@ const agentRunCache = new Map(); const agentRunStarts = new Map(); const pendingAgentRunErrors = new Map(); const pendingAgentRunTimeouts = new Map(); +const agentRunWaiterCounts = new Map(); let agentRunListenerStarted = false; type AgentRunSnapshot = { @@ -195,6 +196,23 @@ function getCachedAgentRun(runId: string) { return agentRunCache.get(runId); } +function addAgentRunWaiter(runId: string): () => void { + agentRunWaiterCounts.set(runId, (agentRunWaiterCounts.get(runId) ?? 0) + 1); + let removed = false; + return () => { + if (removed) { + return; + } + removed = true; + const nextCount = (agentRunWaiterCounts.get(runId) ?? 1) - 1; + if (nextCount <= 0) { + agentRunWaiterCounts.delete(runId); + return; + } + agentRunWaiterCounts.set(runId, nextCount); + }; +} + export async function waitForAgentJob(params: { runId: string; timeoutMs: number; @@ -216,6 +234,7 @@ export async function waitForAgentJob(params: { let pendingErrorTimer: NodeJS.Timeout | undefined; let pendingTimeoutTimer: NodeJS.Timeout | undefined; let onAbort: (() => void) | undefined; + let removeWaiter = () => {}; const clearPendingErrorTimer = () => { if (!pendingErrorTimer) { @@ -242,6 +261,7 @@ export async function waitForAgentJob(params: { clearPendingErrorTimer(); clearPendingTimeoutTimer(); unsubscribe(); + removeWaiter(); if (onAbort) { signal?.removeEventListener("abort", onAbort); } @@ -334,6 +354,7 @@ export async function waitForAgentJob(params: { recordAgentRunSnapshot(snapshot); finish(snapshot); }); + removeWaiter = addAgentRunWaiter(runId); const timer = setSafeTimeout(() => finish(null), timeoutMs); onAbort = () => finish(null); @@ -342,3 +363,19 @@ export async function waitForAgentJob(params: { } ensureAgentRunListener(); + +export const __testing = { + getWaiterCount(runId?: string): number { + if (runId) { + return agentRunWaiterCounts.get(runId) ?? 0; + } + let total = 0; + for (const count of agentRunWaiterCounts.values()) { + total += count; + } + return total; + }, + resetWaiters(): void { + agentRunWaiterCounts.clear(); + }, +}; diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index 3c20f7ffdea..d2dd6bb14a5 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -6,6 +6,7 @@ import { WebSocket } from "ws"; import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js"; import { extractFirstTextBlock } from "../shared/chat-message-content.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; +import { __testing as agentJobTesting } from "./server-methods/agent-job.js"; import { connectOk, dispatchInboundMessageMock, @@ -157,6 +158,10 @@ describe("gateway server chat", () => { return res; }; + const waitForLifecycleWaiter = async (runId: string) => { + await vi.waitFor(() => expect(agentJobTesting.getWaiterCount(runId)).toBeGreaterThan(0)); + }; + const abortChatRun = async (runId: string) => { const res = await rpcReq(ws, "chat.abort", { sessionKey: "main", @@ -1102,14 +1107,7 @@ describe("gateway server chat", () => { timeoutMs: 1_000, }); - vi.useFakeTimers(); - try { - const settle = new Promise((resolve) => setTimeout(resolve, 20)); - await vi.advanceTimersByTimeAsync(20); - await settle; - } finally { - vi.useRealTimers(); - } + await waitForLifecycleWaiter(runId); emitAgentEvent({ runId, stream: "lifecycle",