refactor: share acp dispatch text helpers

This commit is contained in:
Vincent Koc
2026-05-29 06:59:48 +02:00
parent 13cb9f8277
commit 6fdf6b0680
3 changed files with 105 additions and 83 deletions

View File

@@ -0,0 +1,21 @@
import type { FinalizedMsgContext } from "../templating.js";
export type ContextTextKey =
| "BodyForAgent"
| "BodyForCommands"
| "CommandBody"
| "RawBody"
| "Body";
export function resolveFirstContextText(
ctx: FinalizedMsgContext,
keys: readonly ContextTextKey[],
): string {
for (const key of keys) {
const value = ctx[key];
if (typeof value === "string") {
return value;
}
}
return "";
}

View File

@@ -3,19 +3,7 @@ import { isCommandEnabled } from "../commands-registry-list.js";
import { maybeResolveTextAlias } from "../commands-registry-normalize.js";
import { shouldHandleTextCommands } from "../commands-text-routing.js";
import type { FinalizedMsgContext } from "../templating.js";
function resolveFirstContextText(
ctx: FinalizedMsgContext,
keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">,
): string {
for (const key of keys) {
const value = ctx[key];
if (typeof value === "string") {
return value;
}
}
return "";
}
import { resolveFirstContextText } from "./context-text.js";
function resolveCommandCandidateText(ctx: FinalizedMsgContext): string {
return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim();

View File

@@ -1,6 +1,6 @@
import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js";
import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js";
import { toAcpRuntimeError } from "../../acp/runtime/errors.js";
import { type AcpRuntimeError, toAcpRuntimeError } from "../../acp/runtime/errors.js";
import { resolveAcpThreadSessionDetailLines } from "../../acp/runtime/session-identifiers.js";
import {
isSessionIdentityPending,
@@ -34,6 +34,7 @@ import {
resolveAgentTurnAttachments,
resolveInlineAgentImageAttachments,
} from "./agent-turn-attachments.js";
import { resolveFirstContextText } from "./context-text.js";
import {
createAcpDispatchDeliveryCoordinator,
type AcpDispatchDeliveryCoordinator,
@@ -79,19 +80,6 @@ type DispatchProcessedRecorder = (
},
) => void;
function resolveFirstContextText(
ctx: FinalizedMsgContext,
keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">,
): string {
for (const key of keys) {
const value = ctx[key];
if (typeof value === "string") {
return value;
}
}
return "";
}
function resolveAcpPromptText(ctx: FinalizedMsgContext): string {
return resolveFirstContextText(ctx, [
"BodyForAgent",
@@ -168,6 +156,59 @@ export type AcpDispatchAttemptResult = {
counts: Record<ReplyDispatchKind, number>;
};
type AcpDispatchStatsSnapshot = {
turns: { queueDepth: number };
runtimeCache: { activeSessions: number };
};
type AcpDispatchOutcome = { kind: "ok" } | { kind: "error"; error: AcpRuntimeError };
function finishAcpDispatchAttempt(params: {
queuedFinal: boolean;
dispatcher: ReplyDispatcher;
delivery: AcpDispatchDeliveryCoordinator;
getStats: () => AcpDispatchStatsSnapshot;
sessionKey: string;
runId?: string;
startedAt: number;
outcome: AcpDispatchOutcome;
lifecyclePhase?: "end" | "error";
recordProcessed: DispatchProcessedRecorder;
markIdle: (reason: string) => void;
}): AcpDispatchAttemptResult {
const counts = params.dispatcher.getQueuedCounts();
params.delivery.applyRoutedCounts(counts);
const acpStats = params.getStats();
const runId = normalizeOptionalString(params.runId);
if (runId && params.lifecyclePhase) {
emitAgentEvent({
runId,
sessionKey: params.sessionKey,
stream: "lifecycle",
data: {
phase: params.lifecyclePhase,
startedAt: params.startedAt,
endedAt: Date.now(),
...(params.outcome.kind === "error" ? { error: params.outcome.error.message } : {}),
},
});
}
if (params.outcome.kind === "ok") {
logVerbose(
`acp-dispatch: session=${params.sessionKey} outcome=ok latencyMs=${Date.now() - params.startedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", { reason: "acp_dispatch" });
} else {
logVerbose(
`acp-dispatch: session=${params.sessionKey} outcome=error code=${params.outcome.error.code} latencyMs=${Date.now() - params.startedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", {
reason: `acp_error:${normalizeLowercaseStringOrEmpty(params.outcome.error.code)}`,
});
}
params.markIdle("message_completed");
return { queuedFinal: params.queuedFinal, counts };
}
const ACP_STALE_BINDING_UNBIND_REASON = "acp-session-init-failed";
function isStaleSessionInitError(params: { code: string; message: string }): boolean {
@@ -437,6 +478,22 @@ export async function tryDispatchAcpReply(params: {
});
const acpDispatchStartedAt = Date.now();
const finishAttempt = (options: {
queuedFinal: boolean;
outcome: AcpDispatchOutcome;
lifecyclePhase?: "end" | "error";
}) =>
finishAcpDispatchAttempt({
...options,
dispatcher: params.dispatcher,
delivery,
getStats: () => acpManager.getObservabilitySnapshot(params.cfg),
sessionKey,
runId: params.runId,
startedAt: acpDispatchStartedAt,
recordProcessed: params.recordProcessed,
markIdle: params.markIdle,
});
try {
const dispatchPolicyError = resolveAcpDispatchPolicyError(params.cfg);
if (dispatchPolicyError) {
@@ -451,17 +508,10 @@ export async function tryDispatchAcpReply(params: {
text: formatAcpRuntimeErrorText(acpResolution.error),
isError: true,
});
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpResolution.error.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", {
reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpResolution.error.code)}`,
return finishAttempt({
queuedFinal: delivered,
outcome: { kind: "error", error: acpResolution.error },
});
params.markIdle("message_completed");
return { queuedFinal: delivered, counts };
}
const agentPolicyError = resolveAcpAgentPolicyError(params.cfg, resolvedAcpAgent);
if (agentPolicyError) {
@@ -572,28 +622,11 @@ export async function tryDispatchAcpReply(params: {
shouldEmitResolvedIdentityNotice,
})) || queuedFinal;
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle",
data: {
phase: "end",
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", { reason: "acp_dispatch" });
params.markIdle("message_completed");
return { queuedFinal, counts };
return finishAttempt({
queuedFinal,
outcome: { kind: "ok" },
lifecyclePhase: "end",
});
} catch (err) {
await projector.flush(true);
const acpError = toAcpRuntimeError({
@@ -610,30 +643,10 @@ export async function tryDispatchAcpReply(params: {
isError: true,
});
queuedFinal = queuedFinal || delivered;
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle",
data: {
phase: "error",
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
error: acpError.message,
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", {
reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpError.code)}`,
return finishAttempt({
queuedFinal,
outcome: { kind: "error", error: acpError },
lifecyclePhase: "error",
});
params.markIdle("message_completed");
return { queuedFinal, counts };
}
}