refactor: centralize dispatcher lifecycle ownership

This commit is contained in:
Peter Steinberger
2026-02-14 00:41:27 +01:00
parent 5caf829d28
commit d5e25e0ad8
6 changed files with 107 additions and 88 deletions

View File

@@ -6,7 +6,7 @@ import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { resolveThinkingDefault } from "../../agents/model-selection.js";
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import { dispatchInboundMessage, withReplyDispatcher } from "../../auto-reply/dispatch.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
import { resolveSessionFilePath } from "../../config/sessions.js";
@@ -524,40 +524,36 @@ export const chatHandlers: GatewayRequestHandlers = {
});
let agentRunStarted = false;
void withReplyDispatcher({
void dispatchInboundMessage({
ctx,
cfg,
dispatcher,
run: () =>
dispatchInboundMessage({
ctx,
cfg,
dispatcher,
replyOptions: {
runId: clientRunId,
abortSignal: abortController.signal,
images: parsedImages.length > 0 ? parsedImages : undefined,
disableBlockStreaming: true,
onAgentRunStart: (runId) => {
agentRunStarted = true;
const connId = typeof client?.connId === "string" ? client.connId : undefined;
const wantsToolEvents = hasGatewayClientCap(
client?.connect?.caps,
GATEWAY_CLIENT_CAPS.TOOL_EVENTS,
);
if (connId && wantsToolEvents) {
context.registerToolEventRecipient(runId, connId);
// Register for any other active runs *in the same session* so
// late-joining clients (e.g. page refresh mid-response) receive
// in-progress tool events without leaking cross-session data.
for (const [activeRunId, active] of context.chatAbortControllers) {
if (activeRunId !== runId && active.sessionKey === p.sessionKey) {
context.registerToolEventRecipient(activeRunId, connId);
}
}
replyOptions: {
runId: clientRunId,
abortSignal: abortController.signal,
images: parsedImages.length > 0 ? parsedImages : undefined,
disableBlockStreaming: true,
onAgentRunStart: (runId) => {
agentRunStarted = true;
const connId = typeof client?.connId === "string" ? client.connId : undefined;
const wantsToolEvents = hasGatewayClientCap(
client?.connect?.caps,
GATEWAY_CLIENT_CAPS.TOOL_EVENTS,
);
if (connId && wantsToolEvents) {
context.registerToolEventRecipient(runId, connId);
// Register for any other active runs *in the same session* so
// late-joining clients (e.g. page refresh mid-response) receive
// in-progress tool events without leaking cross-session data.
for (const [activeRunId, active] of context.chatAbortControllers) {
if (activeRunId !== runId && active.sessionKey === p.sessionKey) {
context.registerToolEventRecipient(activeRunId, connId);
}
},
onModelSelected,
},
}),
}
}
},
onModelSelected,
},
})
.then(() => {
if (!agentRunStarted) {