import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js"; import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js"; import { toAcpRuntimeError } from "../../acp/runtime/errors.js"; import { resolveAcpThreadSessionDetailLines } from "../../acp/runtime/session-identifiers.js"; import { isSessionIdentityPending, resolveSessionIdentityFromMeta, } from "../../acp/runtime/session-identity.js"; import { resolveAgentDir } from "../../agents/agent-scope.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import type { TtsAutoMode } from "../../config/types.tts.js"; import { logVerbose } from "../../globals.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { generateSecureUuid } from "../../infra/secure-random.js"; import { prefixSystemMessage } from "../../infra/system-message.js"; import { markDiagnosticSessionProgress } from "../../logging/diagnostic.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; import { normalizeLowercaseStringOrEmpty, normalizeOptionalLowercaseString, normalizeOptionalString, } from "../../shared/string-coerce.js"; import { resolveStatusTtsSnapshot } from "../../tts/status-config.js"; import { resolveConfiguredTtsMode } from "../../tts/tts-config.js"; import type { SourceReplyDeliveryMode } from "../get-reply-options.types.js"; import type { FinalizedMsgContext } from "../templating.js"; import { createAcpReplyProjector } from "./acp-projector.js"; import { loadDispatchAcpMediaRuntime, resolveAcpAttachments, resolveAcpInlineImageAttachments, } from "./dispatch-acp-attachments.js"; import { createAcpDispatchDeliveryCoordinator, type AcpDispatchDeliveryCoordinator, } from "./dispatch-acp-delivery.js"; import { hasInboundMedia } from "./inbound-media.js"; import type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js"; const dispatchAcpManagerRuntimeLoader = createLazyImportLoader( () => import("./dispatch-acp-manager.runtime.js"), ); const dispatchAcpSessionRuntimeLoader = createLazyImportLoader( () => import("./dispatch-acp-session.runtime.js"), ); const dispatchAcpTtsRuntimeLoader = createLazyImportLoader( () => import("./dispatch-acp-tts.runtime.js"), ); const dispatchAcpTranscriptRuntimeLoader = createLazyImportLoader( () => import("./dispatch-acp-transcript.runtime.js"), ); function loadDispatchAcpManagerRuntime() { return dispatchAcpManagerRuntimeLoader.load(); } function loadDispatchAcpSessionRuntime() { return dispatchAcpSessionRuntimeLoader.load(); } function loadDispatchAcpTtsRuntime() { return dispatchAcpTtsRuntimeLoader.load(); } function loadDispatchAcpTranscriptRuntime() { return dispatchAcpTranscriptRuntimeLoader.load(); } type DispatchProcessedRecorder = ( outcome: "completed" | "skipped" | "error", opts?: { reason?: string; error?: string; }, ) => void; function resolveFirstContextText( ctx: FinalizedMsgContext, keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">, ): string { for (const key of keys) { const value = ctx[key]; if (typeof value === "string") { return value; } } return ""; } function resolveAcpPromptText(ctx: FinalizedMsgContext): string { return resolveFirstContextText(ctx, [ "BodyForAgent", "BodyForCommands", "CommandBody", "RawBody", "Body", ]).trim(); } function resolveAcpRequestId(ctx: FinalizedMsgContext): string { const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; if (typeof id === "string") { const normalizedId = normalizeOptionalString(id); if (normalizedId) { return normalizedId; } } if (typeof id === "number" || typeof id === "bigint") { return String(id); } return generateSecureUuid(); } function resolveAcpTurnText(params: { promptText: string; sourceReplyDeliveryMode?: SourceReplyDeliveryMode; }): string { if (params.sourceReplyDeliveryMode !== "message_tool_only") { return params.promptText; } const guidance = prefixSystemMessage( [ "Source channel delivery is private by default for this turn.", "Normal ACP final output will not be automatically posted to the source channel.", "To send visible output, use message(action=send). The target defaults to the current source channel.", ].join(" "), ); return params.promptText ? `${guidance}\n\n${params.promptText}` : guidance; } async function hasBoundConversationForSession(params: { cfg: OpenClawConfig; sessionKey: string; channelRaw: string | undefined; accountIdRaw: string | undefined; }): Promise { const channel = normalizeOptionalLowercaseString(params.channelRaw) ?? ""; if (!channel) { return false; } const accountId = normalizeOptionalLowercaseString(params.accountIdRaw) ?? ""; const channels = params.cfg.channels as Record; const configuredDefaultAccountId = channels?.[channel]?.defaultAccount; const normalizedAccountId = accountId || normalizeOptionalLowercaseString(configuredDefaultAccountId) || "default"; const { getSessionBindingService } = await loadDispatchAcpManagerRuntime(); const bindingService = getSessionBindingService(); const bindings = bindingService.listBySession(params.sessionKey); return bindings.some((binding) => { const bindingChannel = normalizeOptionalLowercaseString(binding.conversation.channel) ?? ""; const bindingAccountId = normalizeOptionalLowercaseString(binding.conversation.accountId) ?? ""; const conversationId = normalizeOptionalString(binding.conversation.conversationId) ?? ""; return ( bindingChannel === channel && (bindingAccountId || "default") === normalizedAccountId && conversationId.length > 0 ); }); } export type AcpDispatchAttemptResult = { queuedFinal: boolean; counts: Record; }; const ACP_STALE_BINDING_UNBIND_REASON = "acp-session-init-failed"; function isStaleSessionInitError(params: { code: string; message: string }): boolean { if (params.code !== "ACP_SESSION_INIT_FAILED") { return false; } return /(ACP (session )?metadata is missing|missing ACP metadata|Session is not ACP-enabled|Resource not found)/i.test( params.message, ); } async function maybeUnbindStaleBoundConversations(params: { targetSessionKey: string; error: { code: string; message: string }; }): Promise { if (!isStaleSessionInitError(params.error)) { return; } try { const { getSessionBindingService } = await loadDispatchAcpManagerRuntime(); const removed = await getSessionBindingService().unbind({ targetSessionKey: params.targetSessionKey, reason: ACP_STALE_BINDING_UNBIND_REASON, }); if (removed.length > 0) { logVerbose( `dispatch-acp: removed ${removed.length} stale bound conversation(s) for ${params.targetSessionKey} after ${params.error.code}: ${params.error.message}`, ); } } catch (error) { logVerbose( `dispatch-acp: failed to unbind stale bound conversations for ${params.targetSessionKey}: ${formatErrorMessage(error)}`, ); } } async function finalizeAcpTurnOutput(params: { cfg: OpenClawConfig; sessionKey: string; agentId: string; delivery: AcpDispatchDeliveryCoordinator; inboundAudio: boolean; sessionTtsAuto?: TtsAutoMode; ttsChannel?: string; ttsAccountId?: string; shouldEmitResolvedIdentityNotice: boolean; }): Promise { await params.delivery.settleVisibleText(); let queuedFinal = params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery(); const ttsMode = resolveConfiguredTtsMode(params.cfg, { agentId: params.agentId, channelId: params.ttsChannel, accountId: params.ttsAccountId, }); const accumulatedVisibleBlockText = params.delivery.getAccumulatedVisibleBlockText(); const accumulatedBlockTtsText = params.delivery.getAccumulatedBlockTtsText(); const hasAccumulatedBlockText = accumulatedBlockTtsText.trim().length > 0; const ttsStatus = resolveStatusTtsSnapshot({ cfg: params.cfg, sessionAuto: params.sessionTtsAuto, agentId: params.agentId, channelId: params.ttsChannel, accountId: params.ttsAccountId, }); const canAttemptFinalTts = ttsStatus != null && !(ttsStatus.autoMode === "inbound" && !params.inboundAudio); let finalMediaDelivered = false; if (ttsMode === "final" && hasAccumulatedBlockText && canAttemptFinalTts) { try { const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime(); const ttsSyntheticReply = await maybeApplyTtsToPayload({ payload: { text: accumulatedBlockTtsText }, cfg: params.cfg, channel: params.ttsChannel, kind: "final", inboundAudio: params.inboundAudio, ttsAuto: params.sessionTtsAuto, agentId: params.agentId, accountId: params.ttsAccountId, }); if (ttsSyntheticReply.mediaUrl) { const delivered = await params.delivery.deliver("final", { mediaUrl: ttsSyntheticReply.mediaUrl, audioAsVoice: ttsSyntheticReply.audioAsVoice, spokenText: accumulatedBlockTtsText, }); queuedFinal = queuedFinal || delivered; finalMediaDelivered = delivered; } } catch (err) { logVerbose(`dispatch-acp: accumulated ACP block TTS failed: ${formatErrorMessage(err)}`); } } // Some ACP parent surfaces only expose terminal replies, so block routing alone is not enough // to prove the final result was visible to the user. const shouldDeliverTextFallback = ttsMode !== "all" && accumulatedVisibleBlockText.trim().length > 0 && !finalMediaDelivered && !params.delivery.hasDeliveredFinalReply() && (!params.delivery.hasDeliveredVisibleText() || params.delivery.hasFailedVisibleTextDelivery()); if (shouldDeliverTextFallback) { const delivered = await params.delivery.deliver( "final", { text: accumulatedVisibleBlockText }, { skipTts: true }, ); queuedFinal = queuedFinal || delivered; } if (params.shouldEmitResolvedIdentityNotice) { const { readAcpSessionEntry } = await loadDispatchAcpSessionRuntime(); const currentMeta = readAcpSessionEntry({ cfg: params.cfg, sessionKey: params.sessionKey, })?.acp; const identityAfterTurn = resolveSessionIdentityFromMeta(currentMeta); if (!isSessionIdentityPending(identityAfterTurn)) { const resolvedDetails = resolveAcpThreadSessionDetailLines({ sessionKey: params.sessionKey, meta: currentMeta, }); if (resolvedDetails.length > 0) { const delivered = await params.delivery.deliver("final", { text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")), }); queuedFinal = queuedFinal || delivered; } } } return queuedFinal; } export async function tryDispatchAcpReply(params: { ctx: FinalizedMsgContext; cfg: OpenClawConfig; dispatcher: ReplyDispatcher; runId?: string; sessionKey?: string; images?: Array<{ data: string; mimeType: string }>; abortSignal?: AbortSignal; inboundAudio: boolean; sessionTtsAuto?: TtsAutoMode; ttsChannel?: string; suppressUserDelivery?: boolean; suppressReplyLifecycle?: boolean; sourceReplyDeliveryMode?: SourceReplyDeliveryMode; shouldRouteToOriginating: boolean; originatingChannel?: string; originatingTo?: string; shouldSendToolSummaries: boolean; bypassForCommand: boolean; onReplyStart?: () => Promise | void; recordProcessed: DispatchProcessedRecorder; markIdle: (reason: string) => void; }): Promise { const sessionKey = normalizeOptionalString(params.sessionKey); if (!sessionKey || params.bypassForCommand) { return null; } const { getAcpSessionManager } = await loadDispatchAcpManagerRuntime(); const acpManager = getAcpSessionManager(); const acpResolution = acpManager.resolveSession({ cfg: params.cfg, sessionKey, }); if (acpResolution.kind === "none") { return null; } const canonicalSessionKey = acpResolution.sessionKey; const acpAgentId = resolveAgentIdFromSessionKey(canonicalSessionKey); const progressSessionKeys = isDiagnosticsEnabled(params.cfg) ? Array.from( new Set( [params.ctx.SessionKey, sessionKey, canonicalSessionKey] .map((key) => normalizeOptionalString(key)) .filter((key): key is string => Boolean(key)), ), ) : []; const markAcpProgress = progressSessionKeys.length > 0 ? () => { for (const key of progressSessionKeys) { markDiagnosticSessionProgress({ sessionKey: key }); } } : undefined; let queuedFinal = false; const delivery = createAcpDispatchDeliveryCoordinator({ cfg: params.cfg, agentId: acpAgentId, ctx: params.ctx, dispatcher: params.dispatcher, inboundAudio: params.inboundAudio, sessionKey: canonicalSessionKey, sessionTtsAuto: params.sessionTtsAuto, ttsChannel: params.ttsChannel, suppressUserDelivery: params.suppressUserDelivery, suppressReplyLifecycle: params.suppressReplyLifecycle, shouldRouteToOriginating: params.shouldRouteToOriginating, originatingChannel: params.originatingChannel, originatingTo: params.originatingTo, onReplyStart: params.onReplyStart, }); const identityPendingBeforeTurn = isSessionIdentityPending( resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined), ); const shouldEmitResolvedIdentityNotice = !params.suppressUserDelivery && identityPendingBeforeTurn && (Boolean( params.ctx.MessageThreadId != null && (normalizeOptionalString(String(params.ctx.MessageThreadId)) ?? ""), ) || (await hasBoundConversationForSession({ cfg: params.cfg, sessionKey: canonicalSessionKey, channelRaw: params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider, accountIdRaw: params.ctx.AccountId, }))); const resolvedAcpAgent = acpResolution.kind === "ready" ? (normalizeOptionalString(acpResolution.meta.agent) ?? normalizeOptionalString(params.cfg.acp?.defaultAgent) ?? resolveAgentIdFromSessionKey(canonicalSessionKey)) : resolveAgentIdFromSessionKey(canonicalSessionKey); const normalizedDispatchChannel = normalizeOptionalLowercaseString( params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider, ); const explicitDispatchAccountId = normalizeOptionalString(params.ctx.AccountId); const dispatchChannels = params.cfg.channels as | Record | undefined; const defaultDispatchAccount = normalizedDispatchChannel == null ? undefined : dispatchChannels?.[normalizedDispatchChannel]?.defaultAccount; const effectiveDispatchAccountId = explicitDispatchAccountId ?? normalizeOptionalString(defaultDispatchAccount); const projector = createAcpReplyProjector({ cfg: params.cfg, shouldSendToolSummaries: params.shouldSendToolSummaries, deliver: delivery.deliver, onProgress: markAcpProgress, provider: params.ctx.Surface ?? params.ctx.Provider, accountId: effectiveDispatchAccountId, }); const acpDispatchStartedAt = Date.now(); try { const dispatchPolicyError = resolveAcpDispatchPolicyError(params.cfg); if (dispatchPolicyError) { throw dispatchPolicyError; } if (acpResolution.kind === "stale") { await maybeUnbindStaleBoundConversations({ targetSessionKey: canonicalSessionKey, error: acpResolution.error, }); const delivered = await delivery.deliver("final", { text: formatAcpRuntimeErrorText(acpResolution.error), isError: true, }); const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts); const acpStats = acpManager.getObservabilitySnapshot(params.cfg); logVerbose( `acp-dispatch: session=${sessionKey} outcome=error code=${acpResolution.error.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, ); params.recordProcessed("completed", { reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpResolution.error.code)}`, }); params.markIdle("message_completed"); return { queuedFinal: delivered, counts }; } const agentPolicyError = resolveAcpAgentPolicyError(params.cfg, resolvedAcpAgent); if (agentPolicyError) { throw agentPolicyError; } if (hasInboundMedia(params.ctx) && !params.ctx.MediaUnderstanding?.length) { try { const { applyMediaUnderstanding } = await loadDispatchAcpMediaRuntime(); await applyMediaUnderstanding({ ctx: params.ctx, cfg: params.cfg, agentDir: resolveAgentDir(params.cfg, acpAgentId), }); } catch (err) { logVerbose( `dispatch-acp: media understanding failed, proceeding with raw content: ${formatErrorMessage(err)}`, ); } } const promptText = resolveAcpPromptText(params.ctx); const mediaAttachments = hasInboundMedia(params.ctx) ? await resolveAcpAttachments({ ctx: params.ctx, cfg: params.cfg }) : []; const attachments = mediaAttachments.length > 0 ? mediaAttachments : resolveAcpInlineImageAttachments(params.images); if (!promptText && attachments.length === 0) { const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts); params.recordProcessed("completed", { reason: "acp_empty_prompt" }); params.markIdle("message_completed"); return { queuedFinal: false, counts }; } try { await delivery.startReplyLifecycle(); } catch (error) { logVerbose(`dispatch-acp: start reply lifecycle failed: ${formatErrorMessage(error)}`); } await acpManager.runTurn({ cfg: params.cfg, sessionKey: canonicalSessionKey, text: resolveAcpTurnText({ promptText, sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, }), attachments: attachments.length > 0 ? attachments : undefined, mode: "prompt", requestId: resolveAcpRequestId(params.ctx), ...(params.abortSignal ? { signal: params.abortSignal } : {}), onEvent: async (event) => await projector.onEvent(event), }); await projector.flush(true); if (params.abortSignal?.aborted) { const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts); params.recordProcessed("completed", { reason: "acp_aborted" }); params.markIdle("message_aborted"); return { queuedFinal, counts }; } try { const { persistAcpDispatchTranscript } = await loadDispatchAcpTranscriptRuntime(); await persistAcpDispatchTranscript({ cfg: params.cfg, sessionKey: canonicalSessionKey, promptText, finalText: delivery.getAccumulatedFinalText() || delivery.getAccumulatedBlockText(), meta: acpResolution.meta, threadId: params.ctx.MessageThreadId, }); } catch (error) { logVerbose( `dispatch-acp: transcript persistence failed for ${canonicalSessionKey}: ${formatErrorMessage( error, )}`, ); } queuedFinal = (await finalizeAcpTurnOutput({ cfg: params.cfg, sessionKey: canonicalSessionKey, agentId: acpAgentId, delivery, inboundAudio: params.inboundAudio, sessionTtsAuto: params.sessionTtsAuto, ttsChannel: params.ttsChannel, ttsAccountId: effectiveDispatchAccountId, shouldEmitResolvedIdentityNotice, })) || queuedFinal; const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts); const acpStats = acpManager.getObservabilitySnapshot(params.cfg); const runId = normalizeOptionalString(params.runId); if (runId) { emitAgentEvent({ runId, sessionKey, stream: "lifecycle", data: { phase: "end", startedAt: acpDispatchStartedAt, endedAt: Date.now(), }, }); } logVerbose( `acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, ); params.recordProcessed("completed", { reason: "acp_dispatch" }); params.markIdle("message_completed"); return { queuedFinal, counts }; } catch (err) { await projector.flush(true); const acpError = toAcpRuntimeError({ error: err, fallbackCode: "ACP_TURN_FAILED", fallbackMessage: "ACP turn failed before completion.", }); await maybeUnbindStaleBoundConversations({ targetSessionKey: canonicalSessionKey, error: acpError, }); const delivered = await delivery.deliver("final", { text: formatAcpRuntimeErrorText(acpError), isError: true, }); queuedFinal = queuedFinal || delivered; const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts); const acpStats = acpManager.getObservabilitySnapshot(params.cfg); const runId = normalizeOptionalString(params.runId); if (runId) { emitAgentEvent({ runId, sessionKey, stream: "lifecycle", data: { phase: "error", startedAt: acpDispatchStartedAt, endedAt: Date.now(), error: acpError.message, }, }); } logVerbose( `acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, ); params.recordProcessed("completed", { reason: `acp_error:${normalizeLowercaseStringOrEmpty(acpError.code)}`, }); params.markIdle("message_completed"); return { queuedFinal, counts }; } }