diff --git a/src/auto-reply/reply/context-text.ts b/src/auto-reply/reply/context-text.ts new file mode 100644 index 00000000000..894ecbd534d --- /dev/null +++ b/src/auto-reply/reply/context-text.ts @@ -0,0 +1,21 @@ +import type { FinalizedMsgContext } from "../templating.js"; + +export type ContextTextKey = + | "BodyForAgent" + | "BodyForCommands" + | "CommandBody" + | "RawBody" + | "Body"; + +export function resolveFirstContextText( + ctx: FinalizedMsgContext, + keys: readonly ContextTextKey[], +): string { + for (const key of keys) { + const value = ctx[key]; + if (typeof value === "string") { + return value; + } + } + return ""; +} diff --git a/src/auto-reply/reply/dispatch-acp-command-bypass.ts b/src/auto-reply/reply/dispatch-acp-command-bypass.ts index a65f1fee6eb..14ecc820a7f 100644 --- a/src/auto-reply/reply/dispatch-acp-command-bypass.ts +++ b/src/auto-reply/reply/dispatch-acp-command-bypass.ts @@ -3,19 +3,7 @@ import { isCommandEnabled } from "../commands-registry-list.js"; import { maybeResolveTextAlias } from "../commands-registry-normalize.js"; import { shouldHandleTextCommands } from "../commands-text-routing.js"; import type { FinalizedMsgContext } from "../templating.js"; - -function resolveFirstContextText( - ctx: FinalizedMsgContext, - keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">, -): string { - for (const key of keys) { - const value = ctx[key]; - if (typeof value === "string") { - return value; - } - } - return ""; -} +import { resolveFirstContextText } from "./context-text.js"; function resolveCommandCandidateText(ctx: FinalizedMsgContext): string { return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim(); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 69aa58a8e1f..03288848385 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -1,6 +1,6 @@ import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js"; import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js"; -import { toAcpRuntimeError } from "../../acp/runtime/errors.js"; +import { type AcpRuntimeError, toAcpRuntimeError } from "../../acp/runtime/errors.js"; import { resolveAcpThreadSessionDetailLines } from "../../acp/runtime/session-identifiers.js"; import { isSessionIdentityPending, @@ -34,6 +34,7 @@ import { resolveAgentTurnAttachments, resolveInlineAgentImageAttachments, } from "./agent-turn-attachments.js"; +import { resolveFirstContextText } from "./context-text.js"; import { createAcpDispatchDeliveryCoordinator, type AcpDispatchDeliveryCoordinator, @@ -79,19 +80,6 @@ type DispatchProcessedRecorder = ( }, ) => void; -function resolveFirstContextText( - ctx: FinalizedMsgContext, - keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">, -): string { - for (const key of keys) { - const value = ctx[key]; - if (typeof value === "string") { - return value; - } - } - return ""; -} - function resolveAcpPromptText(ctx: FinalizedMsgContext): string { return resolveFirstContextText(ctx, [ "BodyForAgent", @@ -168,6 +156,59 @@ export type AcpDispatchAttemptResult = { counts: Record; }; +type AcpDispatchStatsSnapshot = { + turns: { queueDepth: number }; + runtimeCache: { activeSessions: number }; +}; +type AcpDispatchOutcome = { kind: "ok" } | { kind: "error"; error: AcpRuntimeError }; + +function finishAcpDispatchAttempt(params: { + queuedFinal: boolean; + dispatcher: ReplyDispatcher; + delivery: AcpDispatchDeliveryCoordinator; + getStats: () => AcpDispatchStatsSnapshot; + sessionKey: string; + runId?: string; + startedAt: number; + outcome: AcpDispatchOutcome; + lifecyclePhase?: "end" | "error"; + recordProcessed: DispatchProcessedRecorder; + markIdle: (reason: string) => void; +}): AcpDispatchAttemptResult { + const counts = params.dispatcher.getQueuedCounts(); + params.delivery.applyRoutedCounts(counts); + const acpStats = params.getStats(); + const runId = normalizeOptionalString(params.runId); + if (runId && params.lifecyclePhase) { + emitAgentEvent({ + runId, + sessionKey: params.sessionKey, + stream: "lifecycle", + data: { + phase: params.lifecyclePhase, + startedAt: params.startedAt, + endedAt: Date.now(), + ...(params.outcome.kind === "error" ? { error: params.outcome.error.message } : {}), + }, + }); + } + if (params.outcome.kind === "ok") { + logVerbose( + `acp-dispatch: session=${params.sessionKey} outcome=ok latencyMs=${Date.now() - params.startedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, + ); + params.recordProcessed("completed", { reason: "acp_dispatch" }); + } else { + logVerbose( + `acp-dispatch: session=${params.sessionKey} outcome=error code=${params.outcome.error.code} latencyMs=${Date.now() - params.startedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, + ); + params.recordProcessed("completed", { + reason: `acp_error:${normalizeLowercaseStringOrEmpty(params.outcome.error.code)}`, + }); + } + params.markIdle("message_completed"); + return { queuedFinal: params.queuedFinal, counts }; +} + const ACP_STALE_BINDING_UNBIND_REASON = "acp-session-init-failed"; function isStaleSessionInitError(params: { code: string; message: string }): boolean { @@ -437,6 +478,22 @@ export async function tryDispatchAcpReply(params: { }); const acpDispatchStartedAt = Date.now(); + const finishAttempt = (options: { + queuedFinal: boolean; + outcome: AcpDispatchOutcome; + lifecyclePhase?: "end" | "error"; + }) => + finishAcpDispatchAttempt({ + ...options, + dispatcher: params.dispatcher, + delivery, + getStats: () => acpManager.getObservabilitySnapshot(params.cfg), + sessionKey, + runId: params.runId, + startedAt: acpDispatchStartedAt, + recordProcessed: params.recordProcessed, + markIdle: params.markIdle, + }); try { const dispatchPolicyError = resolveAcpDispatchPolicyError(params.cfg); if (dispatchPolicyError) { @@ -451,17 +508,10 @@ export async function tryDispatchAcpReply(params: { text: formatAcpRuntimeErrorText(acpResolution.error), isError: true, }); - const counts = params.dispatcher.getQueuedCounts(); - delivery.applyRoutedCounts(counts); - const acpStats = acpManager.getObservabilitySnapshot(params.cfg); - logVerbose( - `acp-dispatch: session=${sessionKey} outcome=error code=${acpResolution.error.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, - ); - params.recordProcessed("completed", { - reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpResolution.error.code)}`, + return finishAttempt({ + queuedFinal: delivered, + outcome: { kind: "error", error: acpResolution.error }, }); - params.markIdle("message_completed"); - return { queuedFinal: delivered, counts }; } const agentPolicyError = resolveAcpAgentPolicyError(params.cfg, resolvedAcpAgent); if (agentPolicyError) { @@ -572,28 +622,11 @@ export async function tryDispatchAcpReply(params: { shouldEmitResolvedIdentityNotice, })) || queuedFinal; - const counts = params.dispatcher.getQueuedCounts(); - delivery.applyRoutedCounts(counts); - const acpStats = acpManager.getObservabilitySnapshot(params.cfg); - const runId = normalizeOptionalString(params.runId); - if (runId) { - emitAgentEvent({ - runId, - sessionKey, - stream: "lifecycle", - data: { - phase: "end", - startedAt: acpDispatchStartedAt, - endedAt: Date.now(), - }, - }); - } - logVerbose( - `acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, - ); - params.recordProcessed("completed", { reason: "acp_dispatch" }); - params.markIdle("message_completed"); - return { queuedFinal, counts }; + return finishAttempt({ + queuedFinal, + outcome: { kind: "ok" }, + lifecyclePhase: "end", + }); } catch (err) { await projector.flush(true); const acpError = toAcpRuntimeError({ @@ -610,30 +643,10 @@ export async function tryDispatchAcpReply(params: { isError: true, }); queuedFinal = queuedFinal || delivered; - const counts = params.dispatcher.getQueuedCounts(); - delivery.applyRoutedCounts(counts); - const acpStats = acpManager.getObservabilitySnapshot(params.cfg); - const runId = normalizeOptionalString(params.runId); - if (runId) { - emitAgentEvent({ - runId, - sessionKey, - stream: "lifecycle", - data: { - phase: "error", - startedAt: acpDispatchStartedAt, - endedAt: Date.now(), - error: acpError.message, - }, - }); - } - logVerbose( - `acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, - ); - params.recordProcessed("completed", { - reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpError.code)}`, + return finishAttempt({ + queuedFinal, + outcome: { kind: "error", error: acpError }, + lifecyclePhase: "error", }); - params.markIdle("message_completed"); - return { queuedFinal, counts }; } }