fix(gateway): align sessions abort wait semantics (#74751) thanks @BunsDev

Co-authored-by: Val Alexander <68980965+BunsDev@users.noreply.github.com>
This commit is contained in:
Val Alexander
2026-04-29 22:55:19 -05:00
committed by GitHub
parent e6abd9e3d8
commit 1f1f70a23f
6 changed files with 104 additions and 11 deletions

View File

@@ -1,4 +1,5 @@
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js"; import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
import { emitAgentEvent } from "../infra/agent-events.js";
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000; const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
@@ -177,6 +178,19 @@ export function abortChatRunById(
ops.chatDeltaLastBroadcastLen.delete(runId); ops.chatDeltaLastBroadcastLen.delete(runId);
const removed = ops.removeChatRun(runId, runId, sessionKey); const removed = ops.removeChatRun(runId, runId, sessionKey);
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText }); broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle",
data: {
phase: "end",
status: "cancelled",
aborted: true,
stopReason,
startedAt: active.startedAtMs,
endedAt: Date.now(),
},
});
ops.agentRunSeq.delete(runId); ops.agentRunSeq.delete(runId);
if (removed?.clientRunId) { if (removed?.clientRunId) {
ops.agentRunSeq.delete(removed.clientRunId); ops.agentRunSeq.delete(removed.clientRunId);

View File

@@ -25,6 +25,7 @@ const mocks = vi.hoisted(() => ({
updateSessionStore: vi.fn(), updateSessionStore: vi.fn(),
agentCommand: vi.fn(), agentCommand: vi.fn(),
registerAgentRunContext: vi.fn(), registerAgentRunContext: vi.fn(),
emitAgentEvent: vi.fn(),
performGatewaySessionReset: vi.fn(), performGatewaySessionReset: vi.fn(),
getLatestSubagentRunByChildSessionKey: vi.fn(), getLatestSubagentRunByChildSessionKey: vi.fn(),
replaceSubagentRunAfterSteer: vi.fn(), replaceSubagentRunAfterSteer: vi.fn(),
@@ -102,6 +103,7 @@ vi.mock("../../auto-reply/reply/session-reset-prompt.js", async () => {
}); });
vi.mock("../../infra/agent-events.js", () => ({ vi.mock("../../infra/agent-events.js", () => ({
emitAgentEvent: mocks.emitAgentEvent,
registerAgentRunContext: mocks.registerAgentRunContext, registerAgentRunContext: mocks.registerAgentRunContext,
onAgentEvent: vi.fn(), onAgentEvent: vi.fn(),
})); }));

View File

@@ -2519,15 +2519,17 @@ export const chatHandlers: GatewayRequestHandlers = {
} else { } else {
void emitUserTranscriptUpdate(); void emitUserTranscriptUpdate();
} }
setGatewayDedupeEntry({ if (!context.chatAbortedRuns.has(clientRunId)) {
dedupe: context.dedupe, setGatewayDedupeEntry({
key: `chat:${clientRunId}`, dedupe: context.dedupe,
entry: { key: `chat:${clientRunId}`,
ts: Date.now(), entry: {
ok: true, ts: Date.now(),
payload: { runId: clientRunId, status: "ok" as const }, ok: true,
}, payload: { runId: clientRunId, status: "ok" as const },
}); },
});
}
}) })
.catch((err) => { .catch((err) => {
void rewriteUserTranscriptMedia().catch((rewriteErr) => { void rewriteUserTranscriptMedia().catch((rewriteErr) => {

View File

@@ -91,6 +91,7 @@ import {
} from "../session-utils.js"; } from "../session-utils.js";
import { applySessionsPatchToStore } from "../sessions-patch.js"; import { applySessionsPatchToStore } from "../sessions-patch.js";
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js";
import { chatHandlers } from "./chat.js"; import { chatHandlers } from "./chat.js";
import type { import type {
GatewayClient, GatewayClient,
@@ -1314,6 +1315,19 @@ export const sessionsHandlers: GatewayRequestHandlers = {
canonicalKey, canonicalKey,
runId: requestedRunId, runId: requestedRunId,
}); });
// Capture run kinds before the abort because abortChatRunById deletes entries
// from chatAbortControllers synchronously. We use this snapshot to choose the
// correct dedupe namespace: agent-kind runs use "agent:" (their runId equals
// their idempotency key), while chat-send runs use "chat:" so the abort
// snapshot does not collide with the agent RPC dedupe cache.
const preAbortRunKinds = new Map<string, "chat-send" | "agent" | undefined>();
if (requestedRunId) {
preAbortRunKinds.set(requestedRunId, context.chatAbortControllers.get(requestedRunId)?.kind);
} else {
for (const [rid, entry] of context.chatAbortControllers) {
preAbortRunKinds.set(rid, entry.kind);
}
}
let abortedRunId: string | null = null; let abortedRunId: string | null = null;
await chatHandlers["chat.abort"]({ await chatHandlers["chat.abort"]({
req, req,
@@ -1334,7 +1348,27 @@ export const sessionsHandlers: GatewayRequestHandlers = {
Boolean(normalizeOptionalString(value)), Boolean(normalizeOptionalString(value)),
) )
: []; : [];
abortedRunId = runIds[0] ?? null; const firstAbortedRunId = runIds[0] ?? null;
abortedRunId = firstAbortedRunId;
if (firstAbortedRunId) {
const endedAt = Date.now();
const runKind = preAbortRunKinds.get(firstAbortedRunId);
const dedupePrefix = runKind === "agent" ? "agent" : "chat";
setGatewayDedupeEntry({
dedupe: context.dedupe,
key: `${dedupePrefix}:${firstAbortedRunId}`,
entry: {
ts: endedAt,
ok: true,
payload: {
status: "timeout",
runId: firstAbortedRunId,
stopReason: "rpc",
endedAt,
},
},
});
}
respond( respond(
true, true,
{ {

View File

@@ -78,7 +78,11 @@ async function withGatewayChatHarness(
clearConfigCache(); clearConfigCache();
testState.sessionStorePath = undefined; testState.sessionStorePath = undefined;
ws.close(); ws.close();
await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true }))); await Promise.all(
tempDirs.map((dir) =>
fs.rm(dir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }),
),
);
} }
} }

View File

@@ -275,6 +275,26 @@ describe("gateway server chat", () => {
}); });
expect(sendRes.ok).toBe(true); expect(sendRes.ok).toBe(true);
const cancelledEventP = onceMessage(
ws,
(o) => {
const data =
o.payload?.data && typeof o.payload.data === "object"
? (o.payload.data as Record<string, unknown>)
: {};
return (
o.type === "event" &&
o.event === "agent" &&
o.payload?.runId === "idem-sessions-abort-1" &&
o.payload?.stream === "lifecycle" &&
data.phase === "end" &&
data.stopReason === "rpc"
);
},
8000,
);
void cancelledEventP.catch(() => undefined);
const abortRes = await rpcReq(ws, "sessions.abort", { const abortRes = await rpcReq(ws, "sessions.abort", {
key: "agent:main:dashboard:test-abort", key: "agent:main:dashboard:test-abort",
runId: "idem-sessions-abort-1", runId: "idem-sessions-abort-1",
@@ -283,6 +303,23 @@ describe("gateway server chat", () => {
expect(["aborted", "no-active-run"]).toContain(abortRes.payload?.status); expect(["aborted", "no-active-run"]).toContain(abortRes.payload?.status);
if (abortRes.payload?.status === "aborted") { if (abortRes.payload?.status === "aborted") {
expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1"); expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-1");
const cancelledEvent = await cancelledEventP;
expect(cancelledEvent.payload?.data).toMatchObject({
phase: "end",
status: "cancelled",
aborted: true,
stopReason: "rpc",
});
const waitRes = await rpcReq(ws, "agent.wait", {
runId: "idem-sessions-abort-1",
timeoutMs: 0,
});
expect(waitRes.ok).toBe(true);
expect(waitRes.payload).toMatchObject({
runId: "idem-sessions-abort-1",
status: "timeout",
stopReason: "rpc",
});
} else { } else {
expect(abortRes.payload?.abortedRunId).toBeNull(); expect(abortRes.payload?.abortedRunId).toBeNull();
} }