fix(gateway): align sessions abort wait semantics (#74751) thanks @BunsDev

Co-authored-by: Val Alexander <68980965+BunsDev@users.noreply.github.com>
This commit is contained in:
Val Alexander
2026-04-29 22:55:19 -05:00
committed by GitHub
parent e6abd9e3d8
commit 1f1f70a23f
6 changed files with 104 additions and 11 deletions

View File

@@ -1,4 +1,5 @@
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
import { emitAgentEvent } from "../infra/agent-events.js";
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
@@ -177,6 +178,19 @@ export function abortChatRunById(
ops.chatDeltaLastBroadcastLen.delete(runId);
const removed = ops.removeChatRun(runId, runId, sessionKey);
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle",
data: {
phase: "end",
status: "cancelled",
aborted: true,
stopReason,
startedAt: active.startedAtMs,
endedAt: Date.now(),
},
});
ops.agentRunSeq.delete(runId);
if (removed?.clientRunId) {
ops.agentRunSeq.delete(removed.clientRunId);

View File

@@ -25,6 +25,7 @@ const mocks = vi.hoisted(() => ({
updateSessionStore: vi.fn(),
agentCommand: vi.fn(),
registerAgentRunContext: vi.fn(),
emitAgentEvent: vi.fn(),
performGatewaySessionReset: vi.fn(),
getLatestSubagentRunByChildSessionKey: vi.fn(),
replaceSubagentRunAfterSteer: vi.fn(),
@@ -102,6 +103,7 @@ vi.mock("../../auto-reply/reply/session-reset-prompt.js", async () => {
});
vi.mock("../../infra/agent-events.js", () => ({
emitAgentEvent: mocks.emitAgentEvent,
registerAgentRunContext: mocks.registerAgentRunContext,
onAgentEvent: vi.fn(),
}));

View File

@@ -2519,15 +2519,17 @@ export const chatHandlers: GatewayRequestHandlers = {
} else {
void emitUserTranscriptUpdate();
}
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `chat:${clientRunId}`,
entry: {
ts: Date.now(),
ok: true,
payload: { runId: clientRunId, status: "ok" as const },
},
});
if (!context.chatAbortedRuns.has(clientRunId)) {
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `chat:${clientRunId}`,
entry: {
ts: Date.now(),
ok: true,
payload: { runId: clientRunId, status: "ok" as const },
},
});
}
})
.catch((err) => {
void rewriteUserTranscriptMedia().catch((rewriteErr) => {

View File

@@ -91,6 +91,7 @@ import {
} from "../session-utils.js";
import { applySessionsPatchToStore } from "../sessions-patch.js";
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js";
import { chatHandlers } from "./chat.js";
import type {
GatewayClient,
@@ -1314,6 +1315,19 @@ export const sessionsHandlers: GatewayRequestHandlers = {
canonicalKey,
runId: requestedRunId,
});
// Capture run kinds before the abort because abortChatRunById deletes entries
// from chatAbortControllers synchronously. We use this snapshot to choose the
// correct dedupe namespace: agent-kind runs use "agent:" (their runId equals
// their idempotency key), while chat-send runs use "chat:" so the abort
// snapshot does not collide with the agent RPC dedupe cache.
const preAbortRunKinds = new Map<string, "chat-send" | "agent" | undefined>();
if (requestedRunId) {
preAbortRunKinds.set(requestedRunId, context.chatAbortControllers.get(requestedRunId)?.kind);
} else {
for (const [rid, entry] of context.chatAbortControllers) {
preAbortRunKinds.set(rid, entry.kind);
}
}
let abortedRunId: string | null = null;
await chatHandlers["chat.abort"]({
req,
@@ -1334,7 +1348,27 @@ export const sessionsHandlers: GatewayRequestHandlers = {
Boolean(normalizeOptionalString(value)),
)
: [];
abortedRunId = runIds[0] ?? null;
const firstAbortedRunId = runIds[0] ?? null;
abortedRunId = firstAbortedRunId;
if (firstAbortedRunId) {
const endedAt = Date.now();
const runKind = preAbortRunKinds.get(firstAbortedRunId);
const dedupePrefix = runKind === "agent" ? "agent" : "chat";
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `${dedupePrefix}:${firstAbortedRunId}`,
entry: {
ts: endedAt,
ok: true,
payload: {
status: "timeout",
runId: firstAbortedRunId,
stopReason: "rpc",
endedAt,
},
},
});
}
respond(
true,
{

View File

@@ -78,7 +78,11 @@ async function withGatewayChatHarness(
clearConfigCache();
testState.sessionStorePath = undefined;
ws.close();
await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true })));
await Promise.all(
tempDirs.map((dir) =>
fs.rm(dir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }),
),
);
}
}

View File

@@ -275,6 +275,26 @@ describe("gateway server chat", () => {
});
expect(sendRes.ok).toBe(true);
const cancelledEventP = onceMessage(
ws,
(o) => {
const data =
o.payload?.data && typeof o.payload.data === "object"
? (o.payload.data as Record<string, unknown>)
: {};
return (
o.type === "event" &&
o.event === "agent" &&
o.payload?.runId === "idem-sessions-abort-1" &&
o.payload?.stream === "lifecycle" &&
data.phase === "end" &&
data.stopReason === "rpc"
);
},
8000,
);
void cancelledEventP.catch(() => undefined);
const abortRes = await rpcReq(ws, "sessions.abort", {
key: "agent:main:dashboard:test-abort",
runId: "idem-sessions-abort-1",
@@ -283,6 +303,23 @@ describe("gateway server chat", () => {
expect(["aborted", "no-active-run"]).toContain(abortRes.payload?.status);
if (abortRes.payload?.status === "aborted") {
expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1");
const cancelledEvent = await cancelledEventP;
expect(cancelledEvent.payload?.data).toMatchObject({
phase: "end",
status: "cancelled",
aborted: true,
stopReason: "rpc",
});
const waitRes = await rpcReq(ws, "agent.wait", {
runId: "idem-sessions-abort-1",
timeoutMs: 0,
});
expect(waitRes.ok).toBe(true);
expect(waitRes.payload).toMatchObject({
runId: "idem-sessions-abort-1",
status: "timeout",
stopReason: "rpc",
});
} else {
expect(abortRes.payload?.abortedRunId).toBeNull();
}