Files
openclaw/src/auto-reply/reply/dispatch-acp.ts
2026-04-20 14:27:26 +01:00

508 lines
18 KiB
TypeScript

import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js";
import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js";
import { toAcpRuntimeError } from "../../acp/runtime/errors.js";
import { resolveAcpThreadSessionDetailLines } from "../../acp/runtime/session-identifiers.js";
import {
isSessionIdentityPending,
resolveSessionIdentityFromMeta,
} from "../../acp/runtime/session-identity.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../../shared/string-coerce.js";
import { resolveStatusTtsSnapshot } from "../../tts/status-config.js";
import { resolveConfiguredTtsMode } from "../../tts/tts-config.js";
import type { FinalizedMsgContext } from "../templating.js";
import { createAcpReplyProjector } from "./acp-projector.js";
import { loadDispatchAcpMediaRuntime, resolveAcpAttachments } from "./dispatch-acp-attachments.js";
import {
createAcpDispatchDeliveryCoordinator,
type AcpDispatchDeliveryCoordinator,
} from "./dispatch-acp-delivery.js";
import { hasInboundMedia } from "./inbound-media.js";
import type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js";
let dispatchAcpManagerRuntimePromise: Promise<
typeof import("./dispatch-acp-manager.runtime.js")
> | null = null;
let dispatchAcpSessionRuntimePromise: Promise<
typeof import("./dispatch-acp-session.runtime.js")
> | null = null;
let dispatchAcpTtsRuntimePromise: Promise<typeof import("./dispatch-acp-tts.runtime.js")> | null =
null;
function loadDispatchAcpManagerRuntime() {
dispatchAcpManagerRuntimePromise ??= import("./dispatch-acp-manager.runtime.js");
return dispatchAcpManagerRuntimePromise;
}
function loadDispatchAcpSessionRuntime() {
dispatchAcpSessionRuntimePromise ??= import("./dispatch-acp-session.runtime.js");
return dispatchAcpSessionRuntimePromise;
}
function loadDispatchAcpTtsRuntime() {
dispatchAcpTtsRuntimePromise ??= import("./dispatch-acp-tts.runtime.js");
return dispatchAcpTtsRuntimePromise;
}
type DispatchProcessedRecorder = (
outcome: "completed" | "skipped" | "error",
opts?: {
reason?: string;
error?: string;
},
) => void;
function resolveFirstContextText(
ctx: FinalizedMsgContext,
keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">,
): string {
for (const key of keys) {
const value = ctx[key];
if (typeof value === "string") {
return value;
}
}
return "";
}
function resolveAcpPromptText(ctx: FinalizedMsgContext): string {
return resolveFirstContextText(ctx, [
"BodyForAgent",
"BodyForCommands",
"CommandBody",
"RawBody",
"Body",
]).trim();
}
function resolveAcpRequestId(ctx: FinalizedMsgContext): string {
const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
if (typeof id === "string") {
const normalizedId = normalizeOptionalString(id);
if (normalizedId) {
return normalizedId;
}
}
if (typeof id === "number" || typeof id === "bigint") {
return String(id);
}
return generateSecureUuid();
}
async function hasBoundConversationForSession(params: {
cfg: OpenClawConfig;
sessionKey: string;
channelRaw: string | undefined;
accountIdRaw: string | undefined;
}): Promise<boolean> {
const channel = normalizeOptionalLowercaseString(params.channelRaw) ?? "";
if (!channel) {
return false;
}
const accountId = normalizeOptionalLowercaseString(params.accountIdRaw) ?? "";
const channels = params.cfg.channels as Record<string, { defaultAccount?: unknown } | undefined>;
const configuredDefaultAccountId = channels?.[channel]?.defaultAccount;
const normalizedAccountId =
accountId || normalizeOptionalLowercaseString(configuredDefaultAccountId) || "default";
const { getSessionBindingService } = await loadDispatchAcpManagerRuntime();
const bindingService = getSessionBindingService();
const bindings = bindingService.listBySession(params.sessionKey);
return bindings.some((binding) => {
const bindingChannel = normalizeOptionalLowercaseString(binding.conversation.channel) ?? "";
const bindingAccountId = normalizeOptionalLowercaseString(binding.conversation.accountId) ?? "";
const conversationId = normalizeOptionalString(binding.conversation.conversationId) ?? "";
return (
bindingChannel === channel &&
(bindingAccountId || "default") === normalizedAccountId &&
conversationId.length > 0
);
});
}
export type AcpDispatchAttemptResult = {
queuedFinal: boolean;
counts: Record<ReplyDispatchKind, number>;
};
const ACP_STALE_BINDING_UNBIND_REASON = "acp-session-init-failed";
function isStaleSessionInitError(params: { code: string; message: string }): boolean {
if (params.code !== "ACP_SESSION_INIT_FAILED") {
return false;
}
return /(ACP (session )?metadata is missing|missing ACP metadata|Session is not ACP-enabled|Resource not found)/i.test(
params.message,
);
}
async function maybeUnbindStaleBoundConversations(params: {
targetSessionKey: string;
error: { code: string; message: string };
}): Promise<void> {
if (!isStaleSessionInitError(params.error)) {
return;
}
try {
const { getSessionBindingService } = await loadDispatchAcpManagerRuntime();
const removed = await getSessionBindingService().unbind({
targetSessionKey: params.targetSessionKey,
reason: ACP_STALE_BINDING_UNBIND_REASON,
});
if (removed.length > 0) {
logVerbose(
`dispatch-acp: removed ${removed.length} stale bound conversation(s) for ${params.targetSessionKey} after ${params.error.code}: ${params.error.message}`,
);
}
} catch (error) {
logVerbose(
`dispatch-acp: failed to unbind stale bound conversations for ${params.targetSessionKey}: ${formatErrorMessage(error)}`,
);
}
}
async function finalizeAcpTurnOutput(params: {
cfg: OpenClawConfig;
sessionKey: string;
delivery: AcpDispatchDeliveryCoordinator;
inboundAudio: boolean;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
shouldEmitResolvedIdentityNotice: boolean;
}): Promise<boolean> {
await params.delivery.settleVisibleText();
let queuedFinal =
params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery();
const ttsMode = resolveConfiguredTtsMode(params.cfg);
const accumulatedBlockText = params.delivery.getAccumulatedBlockText();
const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0;
const ttsStatus = resolveStatusTtsSnapshot({
cfg: params.cfg,
sessionAuto: params.sessionTtsAuto,
});
const canAttemptFinalTts =
ttsStatus != null && !(ttsStatus.autoMode === "inbound" && !params.inboundAudio);
let finalMediaDelivered = false;
if (ttsMode === "final" && hasAccumulatedBlockText && canAttemptFinalTts) {
try {
const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime();
const ttsSyntheticReply = await maybeApplyTtsToPayload({
payload: { text: accumulatedBlockText },
cfg: params.cfg,
channel: params.ttsChannel,
kind: "final",
inboundAudio: params.inboundAudio,
ttsAuto: params.sessionTtsAuto,
});
if (ttsSyntheticReply.mediaUrl) {
const delivered = await params.delivery.deliver("final", {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
});
queuedFinal = queuedFinal || delivered;
finalMediaDelivered = delivered;
}
} catch (err) {
logVerbose(`dispatch-acp: accumulated ACP block TTS failed: ${formatErrorMessage(err)}`);
}
}
// Some ACP parent surfaces only expose terminal replies, so block routing alone is not enough
// to prove the final result was visible to the user.
const shouldDeliverTextFallback =
ttsMode !== "all" &&
hasAccumulatedBlockText &&
!finalMediaDelivered &&
!params.delivery.hasDeliveredFinalReply() &&
(!params.delivery.hasDeliveredVisibleText() || params.delivery.hasFailedVisibleTextDelivery());
if (shouldDeliverTextFallback) {
const delivered = await params.delivery.deliver(
"final",
{ text: accumulatedBlockText },
{ skipTts: true },
);
queuedFinal = queuedFinal || delivered;
}
if (params.shouldEmitResolvedIdentityNotice) {
const { readAcpSessionEntry } = await loadDispatchAcpSessionRuntime();
const currentMeta = readAcpSessionEntry({
cfg: params.cfg,
sessionKey: params.sessionKey,
})?.acp;
const identityAfterTurn = resolveSessionIdentityFromMeta(currentMeta);
if (!isSessionIdentityPending(identityAfterTurn)) {
const resolvedDetails = resolveAcpThreadSessionDetailLines({
sessionKey: params.sessionKey,
meta: currentMeta,
});
if (resolvedDetails.length > 0) {
const delivered = await params.delivery.deliver("final", {
text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")),
});
queuedFinal = queuedFinal || delivered;
}
}
}
return queuedFinal;
}
export async function tryDispatchAcpReply(params: {
ctx: FinalizedMsgContext;
cfg: OpenClawConfig;
dispatcher: ReplyDispatcher;
runId?: string;
sessionKey?: string;
abortSignal?: AbortSignal;
inboundAudio: boolean;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
suppressUserDelivery?: boolean;
shouldRouteToOriginating: boolean;
originatingChannel?: string;
originatingTo?: string;
shouldSendToolSummaries: boolean;
bypassForCommand: boolean;
onReplyStart?: () => Promise<void> | void;
recordProcessed: DispatchProcessedRecorder;
markIdle: (reason: string) => void;
}): Promise<AcpDispatchAttemptResult | null> {
const sessionKey = normalizeOptionalString(params.sessionKey);
if (!sessionKey || params.bypassForCommand) {
return null;
}
const { getAcpSessionManager } = await loadDispatchAcpManagerRuntime();
const acpManager = getAcpSessionManager();
const acpResolution = acpManager.resolveSession({
cfg: params.cfg,
sessionKey,
});
if (acpResolution.kind === "none") {
return null;
}
const canonicalSessionKey = acpResolution.sessionKey;
let queuedFinal = false;
const delivery = createAcpDispatchDeliveryCoordinator({
cfg: params.cfg,
ctx: params.ctx,
dispatcher: params.dispatcher,
inboundAudio: params.inboundAudio,
sessionTtsAuto: params.sessionTtsAuto,
ttsChannel: params.ttsChannel,
suppressUserDelivery: params.suppressUserDelivery,
shouldRouteToOriginating: params.shouldRouteToOriginating,
originatingChannel: params.originatingChannel,
originatingTo: params.originatingTo,
onReplyStart: params.onReplyStart,
});
const identityPendingBeforeTurn = isSessionIdentityPending(
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
);
const shouldEmitResolvedIdentityNotice =
!params.suppressUserDelivery &&
identityPendingBeforeTurn &&
(Boolean(
params.ctx.MessageThreadId != null &&
(normalizeOptionalString(String(params.ctx.MessageThreadId)) ?? ""),
) ||
(await hasBoundConversationForSession({
cfg: params.cfg,
sessionKey: canonicalSessionKey,
channelRaw: params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider,
accountIdRaw: params.ctx.AccountId,
})));
const resolvedAcpAgent =
acpResolution.kind === "ready"
? (normalizeOptionalString(acpResolution.meta.agent) ??
normalizeOptionalString(params.cfg.acp?.defaultAgent) ??
resolveAgentIdFromSessionKey(canonicalSessionKey))
: resolveAgentIdFromSessionKey(canonicalSessionKey);
const normalizedDispatchChannel = normalizeOptionalLowercaseString(
params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider,
);
const explicitDispatchAccountId = normalizeOptionalString(params.ctx.AccountId);
const dispatchChannels = params.cfg.channels as
| Record<string, { defaultAccount?: unknown } | undefined>
| undefined;
const defaultDispatchAccount =
normalizedDispatchChannel == null
? undefined
: dispatchChannels?.[normalizedDispatchChannel]?.defaultAccount;
const effectiveDispatchAccountId =
explicitDispatchAccountId ?? normalizeOptionalString(defaultDispatchAccount);
const projector = createAcpReplyProjector({
cfg: params.cfg,
shouldSendToolSummaries: params.shouldSendToolSummaries,
deliver: delivery.deliver,
provider: params.ctx.Surface ?? params.ctx.Provider,
accountId: effectiveDispatchAccountId,
});
const acpDispatchStartedAt = Date.now();
try {
const dispatchPolicyError = resolveAcpDispatchPolicyError(params.cfg);
if (dispatchPolicyError) {
throw dispatchPolicyError;
}
if (acpResolution.kind === "stale") {
await maybeUnbindStaleBoundConversations({
targetSessionKey: canonicalSessionKey,
error: acpResolution.error,
});
const delivered = await delivery.deliver("final", {
text: formatAcpRuntimeErrorText(acpResolution.error),
isError: true,
});
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpResolution.error.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", {
reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpResolution.error.code)}`,
});
params.markIdle("message_completed");
return { queuedFinal: delivered, counts };
}
const agentPolicyError = resolveAcpAgentPolicyError(params.cfg, resolvedAcpAgent);
if (agentPolicyError) {
throw agentPolicyError;
}
if (hasInboundMedia(params.ctx) && !params.ctx.MediaUnderstanding?.length) {
try {
const { applyMediaUnderstanding } = await loadDispatchAcpMediaRuntime();
await applyMediaUnderstanding({
ctx: params.ctx,
cfg: params.cfg,
});
} catch (err) {
logVerbose(
`dispatch-acp: media understanding failed, proceeding with raw content: ${formatErrorMessage(err)}`,
);
}
}
const promptText = resolveAcpPromptText(params.ctx);
const attachments = hasInboundMedia(params.ctx)
? await resolveAcpAttachments({ ctx: params.ctx, cfg: params.cfg })
: [];
if (!promptText && attachments.length === 0) {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
params.recordProcessed("completed", { reason: "acp_empty_prompt" });
params.markIdle("message_completed");
return { queuedFinal: false, counts };
}
try {
await delivery.startReplyLifecycle();
} catch (error) {
logVerbose(`dispatch-acp: start reply lifecycle failed: ${formatErrorMessage(error)}`);
}
await acpManager.runTurn({
cfg: params.cfg,
sessionKey: canonicalSessionKey,
text: promptText,
attachments: attachments.length > 0 ? attachments : undefined,
mode: "prompt",
requestId: resolveAcpRequestId(params.ctx),
...(params.abortSignal ? { signal: params.abortSignal } : {}),
onEvent: async (event) => await projector.onEvent(event),
});
await projector.flush(true);
queuedFinal =
(await finalizeAcpTurnOutput({
cfg: params.cfg,
sessionKey: canonicalSessionKey,
delivery,
inboundAudio: params.inboundAudio,
sessionTtsAuto: params.sessionTtsAuto,
ttsChannel: params.ttsChannel,
shouldEmitResolvedIdentityNotice,
})) || queuedFinal;
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle",
data: {
phase: "end",
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", { reason: "acp_dispatch" });
params.markIdle("message_completed");
return { queuedFinal, counts };
} catch (err) {
await projector.flush(true);
const acpError = toAcpRuntimeError({
error: err,
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP turn failed before completion.",
});
await maybeUnbindStaleBoundConversations({
targetSessionKey: canonicalSessionKey,
error: acpError,
});
const delivered = await delivery.deliver("final", {
text: formatAcpRuntimeErrorText(acpError),
isError: true,
});
queuedFinal = queuedFinal || delivered;
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
const runId = normalizeOptionalString(params.runId);
if (runId) {
emitAgentEvent({
runId,
sessionKey,
stream: "lifecycle",
data: {
phase: "error",
startedAt: acpDispatchStartedAt,
endedAt: Date.now(),
error: acpError.message,
},
});
}
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
);
params.recordProcessed("completed", {
reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpError.code)}`,
});
params.markIdle("message_completed");
return { queuedFinal, counts };
}
}