diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 2b128e1dc6b..54b979d4c2e 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -183,6 +183,37 @@ async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs } } +async function flushScheduledDispatchStep() { + await Promise.resolve(); + if (vi.isFakeTimers()) { + await vi.runOnlyPendingTimersAsync(); + } else { + await new Promise((resolve) => setTimeout(resolve, 15)); + } + await Promise.resolve(); +} + +async function waitForAcceptedRunDispatch(respond: ReturnType) { + const accepted = respond.mock.calls.some(([ok, payload]) => { + return ok === true && (payload as { status?: string } | undefined)?.status === "accepted"; + }); + if (!accepted) { + return; + } + + const commandCallCount = mocks.agentCommand.mock.calls.length; + const respondCallCount = respond.mock.calls.length; + for (let attempt = 0; attempt < 50; attempt++) { + await flushScheduledDispatchStep(); + if ( + mocks.agentCommand.mock.calls.length > commandCallCount || + respond.mock.calls.length > respondCallCount + ) { + return; + } + } +} + function mockMainSessionEntry(entry: Record, cfg: Record = {}) { mocks.loadSessionEntry.mockReturnValue({ cfg, @@ -317,6 +348,7 @@ async function invokeAgent( context?: GatewayRequestContext; client?: AgentHandlerArgs["client"]; isWebchatConnect?: AgentHandlerArgs["isWebchatConnect"]; + flushDispatch?: boolean; }, ) { const respond = options?.respond ?? vi.fn(); @@ -328,6 +360,9 @@ async function invokeAgent( client: options?.client ?? null, isWebchatConnect: options?.isWebchatConnect ?? (() => false), }); + if (options?.flushDispatch !== false) { + await waitForAcceptedRunDispatch(respond); + } return respond; } @@ -1717,19 +1752,19 @@ describe("gateway agent handler", () => { meta: { durationMs: 100 }, }); const respond = vi.fn(); - await agentHandlers.agent({ - params: { + await invokeAgent( + { message: "do thing", sessionKey: "main", voiceWakeTrigger: "robot wake", idempotencyKey: "test-voice-route", }, - respond, - context: makeContext(), - req: { type: "req", id: "voice-1", method: "agent" }, - client: null, - isWebchatConnect: () => false, - }); + { + respond, + context: makeContext(), + reqId: "voice-1", + }, + ); await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string }; @@ -1761,19 +1796,19 @@ describe("gateway agent handler", () => { }); const respond = vi.fn(); - await agentHandlers.agent({ - params: { + await invokeAgent( + { message: "do thing", sessionKey: "main", voiceWakeTrigger: "robot wake", idempotencyKey: "test-voice-route-unknown", }, - respond, - context: makeContext(), - req: { type: "req", id: "voice-2", method: "agent" }, - client: null, - isWebchatConnect: () => false, - }); + { + respond, + context: makeContext(), + reqId: "voice-2", + }, + ); await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string }; @@ -1805,19 +1840,19 @@ describe("gateway agent handler", () => { }); const respond = vi.fn(); - await agentHandlers.agent({ - params: { + await invokeAgent( + { message: "do thing", sessionKey: "main", voiceWakeTrigger: " ", idempotencyKey: "test-voice-route-default-target", }, - respond, - context: makeContext(), - req: { type: "req", id: "voice-3", method: "agent" }, - client: null, - isWebchatConnect: () => false, - }); + { + respond, + context: makeContext(), + reqId: "voice-3", + }, + ); await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string }; @@ -1853,8 +1888,8 @@ describe("gateway agent handler", () => { }); const respond = vi.fn(); - await agentHandlers.agent({ - params: { + await invokeAgent( + { message: "do thing", sessionKey: "main", to: " ", @@ -1862,12 +1897,12 @@ describe("gateway agent handler", () => { voiceWakeTrigger: "robot wake", idempotencyKey: "test-voice-route-whitespace-delivery", }, - respond, - context: makeContext(), - req: { type: "req", id: "voice-4", method: "agent" }, - client: null, - isWebchatConnect: () => false, - }); + { + respond, + context: makeContext(), + reqId: "voice-4", + }, + ); await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string }; @@ -1905,19 +1940,19 @@ describe("gateway agent handler", () => { mocks.resolveVoiceWakeRouteByTrigger.mockClear(); const respond = vi.fn(); - await agentHandlers.agent({ - params: { + await invokeAgent( + { message: "do thing", sessionKey: "agent:main:research", voiceWakeTrigger: "robot wake", idempotencyKey: "test-voice-route-explicit-session", }, - respond, - context: makeContext(), - req: { type: "req", id: "voice-5", method: "agent" }, - client: null, - isWebchatConnect: () => false, - }); + { + respond, + context: makeContext(), + reqId: "voice-5", + }, + ); await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string }; @@ -1953,19 +1988,19 @@ describe("gateway agent handler", () => { mocks.resolveVoiceWakeRouteByTrigger.mockClear(); const respond = vi.fn(); - await agentHandlers.agent({ - params: { + await invokeAgent( + { message: "do thing", sessionKey: "agent:ops:main", voiceWakeTrigger: "robot wake", idempotencyKey: "test-voice-route-explicit-other-agent-main", }, - respond, - context: makeContext(), - req: { type: "req", id: "voice-5b", method: "agent" }, - client: null, - isWebchatConnect: () => false, - }); + { + respond, + context: makeContext(), + reqId: "voice-5b", + }, + ); await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string }; @@ -2001,20 +2036,20 @@ describe("gateway agent handler", () => { mocks.resolveVoiceWakeRouteByTrigger.mockClear(); const respond = vi.fn(); - await agentHandlers.agent({ - params: { + await invokeAgent( + { message: "do thing", sessionKey: "main", sessionId: "caller-selected-session-id", voiceWakeTrigger: "robot wake", idempotencyKey: "test-voice-route-explicit-session-id", }, - respond, - context: makeContext(), - req: { type: "req", id: "voice-6", method: "agent" }, - client: null, - isWebchatConnect: () => false, - }); + { + respond, + context: makeContext(), + reqId: "voice-6", + }, + ); await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled()); const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string }; @@ -2504,7 +2539,7 @@ describe("gateway agent handler chat.abort integration", () => { sessionKey: "agent:main:main", idempotencyKey: runId, }, - { respond, reqId: runId }, + { respond, reqId: runId, flushDispatch: false }, ); await Promise.resolve(); @@ -2521,7 +2556,9 @@ describe("gateway agent handler chat.abort integration", () => { ); expect(mocks.agentCommand).not.toHaveBeenCalled(); - await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + expect(mocks.agentCommand).not.toHaveBeenCalled(); + await new Promise((resolve) => setTimeout(resolve, 15)); await pending; expect(mocks.agentCommand).toHaveBeenCalledTimes(1); @@ -2725,20 +2762,25 @@ describe("gateway agent handler chat.abort integration", () => { const context = makeContext(); const runId = "idem-abort-reactivation-fails"; - await expect( - invokeAgent( - { - message: "hi", - agentId: "main", - sessionKey: "agent:main:main", - idempotencyKey: runId, - }, - { context, reqId: runId }, - ), - ).rejects.toThrow("reactivate boom"); + const respond = vi.fn(); + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId, respond }, + ); expect(context.chatAbortControllers.has(runId)).toBe(false); expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(respond).toHaveBeenCalledWith( + false, + expect.objectContaining({ runId, status: "error" }), + expect.objectContaining({ code: "UNAVAILABLE" }), + expect.objectContaining({ runId }), + ); }); it("does not overwrite or evict a pre-existing chatAbortControllers entry with the same runId", async () => { diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 3c55002d9b5..01ca26f224c 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -1,5 +1,4 @@ import { randomUUID } from "node:crypto"; -import { MessageChannel } from "node:worker_threads"; import { listAgentIds, resolveDefaultAgentId, @@ -395,13 +394,7 @@ function dispatchAgentRunFromGateway(params: { function yieldAfterAgentAcceptedAck(): Promise { return new Promise((resolve) => { - const channel = new MessageChannel(); - channel.port1.on("message", () => { - channel.port1.close(); - channel.port2.close(); - resolve(); - }); - channel.port2.postMessage(undefined); + setTimeout(resolve, 10); }); } @@ -1130,121 +1123,145 @@ export const agentHandlers: GatewayRequestHandlers = { }); respond(true, accepted, undefined, { runId }); // Give the accepted frame one event-loop turn to flush before the runner - // starts potentially heavy synchronous prompt/context setup. Otherwise a - // hot pre-turn path can starve the WebSocket caller until it times out. - await yieldAfterAgentAcceptedAck(); + // starts potentially heavy synchronous prompt/context setup. The dispatch + // is scheduled out of this request handler so immediate agent.wait calls + // can reach the gateway before the pre-turn runner monopolizes the loop. + void (async () => { + await yieldAfterAgentAcceptedAck(); - let dispatched = false; - try { - if (resolvedSessionKey) { - await reactivateCompletedSubagentSession({ - sessionKey: resolvedSessionKey, - runId, - }); - } - - if (requestedSessionKey && resolvedSessionKey && isNewSession) { - emitSessionsChanged(context, { - sessionKey: resolvedSessionKey, - reason: "create", - }); - } - if (resolvedSessionKey) { - emitSessionsChanged(context, { - sessionKey: resolvedSessionKey, - reason: "send", - }); - } - - if (shouldPrependStartupContext && resolvedSessionKey) { - const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({ - cfg: cfgForAgent ?? cfg, - sessionKey: resolvedSessionKey, - sessionEntry, - spawnedBy: spawnedByValue, - }); - const startupContextPrelude = await buildSessionStartupContextPrelude({ - workspaceDir: runtimeWorkspaceDir, - cfg: cfgForAgent ?? cfg, - }); - if (startupContextPrelude) { - message = `${startupContextPrelude}\n\n${message}`; + let dispatched = false; + try { + if (resolvedSessionKey) { + await reactivateCompletedSubagentSession({ + sessionKey: resolvedSessionKey, + runId, + }); } - } - if (!isRawModelRun) { - message = annotateInterSessionPromptText(message, inputProvenance); - } - const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; - const ingressAgentId = - agentId && - (!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId) - ? agentId - : undefined; + if (requestedSessionKey && resolvedSessionKey && isNewSession) { + emitSessionsChanged(context, { + sessionKey: resolvedSessionKey, + reason: "create", + }); + } + if (resolvedSessionKey) { + emitSessionsChanged(context, { + sessionKey: resolvedSessionKey, + reason: "send", + }); + } - dispatchAgentRunFromGateway({ - ingressOpts: { - message, - images, - imageOrder, - agentId: ingressAgentId, - provider: providerOverride, - model: modelOverride, - to: resolvedTo, - sessionId: resolvedSessionId, - sessionKey: resolvedSessionKey, - thinking: request.thinking, - deliver, - deliveryTargetMode, - channel: resolvedChannel, - accountId: resolvedAccountId, - threadId: resolvedThreadId, - runContext: { - messageChannel: originMessageChannel, + if (shouldPrependStartupContext && resolvedSessionKey) { + const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({ + cfg: cfgForAgent ?? cfg, + sessionKey: resolvedSessionKey, + sessionEntry, + spawnedBy: spawnedByValue, + }); + const startupContextPrelude = await buildSessionStartupContextPrelude({ + workspaceDir: runtimeWorkspaceDir, + cfg: cfgForAgent ?? cfg, + }); + if (startupContextPrelude) { + message = `${startupContextPrelude}\n\n${message}`; + } + } + if (!isRawModelRun) { + message = annotateInterSessionPromptText(message, inputProvenance); + } + + const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; + const ingressAgentId = + agentId && + (!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId) + ? agentId + : undefined; + + dispatchAgentRunFromGateway({ + ingressOpts: { + message, + images, + imageOrder, + agentId: ingressAgentId, + provider: providerOverride, + model: modelOverride, + to: resolvedTo, + sessionId: resolvedSessionId, + sessionKey: resolvedSessionKey, + thinking: request.thinking, + deliver, + deliveryTargetMode, + channel: resolvedChannel, accountId: resolvedAccountId, + threadId: resolvedThreadId, + runContext: { + messageChannel: originMessageChannel, + accountId: resolvedAccountId, + groupId: resolvedGroupId, + groupChannel: resolvedGroupChannel, + groupSpace: resolvedGroupSpace, + currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined, + }, groupId: resolvedGroupId, groupChannel: resolvedGroupChannel, groupSpace: resolvedGroupSpace, - currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined, - }, - groupId: resolvedGroupId, - groupChannel: resolvedGroupChannel, - groupSpace: resolvedGroupSpace, - spawnedBy: spawnedByValue, - timeout: request.timeout?.toString(), - bestEffortDeliver, - messageChannel: originMessageChannel, - runId, - lane: request.lane, - modelRun: request.modelRun === true, - promptMode: request.promptMode, - extraSystemPrompt: request.extraSystemPrompt, - bootstrapContextMode: request.bootstrapContextMode, - bootstrapContextRunKind: request.bootstrapContextRunKind, - acpTurnSource: request.acpTurnSource, - internalEvents: request.internalEvents, - inputProvenance, - abortSignal: activeRunAbort.controller.signal, - // Internal-only: allow workspace override for spawned subagent runs. - workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({ spawnedBy: spawnedByValue, - workspaceDir: sessionEntry?.spawnedWorkspaceDir, - }), - senderIsOwner, - allowModelOverride, - }, - runId, - idempotencyKey: idem, - abortController: activeRunAbort.controller, - respond, - context, - }); - dispatched = true; - } finally { - if (!dispatched) { - activeRunAbort.cleanup(); + timeout: request.timeout?.toString(), + bestEffortDeliver, + messageChannel: originMessageChannel, + runId, + lane: request.lane, + modelRun: request.modelRun === true, + promptMode: request.promptMode, + extraSystemPrompt: request.extraSystemPrompt, + bootstrapContextMode: request.bootstrapContextMode, + bootstrapContextRunKind: request.bootstrapContextRunKind, + acpTurnSource: request.acpTurnSource, + internalEvents: request.internalEvents, + inputProvenance, + abortSignal: activeRunAbort.controller.signal, + // Internal-only: allow workspace override for spawned subagent runs. + workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({ + spawnedBy: spawnedByValue, + workspaceDir: sessionEntry?.spawnedWorkspaceDir, + }), + senderIsOwner, + allowModelOverride, + }, + runId, + idempotencyKey: idem, + abortController: activeRunAbort.controller, + respond, + context, + }); + dispatched = true; + } catch (err) { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); + const payload = { + runId, + status: "error" as const, + summary: String(err), + }; + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `agent:${idem}`, + entry: { + ts: Date.now(), + ok: false, + payload, + error, + }, + }); + respond(false, payload, error, { + runId, + error: formatForLog(err), + }); + } finally { + if (!dispatched) { + activeRunAbort.cleanup(); + } } - } + })(); }, "agent.identity.get": ({ params, respond, context }) => { if (!validateAgentIdentityParams(params)) {