From 1f1f70a23f86a6c40290005030f83eeda571c88d Mon Sep 17 00:00:00 2001 From: Val Alexander <68980965+BunsDev@users.noreply.github.com> Date: Wed, 29 Apr 2026 22:55:19 -0500 Subject: [PATCH] fix(gateway): align sessions abort wait semantics (#74751) thanks @BunsDev Co-authored-by: Val Alexander <68980965+BunsDev@users.noreply.github.com> --- src/gateway/chat-abort.ts | 14 +++++++ src/gateway/server-methods/agent.test.ts | 2 + src/gateway/server-methods/chat.ts | 20 +++++----- src/gateway/server-methods/sessions.ts | 36 +++++++++++++++++- .../server.chat.gateway-server-chat-b.test.ts | 6 ++- .../server.chat.gateway-server-chat.test.ts | 37 +++++++++++++++++++ 6 files changed, 104 insertions(+), 11 deletions(-) diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index 96ad6201556..70bf1472122 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -1,4 +1,5 @@ import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000; @@ -177,6 +178,19 @@ export function abortChatRunById( ops.chatDeltaLastBroadcastLen.delete(runId); const removed = ops.removeChatRun(runId, runId, sessionKey); broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText }); + emitAgentEvent({ + runId, + sessionKey, + stream: "lifecycle", + data: { + phase: "end", + status: "cancelled", + aborted: true, + stopReason, + startedAt: active.startedAtMs, + endedAt: Date.now(), + }, + }); ops.agentRunSeq.delete(runId); if (removed?.clientRunId) { ops.agentRunSeq.delete(removed.clientRunId); diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index dd2bdd45646..72de8ebe1b7 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -25,6 +25,7 @@ const mocks = vi.hoisted(() => ({ updateSessionStore: vi.fn(), agentCommand: vi.fn(), registerAgentRunContext: vi.fn(), + emitAgentEvent: vi.fn(), performGatewaySessionReset: vi.fn(), getLatestSubagentRunByChildSessionKey: vi.fn(), replaceSubagentRunAfterSteer: vi.fn(), @@ -102,6 +103,7 @@ vi.mock("../../auto-reply/reply/session-reset-prompt.js", async () => { }); vi.mock("../../infra/agent-events.js", () => ({ + emitAgentEvent: mocks.emitAgentEvent, registerAgentRunContext: mocks.registerAgentRunContext, onAgentEvent: vi.fn(), })); diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 2a5188684d5..9952eaea285 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -2519,15 +2519,17 @@ export const chatHandlers: GatewayRequestHandlers = { } else { void emitUserTranscriptUpdate(); } - setGatewayDedupeEntry({ - dedupe: context.dedupe, - key: `chat:${clientRunId}`, - entry: { - ts: Date.now(), - ok: true, - payload: { runId: clientRunId, status: "ok" as const }, - }, - }); + if (!context.chatAbortedRuns.has(clientRunId)) { + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `chat:${clientRunId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { runId: clientRunId, status: "ok" as const }, + }, + }); + } }) .catch((err) => { void rewriteUserTranscriptMedia().catch((rewriteErr) => { diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index a154067e427..42eb0acfabf 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -91,6 +91,7 @@ import { } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; +import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js"; import { chatHandlers } from "./chat.js"; import type { GatewayClient, @@ -1314,6 +1315,19 @@ export const sessionsHandlers: GatewayRequestHandlers = { canonicalKey, runId: requestedRunId, }); + // Capture run kinds before the abort because abortChatRunById deletes entries + // from chatAbortControllers synchronously. We use this snapshot to choose the + // correct dedupe namespace: agent-kind runs use "agent:" (their runId equals + // their idempotency key), while chat-send runs use "chat:" so the abort + // snapshot does not collide with the agent RPC dedupe cache. + const preAbortRunKinds = new Map(); + if (requestedRunId) { + preAbortRunKinds.set(requestedRunId, context.chatAbortControllers.get(requestedRunId)?.kind); + } else { + for (const [rid, entry] of context.chatAbortControllers) { + preAbortRunKinds.set(rid, entry.kind); + } + } let abortedRunId: string | null = null; await chatHandlers["chat.abort"]({ req, @@ -1334,7 +1348,27 @@ export const sessionsHandlers: GatewayRequestHandlers = { Boolean(normalizeOptionalString(value)), ) : []; - abortedRunId = runIds[0] ?? null; + const firstAbortedRunId = runIds[0] ?? null; + abortedRunId = firstAbortedRunId; + if (firstAbortedRunId) { + const endedAt = Date.now(); + const runKind = preAbortRunKinds.get(firstAbortedRunId); + const dedupePrefix = runKind === "agent" ? "agent" : "chat"; + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `${dedupePrefix}:${firstAbortedRunId}`, + entry: { + ts: endedAt, + ok: true, + payload: { + status: "timeout", + runId: firstAbortedRunId, + stopReason: "rpc", + endedAt, + }, + }, + }); + } respond( true, { diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index 9cbfa9330ff..aad7f9f66c4 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -78,7 +78,11 @@ async function withGatewayChatHarness( clearConfigCache(); testState.sessionStorePath = undefined; ws.close(); - await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true }))); + await Promise.all( + tempDirs.map((dir) => + fs.rm(dir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }), + ), + ); } } diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index 45a8c24ed9d..d988c13da80 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -275,6 +275,26 @@ describe("gateway server chat", () => { }); expect(sendRes.ok).toBe(true); + const cancelledEventP = onceMessage( + ws, + (o) => { + const data = + o.payload?.data && typeof o.payload.data === "object" + ? (o.payload.data as Record) + : {}; + return ( + o.type === "event" && + o.event === "agent" && + o.payload?.runId === "idem-sessions-abort-1" && + o.payload?.stream === "lifecycle" && + data.phase === "end" && + data.stopReason === "rpc" + ); + }, + 8000, + ); + void cancelledEventP.catch(() => undefined); + const abortRes = await rpcReq(ws, "sessions.abort", { key: "agent:main:dashboard:test-abort", runId: "idem-sessions-abort-1", @@ -283,6 +303,23 @@ describe("gateway server chat", () => { expect(["aborted", "no-active-run"]).toContain(abortRes.payload?.status); if (abortRes.payload?.status === "aborted") { expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1"); + const cancelledEvent = await cancelledEventP; + expect(cancelledEvent.payload?.data).toMatchObject({ + phase: "end", + status: "cancelled", + aborted: true, + stopReason: "rpc", + }); + const waitRes = await rpcReq(ws, "agent.wait", { + runId: "idem-sessions-abort-1", + timeoutMs: 0, + }); + expect(waitRes.ok).toBe(true); + expect(waitRes.payload).toMatchObject({ + runId: "idem-sessions-abort-1", + status: "timeout", + stopReason: "rpc", + }); } else { expect(abortRes.payload?.abortedRunId).toBeNull(); }