From 8cae2ed645ed8edfd108e3f7833d8498ff9d7f24 Mon Sep 17 00:00:00 2001 From: bitloi <89318445+bitloi@users.noreply.github.com> Date: Fri, 24 Apr 2026 16:15:56 -0300 Subject: [PATCH] fix(gateway): allow chat.abort to stop agent RPC runs Register agent RPC runs in the shared abort controller map so chat.abort and sessions.abort can interrupt them like chat.send runs. Also centralize abort-controller registration/owned cleanup, preserve agent timeout semantics for maintenance expiry, and cover pre-dispatch failure cleanup with regression tests. Fixes #71128. --- CHANGELOG.md | 1 + src/gateway/chat-abort.ts | 79 ++++- src/gateway/server-methods/agent.test.ts | 312 +++++++++++++++++++- src/gateway/server-methods/agent.ts | 228 ++++++++------ src/gateway/server-methods/chat.ts | 17 +- src/gateway/server-runtime-subscriptions.ts | 8 +- 6 files changed, 544 insertions(+), 101 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2927975a1a0..befacce67fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,7 @@ Docs: https://docs.openclaw.ai - Plugins/providers: mirror runtime auth choices in bundled provider manifests and detect `KIMI_API_KEY` for Moonshot/Kimi web search before plugin runtime loads. Thanks @vincentkoc. - Gateway/chat: register chat.send runs in the chat run registry so lifecycle error events reach the client instead of being silently dropped, fixing stuck 'waiting' state and /abort reporting no active run. (#69747) Thanks @wangshu94. - Plugins/QQ Bot: enable the bundled qqbot plugin by default so its runtime dependency `@tencent-connect/qqbot-connector` is installed on first launch, unblocking the QR-code binding flow that dynamically imports the connector before any account is configured. (#71051) Thanks @cxyhhhhh. +- Gateway/agent RPC: register active `agent` runs into the chat abort controller map so `chat.abort` and `sessions.abort` can interrupt them, matching `chat.send` behavior and unblocking external runtimes that drive the Gateway through the public `agent` RPC. Fixes #71128. (#71214) Thanks @bitloi. ## 2026.4.23 diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts index 25c0cde1150..96ad6201556 100644 --- a/src/gateway/chat-abort.ts +++ b/src/gateway/chat-abort.ts @@ -1,5 +1,7 @@ import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js"; +const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000; + export type ChatAbortControllerEntry = { controller: AbortController; sessionId: string; @@ -8,6 +10,20 @@ export type ChatAbortControllerEntry = { expiresAtMs: number; ownerConnId?: string; ownerDeviceId?: string; + /** + * Which RPC owns this registration. Absent (undefined) is treated as + * `"chat-send"` so pre-existing callers that constructed entries without + * a kind keep their behavior. Consumers that need "chat.send specifically + * is active" must check `kind !== "agent"`, not just `.has(runId)`. + */ + kind?: "chat-send" | "agent"; +}; + +export type RegisteredChatAbortController = { + controller: AbortController; + registered: boolean; + entry?: ChatAbortControllerEntry; + cleanup: () => void; }; export function isChatStopCommandText(text: string): boolean { @@ -21,7 +37,13 @@ export function resolveChatRunExpiresAtMs(params: { minMs?: number; maxMs?: number; }): number { - const { now, timeoutMs, graceMs = 60_000, minMs = 2 * 60_000, maxMs = 24 * 60 * 60_000 } = params; + const { + now, + timeoutMs, + graceMs = DEFAULT_CHAT_RUN_ABORT_GRACE_MS, + minMs = 2 * 60_000, + maxMs = 24 * 60 * 60_000, + } = params; const boundedTimeoutMs = Math.max(0, timeoutMs); const target = now + boundedTimeoutMs + graceMs; const min = now + minMs; @@ -29,6 +51,61 @@ export function resolveChatRunExpiresAtMs(params: { return Math.min(max, Math.max(min, target)); } +export function resolveAgentRunExpiresAtMs(params: { + now: number; + timeoutMs: number; + graceMs?: number; +}): number { + const graceMs = Math.max(0, params.graceMs ?? DEFAULT_CHAT_RUN_ABORT_GRACE_MS); + return resolveChatRunExpiresAtMs({ + now: params.now, + timeoutMs: params.timeoutMs, + graceMs, + minMs: graceMs, + maxMs: Math.max(0, params.timeoutMs) + graceMs, + }); +} + +export function registerChatAbortController(params: { + chatAbortControllers: Map; + runId: string; + sessionId: string; + sessionKey?: string | null; + timeoutMs: number; + ownerConnId?: string; + ownerDeviceId?: string; + kind?: ChatAbortControllerEntry["kind"]; + now?: number; + expiresAtMs?: number; +}): RegisteredChatAbortController { + const controller = new AbortController(); + const cleanup = () => { + const entry = params.chatAbortControllers.get(params.runId); + if (entry?.controller === controller) { + params.chatAbortControllers.delete(params.runId); + } + }; + + if (!params.sessionKey || params.chatAbortControllers.has(params.runId)) { + return { controller, registered: false, cleanup }; + } + + const now = params.now ?? Date.now(); + const entry: ChatAbortControllerEntry = { + controller, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + startedAtMs: now, + expiresAtMs: + params.expiresAtMs ?? resolveChatRunExpiresAtMs({ now, timeoutMs: params.timeoutMs }), + ownerConnId: params.ownerConnId, + ownerDeviceId: params.ownerDeviceId, + kind: params.kind, + }; + params.chatAbortControllers.set(params.runId, entry); + return { controller, registered: true, entry, cleanup }; +} + export type ChatAbortOps = { chatAbortControllers: Map; chatRunBuffers: Map; diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index 235a977aaf8..5c5cc03208c 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -9,6 +9,7 @@ import { import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; import { withTempDir } from "../../test-helpers/temp-dir.js"; import { agentHandlers } from "./agent.js"; +import { chatHandlers } from "./chat.js"; import { expectSubagentFollowupReactivation } from "./subagent-followup.test-helpers.js"; import type { GatewayRequestContext } from "./types.js"; @@ -125,7 +126,16 @@ const makeContext = (): GatewayRequestContext => ({ dedupe: new Map(), addChatRun: vi.fn(), - logGateway: { info: vi.fn(), error: vi.fn() }, + removeChatRun: vi.fn(), + chatAbortControllers: new Map(), + chatRunBuffers: new Map(), + chatDeltaSentAt: new Map(), + chatDeltaLastBroadcastLen: new Map(), + chatAbortedRuns: new Map(), + agentRunSeq: new Map(), + broadcast: vi.fn(), + nodeSendToSession: vi.fn(), + logGateway: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, broadcastToConnIds: vi.fn(), getSessionEventSubscriberConnIds: () => new Set(), }) as unknown as GatewayRequestContext; @@ -554,6 +564,7 @@ describe("gateway agent handler", () => { context: { dedupe: new Map(), addChatRun: vi.fn(), + chatAbortControllers: new Map(), logGateway: { info: vi.fn(), error: vi.fn() }, broadcastToConnIds, getSessionEventSubscriberConnIds: () => new Set(["conn-1"]), @@ -634,6 +645,7 @@ describe("gateway agent handler", () => { context: { dedupe: new Map(), addChatRun: vi.fn(), + chatAbortControllers: new Map(), logGateway: { info: vi.fn(), error: vi.fn() }, broadcastToConnIds, getSessionEventSubscriberConnIds: () => new Set(["conn-1"]), @@ -774,6 +786,7 @@ describe("gateway agent handler", () => { context: { dedupe: new Map(), addChatRun: vi.fn(), + chatAbortControllers: new Map(), logGateway: { info: logInfo, error: vi.fn() }, broadcastToConnIds: vi.fn(), getSessionEventSubscriberConnIds: () => new Set(), @@ -1541,3 +1554,300 @@ describe("gateway agent handler", () => { ); }); }); + +describe("gateway agent handler chat.abort integration", () => { + afterEach(() => { + mocks.agentCommand.mockReset(); + mocks.getLatestSubagentRunByChildSessionKey.mockReset(); + mocks.replaceSubagentRunAfterSteer.mockReset(); + }); + + function prime(sessionId = "existing-session-id", cfg: Record = {}) { + mockMainSessionEntry({ sessionId }, cfg); + mocks.updateSessionStore.mockResolvedValue(undefined); + } + + it("registers an abort controller into chatAbortControllers for an agent run", async () => { + prime(); + const pending = new Promise(() => {}); + mocks.agentCommand.mockReturnValueOnce(pending); + + const context = makeContext(); + const runId = "idem-abort-register"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { + context, + reqId: runId, + client: { connId: "conn-1" } as AgentHandlerArgs["client"], + }, + ); + + const entry = context.chatAbortControllers.get(runId); + expect(entry).toBeDefined(); + expect(entry?.sessionKey).toBe("agent:main:main"); + expect(entry?.sessionId).toBe("existing-session-id"); + expect(entry?.ownerConnId).toBe("conn-1"); + expect(entry?.controller.signal.aborted).toBe(false); + expect((entry?.expiresAtMs ?? 0) - (entry?.startedAtMs ?? 0)).toBeGreaterThan(24 * 60 * 60_000); + }); + + it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => { + prime(); + mocks.agentCommand.mockReturnValueOnce(new Promise(() => {})); + + const context = makeContext(); + const runId = "idem-abort-no-timeout"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + timeout: 0, + }, + { context, reqId: runId }, + ); + + const entry = context.chatAbortControllers.get(runId); + expect(entry).toBeDefined(); + expect((entry?.expiresAtMs ?? 0) - (entry?.startedAtMs ?? 0)).toBeGreaterThan(24 * 60 * 60_000); + }); + + it("sets the maintenance expiry to the configured agent timeout, not the 24h chat default", async () => { + prime(); + const pending = new Promise(() => {}); + mocks.agentCommand.mockReturnValueOnce(pending); + + mocks.loadConfigReturn = { + agents: { defaults: { timeoutSeconds: 48 * 60 * 60 } }, + }; + const context = makeContext(); + const runId = "idem-abort-expires"; + const before = Date.now(); + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ); + mocks.loadConfigReturn = {}; + + const entry = context.chatAbortControllers.get(runId); + expect(entry).toBeDefined(); + // 48h configured timeout must not be silently truncated to the 24h + // chat.send default cap baked into resolveChatRunExpiresAtMs. Assert + // at least 25h to leave headroom above the 24h cap; the expected + // value is ~48h. + const TWENTY_FIVE_HOURS_MS = 25 * 60 * 60 * 1_000; + expect((entry?.expiresAtMs ?? 0) - before).toBeGreaterThan(TWENTY_FIVE_HOURS_MS); + }); + + it("chat.abort by runId aborts the agent run's signal and removes the entry", async () => { + prime(); + const pending = new Promise(() => {}); + let capturedSignal: AbortSignal | undefined; + mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => { + capturedSignal = opts.abortSignal; + return pending; + }); + + const context = makeContext(); + const runId = "idem-abort-run"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ); + + expect(context.chatAbortControllers.has(runId)).toBe(true); + expect(capturedSignal?.aborted).toBe(false); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main", runId }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(abortRespond).toHaveBeenCalledWith( + true, + expect.objectContaining({ aborted: true, runIds: [runId] }), + ); + expect(capturedSignal?.aborted).toBe(true); + expect(context.chatAbortControllers.has(runId)).toBe(false); + }); + + it("chat.abort without runId aborts the active agent run for the sessionKey", async () => { + prime(); + let capturedSignal: AbortSignal | undefined; + mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => { + capturedSignal = opts.abortSignal; + return new Promise(() => {}); + }); + + const context = makeContext(); + const runId = "idem-abort-session"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ); + + const abortRespond = vi.fn(); + await chatHandlers["chat.abort"]({ + params: { sessionKey: "agent:main:main" }, + respond: abortRespond as never, + context, + req: { type: "req", id: "abort-req", method: "chat.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(abortRespond).toHaveBeenCalledWith( + true, + expect.objectContaining({ aborted: true, runIds: [runId] }), + ); + expect(capturedSignal?.aborted).toBe(true); + }); + + it("removes the chatAbortControllers entry after the run completes successfully", async () => { + prime(); + mocks.agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "ok" }], + meta: { durationMs: 1 }, + }); + + const context = makeContext(); + const runId = "idem-abort-cleanup-ok"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ); + + await waitForAssertion(() => { + expect(context.chatAbortControllers.has(runId)).toBe(false); + }); + }); + + it("removes the chatAbortControllers entry after the run errors", async () => { + prime(); + mocks.agentCommand.mockRejectedValueOnce(new Error("boom")); + + const context = makeContext(); + const runId = "idem-abort-cleanup-err"; + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ); + + await waitForAssertion(() => { + expect(context.chatAbortControllers.has(runId)).toBe(false); + }); + }); + + it("removes the chatAbortControllers entry if pre-dispatch reactivation fails", async () => { + prime("reactivation-session"); + mocks.getLatestSubagentRunByChildSessionKey.mockReturnValueOnce({ + runId: "previous-run", + childSessionKey: "agent:main:main", + controllerSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", + requesterDisplayKey: "main", + task: "old task", + cleanup: "keep", + createdAt: 1, + startedAt: 2, + endedAt: 3, + outcome: { status: "ok" }, + }); + mocks.replaceSubagentRunAfterSteer.mockRejectedValueOnce(new Error("reactivate boom")); + + const context = makeContext(); + const runId = "idem-abort-reactivation-fails"; + await expect( + invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ), + ).rejects.toThrow("reactivate boom"); + + expect(context.chatAbortControllers.has(runId)).toBe(false); + expect(mocks.agentCommand).not.toHaveBeenCalled(); + }); + + it("does not overwrite or evict a pre-existing chatAbortControllers entry with the same runId", async () => { + prime(); + mocks.agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "ok" }], + meta: { durationMs: 1 }, + }); + + const context = makeContext(); + const runId = "idem-abort-collision"; + const preExisting = { + controller: new AbortController(), + sessionId: "chat-send-session", + sessionKey: "agent:main:main", + startedAtMs: Date.now(), + expiresAtMs: Date.now() + 60_000, + ownerConnId: "chat-send-conn", + ownerDeviceId: undefined, + }; + context.chatAbortControllers.set(runId, preExisting); + + await invokeAgent( + { + message: "hi", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: runId, + }, + { context, reqId: runId }, + ); + + expect(context.chatAbortControllers.get(runId)).toBe(preExisting); + // Cleanup after the agent run completes must not evict the pre-existing + // entry owned by a concurrent chat.send. + await waitForAssertion(() => { + expect(mocks.agentCommand).toHaveBeenCalled(); + }); + await new Promise((resolve) => setImmediate(resolve)); + expect(context.chatAbortControllers.get(runId)).toBe(preExisting); + }); +}); diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 6e235d06968..47d6c105d6b 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -5,6 +5,7 @@ import { normalizeSpawnedRunMetadata, resolveIngressWorkspaceOverrideForSpawnedRun, } from "../../agents/spawned-context.js"; +import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { resolveBareResetBootstrapFileAccess, resolveBareSessionResetPromptState, @@ -58,6 +59,7 @@ import { normalizeMessageChannel, } from "../../utils/message-channel.js"; import { resolveAssistantIdentity } from "../assistant-identity.js"; +import { registerChatAbortController, resolveAgentRunExpiresAtMs } from "../chat-abort.js"; import { MediaOffloadError, parseMessageWithAttachments } from "../chat-attachments.js"; import { resolveAssistantAvatarUrl } from "../control-ui-shared.js"; import { ADMIN_SCOPE } from "../method-scopes.js"; @@ -230,6 +232,12 @@ function dispatchAgentRunFromGateway(params: { ingressOpts: Parameters[0]; runId: string; idempotencyKey: string; + /** + * Controller whose signal is wired into `ingressOpts.abortSignal`. Used on + * completion to drop the matching `chatAbortControllers` entry without + * touching a same-runId entry owned by a concurrent chat.send. + */ + abortController: AbortController; respond: GatewayRequestHandlerOptions["respond"]; context: GatewayRequestHandlerOptions["context"]; }) { @@ -301,6 +309,12 @@ function dispatchAgentRunFromGateway(params: { runId: params.runId, error: formatForLog(err), }); + }) + .finally(() => { + const entry = params.context.chatAbortControllers.get(params.runId); + if (entry?.controller === params.abortController) { + params.context.chatAbortControllers.delete(params.runId); + } }); } @@ -862,6 +876,28 @@ export const agentHandlers: GatewayRequestHandlers = { const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL; + // Register before the accepted ack so an immediate chat.abort/sessions.abort + // cannot race the active-run entry. Agent RPC runs use the agent timeout; + // chat.send keeps the shorter chat cleanup cap. + const now = Date.now(); + const timeoutMs = resolveAgentTimeoutMs({ + cfg: cfgForAgent ?? cfg, + overrideSeconds: typeof request.timeout === "number" ? request.timeout : undefined, + }); + const activeRunAbort = registerChatAbortController({ + chatAbortControllers: context.chatAbortControllers, + runId, + sessionId: resolvedSessionId ?? runId, + sessionKey: resolvedSessionKey, + timeoutMs, + now, + expiresAtMs: resolveAgentRunExpiresAtMs({ now, timeoutMs }), + ownerConnId: typeof client?.connId === "string" ? client.connId : undefined, + ownerDeviceId: + typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined, + kind: "agent", + }); + const accepted = { runId, status: "accepted" as const, @@ -879,102 +915,112 @@ export const agentHandlers: GatewayRequestHandlers = { }); respond(true, accepted, undefined, { runId }); - if (resolvedSessionKey) { - await reactivateCompletedSubagentSession({ - sessionKey: resolvedSessionKey, - runId, - }); - } - - if (requestedSessionKey && resolvedSessionKey && isNewSession) { - emitSessionsChanged(context, { - sessionKey: resolvedSessionKey, - reason: "create", - }); - } - if (resolvedSessionKey) { - emitSessionsChanged(context, { - sessionKey: resolvedSessionKey, - reason: "send", - }); - } - - if (shouldPrependStartupContext && resolvedSessionKey) { - const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({ - cfg: cfgForAgent ?? cfg, - sessionKey: resolvedSessionKey, - sessionEntry, - spawnedBy: spawnedByValue, - }); - const startupContextPrelude = await buildSessionStartupContextPrelude({ - workspaceDir: runtimeWorkspaceDir, - cfg: cfgForAgent ?? cfg, - }); - if (startupContextPrelude) { - message = `${startupContextPrelude}\n\n${message}`; + let dispatched = false; + try { + if (resolvedSessionKey) { + await reactivateCompletedSubagentSession({ + sessionKey: resolvedSessionKey, + runId, + }); } - } - const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; - const ingressAgentId = - agentId && - (!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId) - ? agentId - : undefined; + if (requestedSessionKey && resolvedSessionKey && isNewSession) { + emitSessionsChanged(context, { + sessionKey: resolvedSessionKey, + reason: "create", + }); + } + if (resolvedSessionKey) { + emitSessionsChanged(context, { + sessionKey: resolvedSessionKey, + reason: "send", + }); + } - dispatchAgentRunFromGateway({ - ingressOpts: { - message, - images, - imageOrder, - agentId: ingressAgentId, - provider: providerOverride, - model: modelOverride, - to: resolvedTo, - sessionId: resolvedSessionId, - sessionKey: resolvedSessionKey, - thinking: request.thinking, - deliver, - deliveryTargetMode, - channel: resolvedChannel, - accountId: resolvedAccountId, - threadId: resolvedThreadId, - runContext: { - messageChannel: originMessageChannel, + if (shouldPrependStartupContext && resolvedSessionKey) { + const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({ + cfg: cfgForAgent ?? cfg, + sessionKey: resolvedSessionKey, + sessionEntry, + spawnedBy: spawnedByValue, + }); + const startupContextPrelude = await buildSessionStartupContextPrelude({ + workspaceDir: runtimeWorkspaceDir, + cfg: cfgForAgent ?? cfg, + }); + if (startupContextPrelude) { + message = `${startupContextPrelude}\n\n${message}`; + } + } + + const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId; + const ingressAgentId = + agentId && + (!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId) + ? agentId + : undefined; + + dispatchAgentRunFromGateway({ + ingressOpts: { + message, + images, + imageOrder, + agentId: ingressAgentId, + provider: providerOverride, + model: modelOverride, + to: resolvedTo, + sessionId: resolvedSessionId, + sessionKey: resolvedSessionKey, + thinking: request.thinking, + deliver, + deliveryTargetMode, + channel: resolvedChannel, accountId: resolvedAccountId, + threadId: resolvedThreadId, + runContext: { + messageChannel: originMessageChannel, + accountId: resolvedAccountId, + groupId: resolvedGroupId, + groupChannel: resolvedGroupChannel, + groupSpace: resolvedGroupSpace, + currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined, + }, groupId: resolvedGroupId, groupChannel: resolvedGroupChannel, groupSpace: resolvedGroupSpace, - currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined, - }, - groupId: resolvedGroupId, - groupChannel: resolvedGroupChannel, - groupSpace: resolvedGroupSpace, - spawnedBy: spawnedByValue, - timeout: request.timeout?.toString(), - bestEffortDeliver, - messageChannel: originMessageChannel, - runId, - lane: request.lane, - cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd === true, - extraSystemPrompt: request.extraSystemPrompt, - bootstrapContextMode: request.bootstrapContextMode, - bootstrapContextRunKind: request.bootstrapContextRunKind, - internalEvents: request.internalEvents, - inputProvenance, - // Internal-only: allow workspace override for spawned subagent runs. - workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({ spawnedBy: spawnedByValue, - workspaceDir: sessionEntry?.spawnedWorkspaceDir, - }), - senderIsOwner, - allowModelOverride, - }, - runId, - idempotencyKey: idem, - respond, - context, - }); + timeout: request.timeout?.toString(), + bestEffortDeliver, + messageChannel: originMessageChannel, + runId, + lane: request.lane, + cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd === true, + extraSystemPrompt: request.extraSystemPrompt, + bootstrapContextMode: request.bootstrapContextMode, + bootstrapContextRunKind: request.bootstrapContextRunKind, + internalEvents: request.internalEvents, + inputProvenance, + abortSignal: activeRunAbort.controller.signal, + // Internal-only: allow workspace override for spawned subagent runs. + workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({ + spawnedBy: spawnedByValue, + workspaceDir: sessionEntry?.spawnedWorkspaceDir, + }), + senderIsOwner, + allowModelOverride, + }, + runId, + idempotencyKey: idem, + abortController: activeRunAbort.controller, + respond, + context, + }); + dispatched = true; + } finally { + if (!dispatched) { + activeRunAbort.cleanup(); + } + } }, "agent.identity.get": ({ params, respond }) => { if (!validateAgentIdentityParams(params)) { @@ -1048,7 +1094,11 @@ export const agentHandlers: GatewayRequestHandlers = { typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs) ? Math.max(0, Math.floor(p.timeoutMs)) : 30_000; - const hasActiveChatRun = context.chatAbortControllers.has(runId); + // `hasActiveChatRun` drives snapshot preference, so it must reflect + // chat.send specifically — not an agent-kind entry registered by the + // `agent` RPC for its own abort surface. + const activeChatEntry = context.chatAbortControllers.get(runId); + const hasActiveChatRun = activeChatEntry !== undefined && activeChatEntry.kind !== "agent"; const cachedGatewaySnapshot = readTerminalSnapshotFromGatewayDedupe({ dedupe: context.dedupe, diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 5a8c7c608ee..bb69d33d4bd 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -49,7 +49,7 @@ import { type ChatAbortControllerEntry, type ChatAbortOps, isChatStopCommandText, - resolveChatRunExpiresAtMs, + registerChatAbortController, } from "../chat-abort.js"; import { type ChatImageContent, @@ -2238,15 +2238,16 @@ export const chatHandlers: GatewayRequestHandlers = { } try { - const abortController = new AbortController(); - context.chatAbortControllers.set(clientRunId, { - controller: abortController, + const activeRunAbort = registerChatAbortController({ + chatAbortControllers: context.chatAbortControllers, + runId: clientRunId, sessionId: entry?.sessionId ?? clientRunId, sessionKey: rawSessionKey, - startedAtMs: now, - expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }), + timeoutMs, + now, ownerConnId: normalizeOptionalText(client?.connId), ownerDeviceId: normalizeOptionalText(client?.connect?.device?.id), + kind: "chat-send", }); context.addChatRun(clientRunId, { sessionKey, @@ -2506,7 +2507,7 @@ export const chatHandlers: GatewayRequestHandlers = { dispatcher, replyOptions: { runId: clientRunId, - abortSignal: abortController.signal, + abortSignal: activeRunAbort.controller.signal, images: parsedImages.length > 0 ? parsedImages : undefined, imageOrder: imageOrder.length > 0 ? imageOrder : undefined, onAgentRunStart: (runId) => { @@ -2743,7 +2744,7 @@ export const chatHandlers: GatewayRequestHandlers = { }); }) .finally(() => { - context.chatAbortControllers.delete(clientRunId); + activeRunAbort.cleanup(); context.removeChatRun(clientRunId, clientRunId, sessionKey); }); } catch (err) { diff --git a/src/gateway/server-runtime-subscriptions.ts b/src/gateway/server-runtime-subscriptions.ts index c442133a32e..59335c13dcb 100644 --- a/src/gateway/server-runtime-subscriptions.ts +++ b/src/gateway/server-runtime-subscriptions.ts @@ -2,6 +2,7 @@ import { onAgentEvent } from "../infra/agent-events.js"; import { onHeartbeatEvent } from "../infra/heartbeat-events.js"; import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js"; import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import type { ChatAbortControllerEntry } from "./chat-abort.js"; import { createAgentEventHandler, type ChatRunState, @@ -30,7 +31,7 @@ export function startGatewayEventSubscriptions(params: { toolEventRecipients: ToolEventRecipientRegistry; sessionEventSubscribers: SessionEventSubscriberRegistry; sessionMessageSubscribers: SessionMessageSubscriberRegistry; - chatAbortControllers: Map; + chatAbortControllers: Map; }) { const agentUnsub = onAgentEvent( createAgentEventHandler({ @@ -43,7 +44,10 @@ export function startGatewayEventSubscriptions(params: { clearAgentRunContext: params.clearAgentRunContext, toolEventRecipients: params.toolEventRecipients, sessionEventSubscribers: params.sessionEventSubscribers, - isChatSendRunActive: (runId) => params.chatAbortControllers.has(runId), + isChatSendRunActive: (runId) => { + const entry = params.chatAbortControllers.get(runId); + return entry !== undefined && entry.kind !== "agent"; + }, }), );