fix(gateway): allow chat.abort to stop agent RPC runs

Register agent RPC runs in the shared abort controller map so chat.abort and sessions.abort can interrupt them like chat.send runs.

Also centralize abort-controller registration/owned cleanup, preserve agent timeout semantics for maintenance expiry, and cover pre-dispatch failure cleanup with regression tests.

Fixes #71128.
This commit is contained in:
bitloi
2026-04-24 16:15:56 -03:00
committed by GitHub
parent 0e50fee996
commit 8cae2ed645
6 changed files with 544 additions and 101 deletions

View File

@@ -66,6 +66,7 @@ Docs: https://docs.openclaw.ai
- Plugins/providers: mirror runtime auth choices in bundled provider manifests and detect `KIMI_API_KEY` for Moonshot/Kimi web search before plugin runtime loads. Thanks @vincentkoc.
- Gateway/chat: register chat.send runs in the chat run registry so lifecycle error events reach the client instead of being silently dropped, fixing stuck 'waiting' state and /abort reporting no active run. (#69747) Thanks @wangshu94.
- Plugins/QQ Bot: enable the bundled qqbot plugin by default so its runtime dependency `@tencent-connect/qqbot-connector` is installed on first launch, unblocking the QR-code binding flow that dynamically imports the connector before any account is configured. (#71051) Thanks @cxyhhhhh.
- Gateway/agent RPC: register active `agent` runs into the chat abort controller map so `chat.abort` and `sessions.abort` can interrupt them, matching `chat.send` behavior and unblocking external runtimes that drive the Gateway through the public `agent` RPC. Fixes #71128. (#71214) Thanks @bitloi.
## 2026.4.23

View File

@@ -1,5 +1,7 @@
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
export type ChatAbortControllerEntry = {
controller: AbortController;
sessionId: string;
@@ -8,6 +10,20 @@ export type ChatAbortControllerEntry = {
expiresAtMs: number;
ownerConnId?: string;
ownerDeviceId?: string;
/**
* Which RPC owns this registration. Absent (undefined) is treated as
* `"chat-send"` so pre-existing callers that constructed entries without
* a kind keep their behavior. Consumers that need "chat.send specifically
* is active" must check `kind !== "agent"`, not just `.has(runId)`.
*/
kind?: "chat-send" | "agent";
};
export type RegisteredChatAbortController = {
controller: AbortController;
registered: boolean;
entry?: ChatAbortControllerEntry;
cleanup: () => void;
};
export function isChatStopCommandText(text: string): boolean {
@@ -21,7 +37,13 @@ export function resolveChatRunExpiresAtMs(params: {
minMs?: number;
maxMs?: number;
}): number {
const { now, timeoutMs, graceMs = 60_000, minMs = 2 * 60_000, maxMs = 24 * 60 * 60_000 } = params;
const {
now,
timeoutMs,
graceMs = DEFAULT_CHAT_RUN_ABORT_GRACE_MS,
minMs = 2 * 60_000,
maxMs = 24 * 60 * 60_000,
} = params;
const boundedTimeoutMs = Math.max(0, timeoutMs);
const target = now + boundedTimeoutMs + graceMs;
const min = now + minMs;
@@ -29,6 +51,61 @@ export function resolveChatRunExpiresAtMs(params: {
return Math.min(max, Math.max(min, target));
}
export function resolveAgentRunExpiresAtMs(params: {
now: number;
timeoutMs: number;
graceMs?: number;
}): number {
const graceMs = Math.max(0, params.graceMs ?? DEFAULT_CHAT_RUN_ABORT_GRACE_MS);
return resolveChatRunExpiresAtMs({
now: params.now,
timeoutMs: params.timeoutMs,
graceMs,
minMs: graceMs,
maxMs: Math.max(0, params.timeoutMs) + graceMs,
});
}
export function registerChatAbortController(params: {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
runId: string;
sessionId: string;
sessionKey?: string | null;
timeoutMs: number;
ownerConnId?: string;
ownerDeviceId?: string;
kind?: ChatAbortControllerEntry["kind"];
now?: number;
expiresAtMs?: number;
}): RegisteredChatAbortController {
const controller = new AbortController();
const cleanup = () => {
const entry = params.chatAbortControllers.get(params.runId);
if (entry?.controller === controller) {
params.chatAbortControllers.delete(params.runId);
}
};
if (!params.sessionKey || params.chatAbortControllers.has(params.runId)) {
return { controller, registered: false, cleanup };
}
const now = params.now ?? Date.now();
const entry: ChatAbortControllerEntry = {
controller,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
startedAtMs: now,
expiresAtMs:
params.expiresAtMs ?? resolveChatRunExpiresAtMs({ now, timeoutMs: params.timeoutMs }),
ownerConnId: params.ownerConnId,
ownerDeviceId: params.ownerDeviceId,
kind: params.kind,
};
params.chatAbortControllers.set(params.runId, entry);
return { controller, registered: true, entry, cleanup };
}
export type ChatAbortOps = {
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatRunBuffers: Map<string, string>;

View File

@@ -9,6 +9,7 @@ import {
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { withTempDir } from "../../test-helpers/temp-dir.js";
import { agentHandlers } from "./agent.js";
import { chatHandlers } from "./chat.js";
import { expectSubagentFollowupReactivation } from "./subagent-followup.test-helpers.js";
import type { GatewayRequestContext } from "./types.js";
@@ -125,7 +126,16 @@ const makeContext = (): GatewayRequestContext =>
({
dedupe: new Map(),
addChatRun: vi.fn(),
logGateway: { info: vi.fn(), error: vi.fn() },
removeChatRun: vi.fn(),
chatAbortControllers: new Map(),
chatRunBuffers: new Map(),
chatDeltaSentAt: new Map(),
chatDeltaLastBroadcastLen: new Map(),
chatAbortedRuns: new Map(),
agentRunSeq: new Map(),
broadcast: vi.fn(),
nodeSendToSession: vi.fn(),
logGateway: { info: vi.fn(), warn: vi.fn(), error: vi.fn() },
broadcastToConnIds: vi.fn(),
getSessionEventSubscriberConnIds: () => new Set(),
}) as unknown as GatewayRequestContext;
@@ -554,6 +564,7 @@ describe("gateway agent handler", () => {
context: {
dedupe: new Map(),
addChatRun: vi.fn(),
chatAbortControllers: new Map(),
logGateway: { info: vi.fn(), error: vi.fn() },
broadcastToConnIds,
getSessionEventSubscriberConnIds: () => new Set(["conn-1"]),
@@ -634,6 +645,7 @@ describe("gateway agent handler", () => {
context: {
dedupe: new Map(),
addChatRun: vi.fn(),
chatAbortControllers: new Map(),
logGateway: { info: vi.fn(), error: vi.fn() },
broadcastToConnIds,
getSessionEventSubscriberConnIds: () => new Set(["conn-1"]),
@@ -774,6 +786,7 @@ describe("gateway agent handler", () => {
context: {
dedupe: new Map(),
addChatRun: vi.fn(),
chatAbortControllers: new Map(),
logGateway: { info: logInfo, error: vi.fn() },
broadcastToConnIds: vi.fn(),
getSessionEventSubscriberConnIds: () => new Set(),
@@ -1541,3 +1554,300 @@ describe("gateway agent handler", () => {
);
});
});
describe("gateway agent handler chat.abort integration", () => {
afterEach(() => {
mocks.agentCommand.mockReset();
mocks.getLatestSubagentRunByChildSessionKey.mockReset();
mocks.replaceSubagentRunAfterSteer.mockReset();
});
function prime(sessionId = "existing-session-id", cfg: Record<string, unknown> = {}) {
mockMainSessionEntry({ sessionId }, cfg);
mocks.updateSessionStore.mockResolvedValue(undefined);
}
it("registers an abort controller into chatAbortControllers for an agent run", async () => {
prime();
const pending = new Promise(() => {});
mocks.agentCommand.mockReturnValueOnce(pending);
const context = makeContext();
const runId = "idem-abort-register";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{
context,
reqId: runId,
client: { connId: "conn-1" } as AgentHandlerArgs["client"],
},
);
const entry = context.chatAbortControllers.get(runId);
expect(entry).toBeDefined();
expect(entry?.sessionKey).toBe("agent:main:main");
expect(entry?.sessionId).toBe("existing-session-id");
expect(entry?.ownerConnId).toBe("conn-1");
expect(entry?.controller.signal.aborted).toBe(false);
expect((entry?.expiresAtMs ?? 0) - (entry?.startedAtMs ?? 0)).toBeGreaterThan(24 * 60 * 60_000);
});
it("uses the explicit no-timeout agent expiry instead of the chat 24h cap", async () => {
prime();
mocks.agentCommand.mockReturnValueOnce(new Promise(() => {}));
const context = makeContext();
const runId = "idem-abort-no-timeout";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
timeout: 0,
},
{ context, reqId: runId },
);
const entry = context.chatAbortControllers.get(runId);
expect(entry).toBeDefined();
expect((entry?.expiresAtMs ?? 0) - (entry?.startedAtMs ?? 0)).toBeGreaterThan(24 * 60 * 60_000);
});
it("sets the maintenance expiry to the configured agent timeout, not the 24h chat default", async () => {
prime();
const pending = new Promise(() => {});
mocks.agentCommand.mockReturnValueOnce(pending);
mocks.loadConfigReturn = {
agents: { defaults: { timeoutSeconds: 48 * 60 * 60 } },
};
const context = makeContext();
const runId = "idem-abort-expires";
const before = Date.now();
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
);
mocks.loadConfigReturn = {};
const entry = context.chatAbortControllers.get(runId);
expect(entry).toBeDefined();
// 48h configured timeout must not be silently truncated to the 24h
// chat.send default cap baked into resolveChatRunExpiresAtMs. Assert
// at least 25h to leave headroom above the 24h cap; the expected
// value is ~48h.
const TWENTY_FIVE_HOURS_MS = 25 * 60 * 60 * 1_000;
expect((entry?.expiresAtMs ?? 0) - before).toBeGreaterThan(TWENTY_FIVE_HOURS_MS);
});
it("chat.abort by runId aborts the agent run's signal and removes the entry", async () => {
prime();
const pending = new Promise(() => {});
let capturedSignal: AbortSignal | undefined;
mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => {
capturedSignal = opts.abortSignal;
return pending;
});
const context = makeContext();
const runId = "idem-abort-run";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
);
expect(context.chatAbortControllers.has(runId)).toBe(true);
expect(capturedSignal?.aborted).toBe(false);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main", runId },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expect(abortRespond).toHaveBeenCalledWith(
true,
expect.objectContaining({ aborted: true, runIds: [runId] }),
);
expect(capturedSignal?.aborted).toBe(true);
expect(context.chatAbortControllers.has(runId)).toBe(false);
});
it("chat.abort without runId aborts the active agent run for the sessionKey", async () => {
prime();
let capturedSignal: AbortSignal | undefined;
mocks.agentCommand.mockImplementationOnce((opts: { abortSignal?: AbortSignal }) => {
capturedSignal = opts.abortSignal;
return new Promise(() => {});
});
const context = makeContext();
const runId = "idem-abort-session";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
);
const abortRespond = vi.fn();
await chatHandlers["chat.abort"]({
params: { sessionKey: "agent:main:main" },
respond: abortRespond as never,
context,
req: { type: "req", id: "abort-req", method: "chat.abort" },
client: null,
isWebchatConnect: () => false,
});
expect(abortRespond).toHaveBeenCalledWith(
true,
expect.objectContaining({ aborted: true, runIds: [runId] }),
);
expect(capturedSignal?.aborted).toBe(true);
});
it("removes the chatAbortControllers entry after the run completes successfully", async () => {
prime();
mocks.agentCommand.mockResolvedValueOnce({
payloads: [{ text: "ok" }],
meta: { durationMs: 1 },
});
const context = makeContext();
const runId = "idem-abort-cleanup-ok";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
);
await waitForAssertion(() => {
expect(context.chatAbortControllers.has(runId)).toBe(false);
});
});
it("removes the chatAbortControllers entry after the run errors", async () => {
prime();
mocks.agentCommand.mockRejectedValueOnce(new Error("boom"));
const context = makeContext();
const runId = "idem-abort-cleanup-err";
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
);
await waitForAssertion(() => {
expect(context.chatAbortControllers.has(runId)).toBe(false);
});
});
it("removes the chatAbortControllers entry if pre-dispatch reactivation fails", async () => {
prime("reactivation-session");
mocks.getLatestSubagentRunByChildSessionKey.mockReturnValueOnce({
runId: "previous-run",
childSessionKey: "agent:main:main",
controllerSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
requesterDisplayKey: "main",
task: "old task",
cleanup: "keep",
createdAt: 1,
startedAt: 2,
endedAt: 3,
outcome: { status: "ok" },
});
mocks.replaceSubagentRunAfterSteer.mockRejectedValueOnce(new Error("reactivate boom"));
const context = makeContext();
const runId = "idem-abort-reactivation-fails";
await expect(
invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
),
).rejects.toThrow("reactivate boom");
expect(context.chatAbortControllers.has(runId)).toBe(false);
expect(mocks.agentCommand).not.toHaveBeenCalled();
});
it("does not overwrite or evict a pre-existing chatAbortControllers entry with the same runId", async () => {
prime();
mocks.agentCommand.mockResolvedValueOnce({
payloads: [{ text: "ok" }],
meta: { durationMs: 1 },
});
const context = makeContext();
const runId = "idem-abort-collision";
const preExisting = {
controller: new AbortController(),
sessionId: "chat-send-session",
sessionKey: "agent:main:main",
startedAtMs: Date.now(),
expiresAtMs: Date.now() + 60_000,
ownerConnId: "chat-send-conn",
ownerDeviceId: undefined,
};
context.chatAbortControllers.set(runId, preExisting);
await invokeAgent(
{
message: "hi",
agentId: "main",
sessionKey: "agent:main:main",
idempotencyKey: runId,
},
{ context, reqId: runId },
);
expect(context.chatAbortControllers.get(runId)).toBe(preExisting);
// Cleanup after the agent run completes must not evict the pre-existing
// entry owned by a concurrent chat.send.
await waitForAssertion(() => {
expect(mocks.agentCommand).toHaveBeenCalled();
});
await new Promise((resolve) => setImmediate(resolve));
expect(context.chatAbortControllers.get(runId)).toBe(preExisting);
});
});

View File

@@ -5,6 +5,7 @@ import {
normalizeSpawnedRunMetadata,
resolveIngressWorkspaceOverrideForSpawnedRun,
} from "../../agents/spawned-context.js";
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import {
resolveBareResetBootstrapFileAccess,
resolveBareSessionResetPromptState,
@@ -58,6 +59,7 @@ import {
normalizeMessageChannel,
} from "../../utils/message-channel.js";
import { resolveAssistantIdentity } from "../assistant-identity.js";
import { registerChatAbortController, resolveAgentRunExpiresAtMs } from "../chat-abort.js";
import { MediaOffloadError, parseMessageWithAttachments } from "../chat-attachments.js";
import { resolveAssistantAvatarUrl } from "../control-ui-shared.js";
import { ADMIN_SCOPE } from "../method-scopes.js";
@@ -230,6 +232,12 @@ function dispatchAgentRunFromGateway(params: {
ingressOpts: Parameters<typeof agentCommandFromIngress>[0];
runId: string;
idempotencyKey: string;
/**
* Controller whose signal is wired into `ingressOpts.abortSignal`. Used on
* completion to drop the matching `chatAbortControllers` entry without
* touching a same-runId entry owned by a concurrent chat.send.
*/
abortController: AbortController;
respond: GatewayRequestHandlerOptions["respond"];
context: GatewayRequestHandlerOptions["context"];
}) {
@@ -301,6 +309,12 @@ function dispatchAgentRunFromGateway(params: {
runId: params.runId,
error: formatForLog(err),
});
})
.finally(() => {
const entry = params.context.chatAbortControllers.get(params.runId);
if (entry?.controller === params.abortController) {
params.context.chatAbortControllers.delete(params.runId);
}
});
}
@@ -862,6 +876,28 @@ export const agentHandlers: GatewayRequestHandlers = {
const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL;
// Register before the accepted ack so an immediate chat.abort/sessions.abort
// cannot race the active-run entry. Agent RPC runs use the agent timeout;
// chat.send keeps the shorter chat cleanup cap.
const now = Date.now();
const timeoutMs = resolveAgentTimeoutMs({
cfg: cfgForAgent ?? cfg,
overrideSeconds: typeof request.timeout === "number" ? request.timeout : undefined,
});
const activeRunAbort = registerChatAbortController({
chatAbortControllers: context.chatAbortControllers,
runId,
sessionId: resolvedSessionId ?? runId,
sessionKey: resolvedSessionKey,
timeoutMs,
now,
expiresAtMs: resolveAgentRunExpiresAtMs({ now, timeoutMs }),
ownerConnId: typeof client?.connId === "string" ? client.connId : undefined,
ownerDeviceId:
typeof client?.connect?.device?.id === "string" ? client.connect.device.id : undefined,
kind: "agent",
});
const accepted = {
runId,
status: "accepted" as const,
@@ -879,102 +915,112 @@ export const agentHandlers: GatewayRequestHandlers = {
});
respond(true, accepted, undefined, { runId });
if (resolvedSessionKey) {
await reactivateCompletedSubagentSession({
sessionKey: resolvedSessionKey,
runId,
});
}
if (requestedSessionKey && resolvedSessionKey && isNewSession) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "create",
});
}
if (resolvedSessionKey) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "send",
});
}
if (shouldPrependStartupContext && resolvedSessionKey) {
const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({
cfg: cfgForAgent ?? cfg,
sessionKey: resolvedSessionKey,
sessionEntry,
spawnedBy: spawnedByValue,
});
const startupContextPrelude = await buildSessionStartupContextPrelude({
workspaceDir: runtimeWorkspaceDir,
cfg: cfgForAgent ?? cfg,
});
if (startupContextPrelude) {
message = `${startupContextPrelude}\n\n${message}`;
let dispatched = false;
try {
if (resolvedSessionKey) {
await reactivateCompletedSubagentSession({
sessionKey: resolvedSessionKey,
runId,
});
}
}
const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId;
const ingressAgentId =
agentId &&
(!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId)
? agentId
: undefined;
if (requestedSessionKey && resolvedSessionKey && isNewSession) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "create",
});
}
if (resolvedSessionKey) {
emitSessionsChanged(context, {
sessionKey: resolvedSessionKey,
reason: "send",
});
}
dispatchAgentRunFromGateway({
ingressOpts: {
message,
images,
imageOrder,
agentId: ingressAgentId,
provider: providerOverride,
model: modelOverride,
to: resolvedTo,
sessionId: resolvedSessionId,
sessionKey: resolvedSessionKey,
thinking: request.thinking,
deliver,
deliveryTargetMode,
channel: resolvedChannel,
accountId: resolvedAccountId,
threadId: resolvedThreadId,
runContext: {
messageChannel: originMessageChannel,
if (shouldPrependStartupContext && resolvedSessionKey) {
const { runtimeWorkspaceDir } = resolveSessionRuntimeWorkspace({
cfg: cfgForAgent ?? cfg,
sessionKey: resolvedSessionKey,
sessionEntry,
spawnedBy: spawnedByValue,
});
const startupContextPrelude = await buildSessionStartupContextPrelude({
workspaceDir: runtimeWorkspaceDir,
cfg: cfgForAgent ?? cfg,
});
if (startupContextPrelude) {
message = `${startupContextPrelude}\n\n${message}`;
}
}
const resolvedThreadId = explicitThreadId ?? deliveryPlan.resolvedThreadId;
const ingressAgentId =
agentId &&
(!resolvedSessionKey || resolveAgentIdFromSessionKey(resolvedSessionKey) === agentId)
? agentId
: undefined;
dispatchAgentRunFromGateway({
ingressOpts: {
message,
images,
imageOrder,
agentId: ingressAgentId,
provider: providerOverride,
model: modelOverride,
to: resolvedTo,
sessionId: resolvedSessionId,
sessionKey: resolvedSessionKey,
thinking: request.thinking,
deliver,
deliveryTargetMode,
channel: resolvedChannel,
accountId: resolvedAccountId,
threadId: resolvedThreadId,
runContext: {
messageChannel: originMessageChannel,
accountId: resolvedAccountId,
groupId: resolvedGroupId,
groupChannel: resolvedGroupChannel,
groupSpace: resolvedGroupSpace,
currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined,
},
groupId: resolvedGroupId,
groupChannel: resolvedGroupChannel,
groupSpace: resolvedGroupSpace,
currentThreadTs: resolvedThreadId != null ? String(resolvedThreadId) : undefined,
},
groupId: resolvedGroupId,
groupChannel: resolvedGroupChannel,
groupSpace: resolvedGroupSpace,
spawnedBy: spawnedByValue,
timeout: request.timeout?.toString(),
bestEffortDeliver,
messageChannel: originMessageChannel,
runId,
lane: request.lane,
cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd === true,
extraSystemPrompt: request.extraSystemPrompt,
bootstrapContextMode: request.bootstrapContextMode,
bootstrapContextRunKind: request.bootstrapContextRunKind,
internalEvents: request.internalEvents,
inputProvenance,
// Internal-only: allow workspace override for spawned subagent runs.
workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({
spawnedBy: spawnedByValue,
workspaceDir: sessionEntry?.spawnedWorkspaceDir,
}),
senderIsOwner,
allowModelOverride,
},
runId,
idempotencyKey: idem,
respond,
context,
});
timeout: request.timeout?.toString(),
bestEffortDeliver,
messageChannel: originMessageChannel,
runId,
lane: request.lane,
cleanupBundleMcpOnRunEnd: request.cleanupBundleMcpOnRunEnd === true,
extraSystemPrompt: request.extraSystemPrompt,
bootstrapContextMode: request.bootstrapContextMode,
bootstrapContextRunKind: request.bootstrapContextRunKind,
internalEvents: request.internalEvents,
inputProvenance,
abortSignal: activeRunAbort.controller.signal,
// Internal-only: allow workspace override for spawned subagent runs.
workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({
spawnedBy: spawnedByValue,
workspaceDir: sessionEntry?.spawnedWorkspaceDir,
}),
senderIsOwner,
allowModelOverride,
},
runId,
idempotencyKey: idem,
abortController: activeRunAbort.controller,
respond,
context,
});
dispatched = true;
} finally {
if (!dispatched) {
activeRunAbort.cleanup();
}
}
},
"agent.identity.get": ({ params, respond }) => {
if (!validateAgentIdentityParams(params)) {
@@ -1048,7 +1094,11 @@ export const agentHandlers: GatewayRequestHandlers = {
typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs)
? Math.max(0, Math.floor(p.timeoutMs))
: 30_000;
const hasActiveChatRun = context.chatAbortControllers.has(runId);
// `hasActiveChatRun` drives snapshot preference, so it must reflect
// chat.send specifically — not an agent-kind entry registered by the
// `agent` RPC for its own abort surface.
const activeChatEntry = context.chatAbortControllers.get(runId);
const hasActiveChatRun = activeChatEntry !== undefined && activeChatEntry.kind !== "agent";
const cachedGatewaySnapshot = readTerminalSnapshotFromGatewayDedupe({
dedupe: context.dedupe,

View File

@@ -49,7 +49,7 @@ import {
type ChatAbortControllerEntry,
type ChatAbortOps,
isChatStopCommandText,
resolveChatRunExpiresAtMs,
registerChatAbortController,
} from "../chat-abort.js";
import {
type ChatImageContent,
@@ -2238,15 +2238,16 @@ export const chatHandlers: GatewayRequestHandlers = {
}
try {
const abortController = new AbortController();
context.chatAbortControllers.set(clientRunId, {
controller: abortController,
const activeRunAbort = registerChatAbortController({
chatAbortControllers: context.chatAbortControllers,
runId: clientRunId,
sessionId: entry?.sessionId ?? clientRunId,
sessionKey: rawSessionKey,
startedAtMs: now,
expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }),
timeoutMs,
now,
ownerConnId: normalizeOptionalText(client?.connId),
ownerDeviceId: normalizeOptionalText(client?.connect?.device?.id),
kind: "chat-send",
});
context.addChatRun(clientRunId, {
sessionKey,
@@ -2506,7 +2507,7 @@ export const chatHandlers: GatewayRequestHandlers = {
dispatcher,
replyOptions: {
runId: clientRunId,
abortSignal: abortController.signal,
abortSignal: activeRunAbort.controller.signal,
images: parsedImages.length > 0 ? parsedImages : undefined,
imageOrder: imageOrder.length > 0 ? imageOrder : undefined,
onAgentRunStart: (runId) => {
@@ -2743,7 +2744,7 @@ export const chatHandlers: GatewayRequestHandlers = {
});
})
.finally(() => {
context.chatAbortControllers.delete(clientRunId);
activeRunAbort.cleanup();
context.removeChatRun(clientRunId, clientRunId, sessionKey);
});
} catch (err) {

View File

@@ -2,6 +2,7 @@ import { onAgentEvent } from "../infra/agent-events.js";
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import type { ChatAbortControllerEntry } from "./chat-abort.js";
import {
createAgentEventHandler,
type ChatRunState,
@@ -30,7 +31,7 @@ export function startGatewayEventSubscriptions(params: {
toolEventRecipients: ToolEventRecipientRegistry;
sessionEventSubscribers: SessionEventSubscriberRegistry;
sessionMessageSubscribers: SessionMessageSubscriberRegistry;
chatAbortControllers: Map<string, unknown>;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
}) {
const agentUnsub = onAgentEvent(
createAgentEventHandler({
@@ -43,7 +44,10 @@ export function startGatewayEventSubscriptions(params: {
clearAgentRunContext: params.clearAgentRunContext,
toolEventRecipients: params.toolEventRecipients,
sessionEventSubscribers: params.sessionEventSubscribers,
isChatSendRunActive: (runId) => params.chatAbortControllers.has(runId),
isChatSendRunActive: (runId) => {
const entry = params.chatAbortControllers.get(runId);
return entry !== undefined && entry.kind !== "agent";
},
}),
);