fix(gateway): defer agent dispatch after accept

This commit is contained in:
Vincent Koc
2026-04-29 02:20:45 -07:00
parent 1d61862adb
commit 985000026e
2 changed files with 241 additions and 182 deletions

View File

@@ -183,6 +183,37 @@ async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs
}
}
/**
 * Advances the test by one "scheduling step" so work queued behind a timer
 * gets a chance to run.
 *
 * A microtask turn is yielded before and after the timer phase. The timer
 * phase itself depends on the active timer mode:
 * - fake timers: run only the timers that are currently pending;
 * - real timers: wait 15 ms of wall-clock time.
 *
 * NOTE(review): the 15 ms wait is presumably sized to outlast the gateway's
 * post-accept dispatch delay — confirm against the handler.
 */
async function flushScheduledDispatchStep() {
  // Let already-queued microtasks settle before touching timers.
  await Promise.resolve();

  if (!vi.isFakeTimers()) {
    await new Promise<void>((done) => {
      setTimeout(done, 15);
    });
  } else {
    await vi.runOnlyPendingTimersAsync();
  }

  // Let continuations triggered by the timer phase run.
  await Promise.resolve();
}
/**
 * Waits for the follow-up activity of an accepted agent run.
 *
 * Does nothing unless `respond` has already been called with `ok === true`
 * and a payload whose `status` is `"accepted"`. Otherwise it repeatedly
 * flushes one scheduling step until either `mocks.agentCommand` or `respond`
 * records a call beyond what was present on entry.
 *
 * The wait is bounded: after 50 flush attempts without new activity the
 * function returns silently rather than throwing.
 *
 * @param respond - The mocked gateway `respond` callback to inspect.
 */
async function waitForAcceptedRunDispatch(respond: ReturnType<typeof vi.fn>) {
  const sawAcceptedAck = respond.mock.calls.some(
    ([ok, payload]) =>
      ok === true && (payload as { status?: string } | undefined)?.status === "accepted",
  );
  if (!sawAcceptedAck) {
    return;
  }

  // Snapshot the call counts so only activity after this point is detected.
  const baselineCommandCalls = mocks.agentCommand.mock.calls.length;
  const baselineRespondCalls = respond.mock.calls.length;

  let attemptsLeft = 50;
  while (attemptsLeft > 0) {
    attemptsLeft -= 1;
    await flushScheduledDispatchStep();
    const sawNewActivity =
      mocks.agentCommand.mock.calls.length > baselineCommandCalls ||
      respond.mock.calls.length > baselineRespondCalls;
    if (sawNewActivity) {
      return;
    }
  }
}
function mockMainSessionEntry(entry: Record<string, unknown>, cfg: Record<string, unknown> = {}) {
mocks.loadSessionEntry.mockReturnValue({
cfg,
@@ -317,6 +348,7 @@ async function invokeAgent(
context?: GatewayRequestContext;
client?: AgentHandlerArgs["client"];
isWebchatConnect?: AgentHandlerArgs["isWebchatConnect"];
flushDispatch?: boolean;
},
) {
const respond = options?.respond ?? vi.fn();
@@ -328,6 +360,9 @@ async function invokeAgent(
client: options?.client ?? null,
isWebchatConnect: options?.isWebchatConnect ?? (() => false),
});
if (options?.flushDispatch !== false) {
await waitForAcceptedRunDispatch(respond);
}
return respond;
}
@@ -1717,19 +1752,19 @@ describe("gateway agent handler", () => {
meta: { durationMs: 100 },
});
const respond = vi.fn();
await agentHandlers.agent({
params: {
await invokeAgent(
{
message: "do thing",
sessionKey: "main",
voiceWakeTrigger: "robot wake",
idempotencyKey: "test-voice-route",
},
respond,
context: makeContext(),
req: { type: "req", id: "voice-1", method: "agent" },
client: null,
isWebchatConnect: () => false,
});
{
respond,
context: makeContext(),
reqId: "voice-1",
},
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string };
@@ -1761,19 +1796,19 @@ describe("gateway agent handler", () => {
});
const respond = vi.fn();
await agentHandlers.agent({
params: {
await invokeAgent(
{
message: "do thing",
sessionKey: "main",
voiceWakeTrigger: "robot wake",
idempotencyKey: "test-voice-route-unknown",
},
respond,
context: makeContext(),
req: { type: "req", id: "voice-2", method: "agent" },
client: null,
isWebchatConnect: () => false,
});
{
respond,
context: makeContext(),
reqId: "voice-2",
},
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string };
@@ -1805,19 +1840,19 @@ describe("gateway agent handler", () => {
});
const respond = vi.fn();
await agentHandlers.agent({
params: {
await invokeAgent(
{
message: "do thing",
sessionKey: "main",
voiceWakeTrigger: " ",
idempotencyKey: "test-voice-route-default-target",
},
respond,
context: makeContext(),
req: { type: "req", id: "voice-3", method: "agent" },
client: null,
isWebchatConnect: () => false,
});
{
respond,
context: makeContext(),
reqId: "voice-3",
},
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string };
@@ -1853,8 +1888,8 @@ describe("gateway agent handler", () => {
});
const respond = vi.fn();
await agentHandlers.agent({
params: {
await invokeAgent(
{
message: "do thing",
sessionKey: "main",
to: " ",
@@ -1862,12 +1897,12 @@ describe("gateway agent handler", () => {
voiceWakeTrigger: "robot wake",
idempotencyKey: "test-voice-route-whitespace-delivery",
},
respond,
context: makeContext(),
req: { type: "req", id: "voice-4", method: "agent" },
client: null,
isWebchatConnect: () => false,
});
{
respond,
context: makeContext(),
reqId: "voice-4",
},
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string };
@@ -1905,19 +1940,19 @@ describe("gateway agent handler", () => {
mocks.resolveVoiceWakeRouteByTrigger.mockClear();
const respond = vi.fn();
await agentHandlers.agent({
params: {
await invokeAgent(
{
message: "do thing",
sessionKey: "agent:main:research",
voiceWakeTrigger: "robot wake",
idempotencyKey: "test-voice-route-explicit-session",
},
respond,
context: makeContext(),
req: { type: "req", id: "voice-5", method: "agent" },
client: null,
isWebchatConnect: () => false,
});
{
respond,
context: makeContext(),
reqId: "voice-5",
},
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string };
@@ -1953,19 +1988,19 @@ describe("gateway agent handler", () => {
mocks.resolveVoiceWakeRouteByTrigger.mockClear();
const respond = vi.fn();
await agentHandlers.agent({
params: {
await invokeAgent(
{
message: "do thing",
sessionKey: "agent:ops:main",
voiceWakeTrigger: "robot wake",
idempotencyKey: "test-voice-route-explicit-other-agent-main",
},
respond,
context: makeContext(),
req: { type: "req", id: "voice-5b", method: "agent" },
client: null,
isWebchatConnect: () => false,
});
{
respond,
context: makeContext(),
reqId: "voice-5b",
},
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string };
@@ -2001,20 +2036,20 @@ describe("gateway agent handler", () => {
mocks.resolveVoiceWakeRouteByTrigger.mockClear();
const respond = vi.fn();
await agentHandlers.agent({
params: {
await invokeAgent(
{
message: "do thing",
sessionKey: "main",
sessionId: "caller-selected-session-id",
voiceWakeTrigger: "robot wake",
idempotencyKey: "test-voice-route-explicit-session-id",
},
respond,
context: makeContext(),
req: { type: "req", id: "voice-6", method: "agent" },
client: null,
isWebchatConnect: () => false,
});
{
respond,
context: makeContext(),
reqId: "voice-6",
},
);
await vi.waitFor(() => expect(mocks.agentCommand).toHaveBeenCalled());
const callArgs = mocks.agentCommand.mock.calls.at(-1)?.[0] as { sessionKey?: string };
@@ -2504,7 +2539,7 @@ describe("gateway agent handler chat.abort integration", () => {
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ respond, reqId: runId },
{ respond, reqId: runId, flushDispatch: false },
);
await Promise.resolve();
@@ -2521,7 +2556,9 @@ describe("gateway agent handler chat.abort integration", () => {
);
expect(mocks.agentCommand).not.toHaveBeenCalled();
await new Promise((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(mocks.agentCommand).not.toHaveBeenCalled();
await new Promise<void>((resolve) => setTimeout(resolve, 15));
await pending;
expect(mocks.agentCommand).toHaveBeenCalledTimes(1);
@@ -2725,20 +2762,25 @@ describe("gateway agent handler chat.abort integration", () => {
const context = makeContext();
const runId = "idem-abort-reactivation-fails";
await expect(
invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
),
).rejects.toThrow("reactivate boom");
const respond = vi.fn();
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId, respond },
);
expect(context.chatAbortControllers.has(runId)).toBe(false);
expect(mocks.agentCommand).not.toHaveBeenCalled();
expect(respond).toHaveBeenCalledWith(
false,
expect.objectContaining({ runId, status: "error" }),
expect.objectContaining({ code: "UNAVAILABLE" }),
expect.objectContaining({ runId }),
);
});
it("does not overwrite or evict a pre-existing chatAbortControllers entry with the same runId", async () => {

View File

@@ -1,5 +1,4 @@
import { randomUUID } from "node:crypto";
import { MessageChannel } from "node:worker_threads";
import {
listAgentIds,
resolveDefaultAgentId,
@@ -395,13 +394,7 @@ function dispatchAgentRunFromGateway(params: {
/**
 * Returns a promise that resolves after control has been yielded back to the
 * event loop, so the "accepted" acknowledgement can be flushed before the
 * agent run is dispatched.
 *
 * As shown here, two resolution paths are armed and the promise settles on
 * whichever fires first (later `resolve` calls are no-ops):
 * 1. a `MessageChannel` round trip, and
 * 2. a 10 ms timer.
 *
 * NOTE(review): this text looks like a rendered diff with its +/- markers
 * stripped. The `MessageChannel` lines may be the removed implementation and
 * the `setTimeout` line its replacement — confirm against the real file
 * before relying on the combined behavior described above.
 */
function yieldAfterAgentAcceptedAck(): Promise<void> {
return new Promise((resolve) => {
// Path 1: post a message to ourselves and resolve when it is delivered.
const channel = new MessageChannel();
channel.port1.on("message", () => {
// Close both ports once the message has arrived, then resolve.
channel.port1.close();
channel.port2.close();
resolve();
});
channel.port2.postMessage(undefined);
// Path 2: resolve after a 10 ms timer.
setTimeout(resolve, 10);
});
}
@@ -1130,121 +1123,145 @@ export const agentHandlers: GatewayRequestHandlers = {
});
respond(true, accepted, undefined, { runId });
// Give the accepted frame one event-loop turn to flush before the runner
// starts potentially heavy synchronous prompt/context setup. Otherwise a
// hot pre-turn path can starve the WebSocket caller until it times out.
await yieldAfterAgentAcceptedAck();
// starts potentially heavy synchronous prompt/context setup. The dispatch
// is scheduled out of this request handler so immediate agent.wait calls
// can reach the gateway before the pre-turn runner monopolizes the loop.
void (async () => {
await yieldAfterAgentAcceptedAck();
let dispatched = false;
try {
if (resolvedSessionKey) {
await reactivateCompletedSubagentSession({
sessionKey: resolvedSessionKey,
runId,
});
}
if (requestedSessionKey && resolvedSessionKey && isNewSession) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "create",
});
}
if (resolvedSessionKey) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "send",
});
}
if (shouldPrependStartupContext && resolvedSessionKey) {
const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({
cfg: cfgForAgent ?? cfg,
sessionKey: resolvedSessionKey,
sessionEntry,
spawnedBy: spawnedByValue,
});
const startupContextPrelude = await buildSessionStartupContextPrelude({
workspaceDir: runtimeWorkspaceDir,
cfg: cfgForAgent ?? cfg,
});
if (startupContextPrelude) {
message = `${startupContextPrelude}\n\n${message}`;
let dispatched = false;
try {
if (resolvedSessionKey) {
await reactivateCompletedSubagentSession({
sessionKey: resolvedSessionKey,
runId,
});
}
}
if (!isRawModelRun) {
message = annotateInterSessionPromptText(message, inputProvenance);
}
const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId;
const ingressAgentId =
agentId &&
(!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId)
? agentId
: undefined;
if (requestedSessionKey && resolvedSessionKey && isNewSession) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "create",
});
}
if (resolvedSessionKey) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "send",
});
}
dispatchAgentRunFromGateway({
ingressOpts: {
message,
images,
imageOrder,
agentId: ingressAgentId,
provider: providerOverride,
model: modelOverride,
to: resolvedTo,
sessionId: resolvedSessionId,
sessionKey: resolvedSessionKey,
thinking: request.thinking,
deliver,
deliveryTargetMode,
channel: resolvedChannel,
accountId: resolvedAccountId,
threadId: resolvedThreadId,
runContext: {
messageChannel: originMessageChannel,
if (shouldPrependStartupContext && resolvedSessionKey) {
const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({
cfg: cfgForAgent ?? cfg,
sessionKey: resolvedSessionKey,
sessionEntry,
spawnedBy: spawnedByValue,
});
const startupContextPrelude = await buildSessionStartupContextPrelude({
workspaceDir: runtimeWorkspaceDir,
cfg: cfgForAgent ?? cfg,
});
if (startupContextPrelude) {
message = `${startupContextPrelude}\n\n${message}`;
}
}
if (!isRawModelRun) {
message = annotateInterSessionPromptText(message, inputProvenance);
}
const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId;
const ingressAgentId =
agentId &&
(!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId)
? agentId
: undefined;
dispatchAgentRunFromGateway({
ingressOpts: {
message,
images,
imageOrder,
agentId: ingressAgentId,
provider: providerOverride,
model: modelOverride,
to: resolvedTo,
sessionId: resolvedSessionId,
sessionKey: resolvedSessionKey,
thinking: request.thinking,
deliver,
deliveryTargetMode,
channel: resolvedChannel,
accountId: resolvedAccountId,
threadId: resolvedThreadId,
runContext: {
messageChannel: originMessageChannel,
accountId: resolvedAccountId,
groupId: resolvedGroupId,
groupChannel: resolvedGroupChannel,
groupSpace: resolvedGroupSpace,
currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined,
},
groupId: resolvedGroupId,
groupChannel: resolvedGroupChannel,
groupSpace: resolvedGroupSpace,
currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined,
},
groupId: resolvedGroupId,
groupChannel: resolvedGroupChannel,
groupSpace: resolvedGroupSpace,
spawnedBy: spawnedByValue,
timeout: request.timeout?.toString(),
bestEffortDeliver,
messageChannel: originMessageChannel,
runId,
lane: request.lane,
modelRun: request.modelRun === true,
promptMode: request.promptMode,
extraSystemPrompt: request.extraSystemPrompt,
bootstrapContextMode: request.bootstrapContextMode,
bootstrapContextRunKind: request.bootstrapContextRunKind,
acpTurnSource: request.acpTurnSource,
internalEvents: request.internalEvents,
inputProvenance,
abortSignal: activeRunAbort.controller.signal,
// Internal-only: allow workspace override for spawned subagent runs.
workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({
spawnedBy: spawnedByValue,
workspaceDir: sessionEntry?.spawnedWorkspaceDir,
}),
senderIsOwner,
allowModelOverride,
},
runId,
idempotencyKey: idem,
abortController: activeRunAbort.controller,
respond,
context,
});
dispatched = true;
} finally {
if (!dispatched) {
activeRunAbort.cleanup();
timeout: request.timeout?.toString(),
bestEffortDeliver,
messageChannel: originMessageChannel,
runId,
lane: request.lane,
modelRun: request.modelRun === true,
promptMode: request.promptMode,
extraSystemPrompt: request.extraSystemPrompt,
bootstrapContextMode: request.bootstrapContextMode,
bootstrapContextRunKind: request.bootstrapContextRunKind,
acpTurnSource: request.acpTurnSource,
internalEvents: request.internalEvents,
inputProvenance,
abortSignal: activeRunAbort.controller.signal,
// Internal-only: allow workspace override for spawned subagent runs.
workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({
spawnedBy: spawnedByValue,
workspaceDir: sessionEntry?.spawnedWorkspaceDir,
}),
senderIsOwner,
allowModelOverride,
},
runId,
idempotencyKey: idem,
abortController: activeRunAbort.controller,
respond,
context,
});
dispatched = true;
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId,
status: "error" as const,
summary: String(err),
};
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `agent:${idem}`,
entry: {
ts: Date.now(),
ok: false,
payload,
error,
},
});
respond(false, payload, error, {
runId,
error: formatForLog(err),
});
} finally {
if (!dispatched) {
activeRunAbort.cleanup();
}
}
}
})();
},
"agent.identity.get": ({ params, respond, context }) => {
if (!validateAgentIdentityParams(params)) {