diff --git a/src/auto-reply/reply/dispatch-acp-command-bypass.ts b/src/auto-reply/reply/dispatch-acp-command-bypass.ts new file mode 100644 index 00000000000..33a36dfe968 --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-command-bypass.ts @@ -0,0 +1,57 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import { + isCommandEnabled, + maybeResolveTextAlias, + shouldHandleTextCommands, +} from "../commands-registry.js"; +import type { FinalizedMsgContext } from "../templating.js"; + +function resolveFirstContextText( + ctx: FinalizedMsgContext, + keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">, +): string { + for (const key of keys) { + const value = ctx[key]; + if (typeof value === "string") { + return value; + } + } + return ""; +} + +function resolveCommandCandidateText(ctx: FinalizedMsgContext): string { + return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim(); +} + +export function shouldBypassAcpDispatchForCommand( + ctx: FinalizedMsgContext, + cfg: OpenClawConfig, +): boolean { + const candidate = resolveCommandCandidateText(ctx); + if (!candidate) { + return false; + } + const allowTextCommands = shouldHandleTextCommands({ + cfg, + surface: ctx.Surface ?? ctx.Provider ?? "", + commandSource: ctx.CommandSource, + }); + if (maybeResolveTextAlias(candidate, cfg) != null) { + return allowTextCommands; + } + + const normalized = candidate.trim(); + if (!normalized.startsWith("!")) { + return false; + } + + if (!ctx.CommandAuthorized) { + return false; + } + + if (!isCommandEnabled(cfg, "bash")) { + return false; + } + + return allowTextCommands; +} diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index 7cdf54e23aa..ff2966a9e5e 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -1,14 +1,41 @@ import { hasOutboundReplyContent } from "openclaw/plugin-sdk/reply-payload"; -import { getChannelPlugin } from "../../channels/plugins/index.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { TtsAutoMode } from "../../config/types.tts.js"; import { logVerbose } from "../../globals.js"; -import { runMessageAction } from "../../infra/outbound/message-action-runner.js"; -import { maybeApplyTtsToPayload } from "../../tts/tts.js"; +import { resolveStatusTtsSnapshot } from "../../tts/status-config.js"; +import { resolveConfiguredTtsMode } from "../../tts/tts-config.js"; import type { FinalizedMsgContext } from "../templating.js"; import type { ReplyPayload } from "../types.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; -import { routeReply } from "./route-reply.js"; + +let routeReplyRuntimePromise: Promise | null = null; +let dispatchAcpTtsRuntimePromise: Promise | null = + null; +let channelPluginRuntimePromise: Promise | null = + null; +let messageActionRuntimePromise: Promise< + typeof import("../../infra/outbound/message-action-runner.js") +> | null = null; + +function loadRouteReplyRuntime() { + routeReplyRuntimePromise ??= import("./route-reply.runtime.js"); + return routeReplyRuntimePromise; +} + +function loadDispatchAcpTtsRuntime() { + dispatchAcpTtsRuntimePromise ??= import("./dispatch-acp-tts.runtime.js"); + return dispatchAcpTtsRuntimePromise; +} + +function loadChannelPluginRuntime() { + channelPluginRuntimePromise ??= import("../../channels/plugins/index.js"); + return channelPluginRuntimePromise; +} + +function loadMessageActionRuntime() { + messageActionRuntimePromise ??= import("../../infra/outbound/message-action-runner.js"); + return messageActionRuntimePromise; +} export type AcpDispatchDeliveryMeta = { toolCallId?: string; @@ -51,11 +78,11 @@ function resolveDeliveryAccountId(params: { : undefined; } -function shouldTreatDeliveredTextAsVisible(params: { +async function shouldTreatDeliveredTextAsVisible(params: { channel: string | undefined; kind: ReplyDispatchKind; text: string | undefined; -}): boolean { +}): Promise { if (!params.text?.trim()) { return false; } @@ -66,6 +93,13 @@ function shouldTreatDeliveredTextAsVisible(params: { if (!channelId) { return false; } + // Only Telegram currently overrides block/tool visibility via channel runtime. + // Keep other channels on the fast path so ACP local delivery does not pay the + // broader channel-registry import cost on every streamed turn. + if (channelId !== "telegram") { + return false; + } + const { getChannelPlugin } = await loadChannelPluginRuntime(); return ( getChannelPlugin(channelId)?.outbound?.shouldTreatRoutedTextAsVisible?.({ kind: params.kind, @@ -74,6 +108,42 @@ function shouldTreatDeliveredTextAsVisible(params: { ); } +async function maybeApplyAcpTts(params: { + payload: ReplyPayload; + cfg: OpenClawConfig; + channel?: string; + kind: ReplyDispatchKind; + inboundAudio: boolean; + ttsAuto?: TtsAutoMode; + skipTts?: boolean; +}): Promise { + if (params.skipTts) { + return params.payload; + } + const ttsStatus = resolveStatusTtsSnapshot({ + cfg: params.cfg, + sessionAuto: params.ttsAuto, + }); + if (!ttsStatus) { + return params.payload; + } + if (ttsStatus.autoMode === "inbound" && !params.inboundAudio) { + return params.payload; + } + if (params.kind !== "final" && resolveConfiguredTtsMode(params.cfg) === "final") { + return params.payload; + } + const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime(); + return await maybeApplyTtsToPayload({ + payload: params.payload, + cfg: params.cfg, + channel: params.channel, + kind: params.kind, + inboundAudio: params.inboundAudio, + ttsAuto: params.ttsAuto, + }); +} + type AcpDispatchDeliveryState = { startedReplyLifecycle: boolean; accumulatedBlockText: string; @@ -182,6 +252,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { } try { + const { runMessageAction } = await loadMessageActionRuntime(); await runMessageAction({ cfg: params.cfg, action: "edit", @@ -226,16 +297,15 @@ export function createAcpDispatchDeliveryCoordinator(params: { return false; } - const ttsPayload = meta?.skipTts - ? payload - : await maybeApplyTtsToPayload({ - payload, - cfg: params.cfg, - channel: params.ttsChannel, - kind, - inboundAudio: params.inboundAudio, - ttsAuto: params.sessionTtsAuto, - }); + const ttsPayload = await maybeApplyAcpTts({ + payload, + cfg: params.cfg, + channel: params.ttsChannel, + kind, + inboundAudio: params.inboundAudio, + ttsAuto: params.sessionTtsAuto, + skipTts: meta?.skipTts, + }); if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) { const toolCallId = meta?.toolCallId?.trim(); @@ -246,11 +316,12 @@ export function createAcpDispatchDeliveryCoordinator(params: { } } - const tracksVisibleText = shouldTreatDeliveredTextAsVisible({ + const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({ channel: routedChannel, kind, text: ttsPayload.text, }); + const { routeReply } = await loadRouteReplyRuntime(); const result = await routeReply({ payload: ttsPayload, channel: params.originatingChannel, @@ -288,7 +359,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { return true; } - const tracksVisibleText = shouldTreatDeliveredTextAsVisible({ + const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({ channel: directChannel, kind, text: ttsPayload.text, diff --git a/src/auto-reply/reply/dispatch-acp-manager.runtime.ts b/src/auto-reply/reply/dispatch-acp-manager.runtime.ts new file mode 100644 index 00000000000..17d4c6b8530 --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-manager.runtime.ts @@ -0,0 +1,2 @@ +export { getAcpSessionManager } from "../../acp/control-plane/manager.js"; +export { getSessionBindingService } from "../../infra/outbound/session-binding-service.js"; diff --git a/src/auto-reply/reply/dispatch-acp-media.runtime.ts b/src/auto-reply/reply/dispatch-acp-media.runtime.ts new file mode 100644 index 00000000000..94172ddb21c --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-media.runtime.ts @@ -0,0 +1,5 @@ +export { applyMediaUnderstanding } from "../../media-understanding/apply.js"; +export { MediaAttachmentCache } from "../../media-understanding/attachments.js"; +export { normalizeAttachments } from "../../media-understanding/attachments.normalize.js"; +export { isMediaUnderstandingSkipError } from "../../media-understanding/errors.js"; +export { resolveMediaAttachmentLocalRoots } from "../../media-understanding/runner.js"; diff --git a/src/auto-reply/reply/dispatch-acp-session.runtime.ts b/src/auto-reply/reply/dispatch-acp-session.runtime.ts new file mode 100644 index 00000000000..9a7dd8318a2 --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-session.runtime.ts @@ -0,0 +1 @@ +export { readAcpSessionEntry } from "../../acp/runtime/session-meta.js"; diff --git a/src/auto-reply/reply/dispatch-acp-tts.runtime.ts b/src/auto-reply/reply/dispatch-acp-tts.runtime.ts new file mode 100644 index 00000000000..72cee87645a --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-tts.runtime.ts @@ -0,0 +1 @@ +export { maybeApplyTtsToPayload } from "../../tts/tts.runtime.js"; diff --git a/src/auto-reply/reply/dispatch-acp.runtime.ts b/src/auto-reply/reply/dispatch-acp.runtime.ts index 87b654cfc76..8600cc18ddf 100644 --- a/src/auto-reply/reply/dispatch-acp.runtime.ts +++ b/src/auto-reply/reply/dispatch-acp.runtime.ts @@ -1 +1,31 @@ -export { shouldBypassAcpDispatchForCommand, tryDispatchAcpReply } from "./dispatch-acp.js"; +import type { shouldBypassAcpDispatchForCommand as ShouldBypassAcpDispatchForCommand } from "./dispatch-acp-command-bypass.js"; +import type { tryDispatchAcpReply as TryDispatchAcpReply } from "./dispatch-acp.js"; + +let dispatchAcpPromise: Promise | null = null; +let dispatchAcpCommandBypassPromise: Promise< + typeof import("./dispatch-acp-command-bypass.js") +> | null = null; + +function loadDispatchAcp() { + dispatchAcpPromise ??= import("./dispatch-acp.js"); + return dispatchAcpPromise; +} + +function loadDispatchAcpCommandBypass() { + dispatchAcpCommandBypassPromise ??= import("./dispatch-acp-command-bypass.js"); + return dispatchAcpCommandBypassPromise; +} + +export async function shouldBypassAcpDispatchForCommand( + ...args: Parameters +): Promise> { + const mod = await loadDispatchAcpCommandBypass(); + return mod.shouldBypassAcpDispatchForCommand(...args); +} + +export async function tryDispatchAcpReply( + ...args: Parameters +): ReturnType { + const mod = await loadDispatchAcp(); + return await mod.tryDispatchAcpReply(...args); +} diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 734f0965e49..88f03672fac 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -241,6 +241,20 @@ describe("tryDispatchAcpReply", () => { maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), })); + vi.doMock("../../tts/tts.runtime.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), + })); + vi.doMock("../../tts/status-config.js", () => ({ + resolveStatusTtsSnapshot: () => ({ + autoMode: "always", + provider: "auto", + maxLength: 1500, + summarize: true, + }), + })); + vi.doMock("./dispatch-acp-tts.runtime.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), + })); vi.doMock("../../media-understanding/apply.js", () => ({ applyMediaUnderstanding: (params: unknown) => mediaUnderstandingMocks.applyMediaUnderstanding(params), @@ -249,6 +263,10 @@ describe("tryDispatchAcpReply", () => { readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => sessionMetaMocks.readAcpSessionEntry(params), })); + vi.doMock("./dispatch-acp-session.runtime.js", () => ({ + readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => + sessionMetaMocks.readAcpSessionEntry(params), + })); vi.doMock("../../infra/outbound/session-binding-service.js", () => ({ getSessionBindingService: () => ({ listBySession: (targetSessionKey: string) => @@ -405,6 +423,17 @@ describe("tryDispatchAcpReply", () => { expect(onReplyStart).not.toHaveBeenCalled(); }); + it("skips media understanding for text-only ACP turns", async () => { + setReadyAcpResolution(); + mockVisibleTextTurn("text only"); + + await runDispatch({ + bodyForAgent: "plain text prompt", + }); + + expect(mediaUnderstandingMocks.applyMediaUnderstanding).not.toHaveBeenCalled(); + }); + it("forwards normalized image attachments into ACP turns", async () => { setReadyAcpResolution(); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 267e109f0ea..1a31c6a4b86 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -1,4 +1,3 @@ -import { getAcpSessionManager } from "../../acp/control-plane/manager.js"; import type { AcpTurnAttachment } from "../../acp/control-plane/manager.types.js"; import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js"; import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js"; @@ -8,26 +7,15 @@ import { isSessionIdentityPending, resolveSessionIdentityFromMeta, } from "../../acp/runtime/session-identity.js"; -import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { TtsAutoMode } from "../../config/types.tts.js"; import { logVerbose } from "../../globals.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; -import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js"; import { generateSecureUuid } from "../../infra/secure-random.js"; import { prefixSystemMessage } from "../../infra/system-message.js"; -import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; -import { MediaAttachmentCache } from "../../media-understanding/attachments.js"; -import { normalizeAttachments } from "../../media-understanding/attachments.normalize.js"; -import { isMediaUnderstandingSkipError } from "../../media-understanding/errors.js"; -import { resolveMediaAttachmentLocalRoots } from "../../media-understanding/runner.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; -import { maybeApplyTtsToPayload, resolveTtsConfig } from "../../tts/tts.js"; -import { - isCommandEnabled, - maybeResolveTextAlias, - shouldHandleTextCommands, -} from "../commands-registry.js"; +import { resolveStatusTtsSnapshot } from "../../tts/status-config.js"; +import { resolveConfiguredTtsMode } from "../../tts/tts-config.js"; import type { FinalizedMsgContext } from "../templating.js"; import { createAcpReplyProjector } from "./acp-projector.js"; import { @@ -36,6 +24,38 @@ import { } from "./dispatch-acp-delivery.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; +let dispatchAcpMediaRuntimePromise: Promise< + typeof import("./dispatch-acp-media.runtime.js") +> | null = null; +let dispatchAcpManagerRuntimePromise: Promise< + typeof import("./dispatch-acp-manager.runtime.js") +> | null = null; +let dispatchAcpSessionRuntimePromise: Promise< + typeof import("./dispatch-acp-session.runtime.js") +> | null = null; +let dispatchAcpTtsRuntimePromise: Promise | null = + null; + +function loadDispatchAcpMediaRuntime() { + dispatchAcpMediaRuntimePromise ??= import("./dispatch-acp-media.runtime.js"); + return dispatchAcpMediaRuntimePromise; +} + +function loadDispatchAcpManagerRuntime() { + dispatchAcpManagerRuntimePromise ??= import("./dispatch-acp-manager.runtime.js"); + return dispatchAcpManagerRuntimePromise; +} + +function loadDispatchAcpSessionRuntime() { + dispatchAcpSessionRuntimePromise ??= import("./dispatch-acp-session.runtime.js"); + return dispatchAcpSessionRuntimePromise; +} + +function loadDispatchAcpTtsRuntime() { + dispatchAcpTtsRuntimePromise ??= import("./dispatch-acp-tts.runtime.js"); + return dispatchAcpTtsRuntimePromise; +} + type DispatchProcessedRecorder = ( outcome: "completed" | "skipped" | "error", opts?: { @@ -67,6 +87,18 @@ function resolveAcpPromptText(ctx: FinalizedMsgContext): string { ]).trim(); } +function hasInboundMediaForAcp(ctx: FinalizedMsgContext): boolean { + return Boolean( + ctx.StickerMediaIncluded || + ctx.Sticker || + ctx.MediaPath?.trim() || + ctx.MediaUrl?.trim() || + ctx.MediaPaths?.some((value) => value?.trim()) || + ctx.MediaUrls?.some((value) => value?.trim()) || + ctx.MediaTypes?.length, + ); +} + const ACP_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024; const ACP_ATTACHMENT_TIMEOUT_MS = 1_000; @@ -74,6 +106,12 @@ async function resolveAcpAttachments( ctx: FinalizedMsgContext, cfg: OpenClawConfig, ): Promise { + const { + MediaAttachmentCache, + isMediaUnderstandingSkipError, + normalizeAttachments, + resolveMediaAttachmentLocalRoots, + } = await loadDispatchAcpMediaRuntime(); const mediaAttachments = normalizeAttachments(ctx).map((attachment) => attachment.path?.trim() ? { ...attachment, url: undefined } : attachment, ); @@ -114,43 +152,6 @@ async function resolveAcpAttachments( return results; } -function resolveCommandCandidateText(ctx: FinalizedMsgContext): string { - return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim(); -} - -export function shouldBypassAcpDispatchForCommand( - ctx: FinalizedMsgContext, - cfg: OpenClawConfig, -): boolean { - const candidate = resolveCommandCandidateText(ctx); - if (!candidate) { - return false; - } - const allowTextCommands = shouldHandleTextCommands({ - cfg, - surface: ctx.Surface ?? ctx.Provider ?? "", - commandSource: ctx.CommandSource, - }); - if (maybeResolveTextAlias(candidate, cfg) != null) { - return allowTextCommands; - } - - const normalized = candidate.trim(); - if (!normalized.startsWith("!")) { - return false; - } - - if (!ctx.CommandAuthorized) { - return false; - } - - if (!isCommandEnabled(cfg, "bash")) { - return false; - } - - return allowTextCommands; -} - function resolveAcpRequestId(ctx: FinalizedMsgContext): string { const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; if (typeof id === "string" && id.trim()) { @@ -162,12 +163,12 @@ function resolveAcpRequestId(ctx: FinalizedMsgContext): string { return generateSecureUuid(); } -function hasBoundConversationForSession(params: { +async function hasBoundConversationForSession(params: { cfg: OpenClawConfig; sessionKey: string; channelRaw: string | undefined; accountIdRaw: string | undefined; -}): boolean { +}): Promise { const channel = String(params.channelRaw ?? "") .trim() .toLowerCase(); @@ -184,6 +185,7 @@ function hasBoundConversationForSession(params: { (typeof configuredDefaultAccountId === "string" && configuredDefaultAccountId.trim() ? configuredDefaultAccountId.trim().toLowerCase() : "default"); + const { getSessionBindingService } = await loadDispatchAcpManagerRuntime(); const bindingService = getSessionBindingService(); const bindings = bindingService.listBySession(params.sessionKey); return bindings.some((binding) => { @@ -248,6 +250,7 @@ async function maybeUnbindStaleBoundConversations(params: { return; } try { + const { getSessionBindingService } = await loadDispatchAcpManagerRuntime(); const removed = await getSessionBindingService().unbind({ targetSessionKey: params.targetSessionKey, reason: ACP_STALE_BINDING_UNBIND_REASON, @@ -276,13 +279,20 @@ async function finalizeAcpTurnOutput(params: { await params.delivery.settleVisibleText(); let queuedFinal = params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery(); - const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final"; + const ttsMode = resolveConfiguredTtsMode(params.cfg); const accumulatedBlockText = params.delivery.getAccumulatedBlockText(); const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0; + const ttsStatus = resolveStatusTtsSnapshot({ + cfg: params.cfg, + sessionAuto: params.sessionTtsAuto, + }); + const canAttemptFinalTts = + ttsStatus != null && !(ttsStatus.autoMode === "inbound" && !params.inboundAudio); let finalMediaDelivered = false; - if (ttsMode === "final" && hasAccumulatedBlockText) { + if (ttsMode === "final" && hasAccumulatedBlockText && canAttemptFinalTts) { try { + const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime(); const ttsSyntheticReply = await maybeApplyTtsToPayload({ payload: { text: accumulatedBlockText }, cfg: params.cfg, @@ -324,6 +334,7 @@ async function finalizeAcpTurnOutput(params: { } if (params.shouldEmitResolvedIdentityNotice) { + const { readAcpSessionEntry } = await loadDispatchAcpSessionRuntime(); const currentMeta = readAcpSessionEntry({ cfg: params.cfg, sessionKey: params.sessionKey, @@ -371,6 +382,7 @@ export async function tryDispatchAcpReply(params: { return null; } + const { getAcpSessionManager } = await loadDispatchAcpManagerRuntime(); const acpManager = getAcpSessionManager(); const acpResolution = acpManager.resolveSession({ cfg: params.cfg, @@ -403,12 +415,12 @@ export async function tryDispatchAcpReply(params: { !params.suppressUserDelivery && identityPendingBeforeTurn && (Boolean(params.ctx.MessageThreadId != null && String(params.ctx.MessageThreadId).trim()) || - hasBoundConversationForSession({ + (await hasBoundConversationForSession({ cfg: params.cfg, sessionKey: canonicalSessionKey, channelRaw: params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider, accountIdRaw: params.ctx.AccountId, - })); + }))); const resolvedAcpAgent = acpResolution.kind === "ready" @@ -462,8 +474,9 @@ export async function tryDispatchAcpReply(params: { if (agentPolicyError) { throw agentPolicyError; } - if (!params.ctx.MediaUnderstanding?.length) { + if (hasInboundMediaForAcp(params.ctx) && !params.ctx.MediaUnderstanding?.length) { try { + const { applyMediaUnderstanding } = await loadDispatchAcpMediaRuntime(); await applyMediaUnderstanding({ ctx: params.ctx, cfg: params.cfg, diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index d9e4709be64..2346d187237 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -72,12 +72,18 @@ const sessionBindingMocks = vi.hoisted(() => ({ >(() => null), touch: vi.fn(), })); +const pluginConversationBindingMocks = vi.hoisted(() => ({ + shownFallbackNoticeBindingIds: new Set(), +})); const sessionStoreMocks = vi.hoisted(() => ({ currentEntry: undefined as Record | undefined, loadSessionStore: vi.fn(() => ({})), resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"), resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })), })); +const acpManagerRuntimeMocks = vi.hoisted(() => ({ + getAcpSessionManager: vi.fn(), +})); const agentEventMocks = vi.hoisted(() => ({ emitAgentEvent: vi.fn(), })); @@ -206,6 +212,48 @@ vi.mock("../../infra/outbound/session-binding-service.js", () => ({ vi.mock("../../infra/agent-events.js", () => ({ emitAgentEvent: (params: unknown) => agentEventMocks.emitAgentEvent(params), })); +vi.mock("../../plugins/conversation-binding.js", () => ({ + buildPluginBindingDeclinedText: () => "Plugin binding request was declined.", + buildPluginBindingErrorText: () => "Plugin binding request failed.", + buildPluginBindingUnavailableText: (binding: { pluginName?: string; pluginId: string }) => + `${binding.pluginName ?? binding.pluginId} is not currently loaded.`, + hasShownPluginBindingFallbackNotice: (bindingId: string) => + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.has(bindingId), + isPluginOwnedSessionBindingRecord: ( + record: SessionBindingRecord | null | undefined, + ): record is SessionBindingRecord => + record?.metadata != null && + typeof record.metadata === "object" && + (record.metadata as { pluginBindingOwner?: string }).pluginBindingOwner === "plugin", + markPluginBindingFallbackNoticeShown: (bindingId: string) => { + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.add(bindingId); + }, + toPluginConversationBinding: (record: SessionBindingRecord) => { + const metadata = (record.metadata ?? {}) as { + pluginId?: string; + pluginName?: string; + pluginRoot?: string; + }; + return { + bindingId: record.bindingId, + pluginId: metadata.pluginId ?? "unknown-plugin", + pluginName: metadata.pluginName, + pluginRoot: metadata.pluginRoot ?? "", + channel: record.conversation.channel, + accountId: record.conversation.accountId, + conversationId: record.conversation.conversationId, + parentConversationId: record.conversation.parentConversationId, + }; + }, +})); +vi.mock("./dispatch-acp-manager.runtime.js", () => ({ + getAcpSessionManager: () => acpManagerRuntimeMocks.getAcpSessionManager(), + getSessionBindingService: () => ({ + listBySession: (targetSessionKey: string) => + sessionBindingMocks.listBySession(targetSessionKey), + unbind: vi.fn(async () => []), + }), +})); vi.mock("../../tts/tts.js", () => ({ maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), @@ -214,6 +262,21 @@ vi.mock("../../tts/tts.js", () => ({ vi.mock("../../tts/tts.runtime.js", () => ({ maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), })); +vi.mock("../../tts/status-config.js", () => ({ + resolveStatusTtsSnapshot: () => ({ + autoMode: "always", + provider: "auto", + maxLength: 1500, + summarize: true, + }), +})); +vi.mock("./dispatch-acp-tts.runtime.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), +})); +vi.mock("./dispatch-acp-session.runtime.js", () => ({ + readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => + acpMocks.readAcpSessionEntry(params), +})); vi.mock("../../tts/tts-config.js", () => ({ normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), resolveConfiguredTtsMode: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg).mode, @@ -223,8 +286,6 @@ const noAbortResult = { handled: false, aborted: false } as const; const emptyConfig = {} as OpenClawConfig; let dispatchReplyFromConfig: typeof import("./dispatch-from-config.js").dispatchReplyFromConfig; let resetInboundDedupe: typeof import("./inbound-dedupe.js").resetInboundDedupe; -let acpManagerTesting: typeof import("../../acp/control-plane/manager.js").__testing; -let pluginBindingTesting: typeof import("../../plugins/conversation-binding.js").__testing; let AcpRuntimeErrorClass: typeof import("../../acp/runtime/errors.js").AcpRuntimeError; type DispatchReplyArgs = Parameters< typeof import("./dispatch-from-config.js").dispatchReplyFromConfig @@ -266,6 +327,96 @@ function createAcpRuntime(events: Array>) { }; } +function createMockAcpSessionManager() { + return { + resolveSession: (params: { cfg: OpenClawConfig; sessionKey: string }) => { + const entry = acpMocks.readAcpSessionEntry({ + cfg: params.cfg, + sessionKey: params.sessionKey, + }) as { acp?: Record } | null; + if (entry?.acp) { + return { + kind: "ready" as const, + sessionKey: params.sessionKey, + meta: entry.acp, + }; + } + return String(params.sessionKey).startsWith("agent:") + ? { + kind: "stale" as const, + sessionKey: params.sessionKey, + error: { + code: "ACP_SESSION_INIT_FAILED", + message: `ACP metadata is missing for ${params.sessionKey}.`, + }, + } + : { + kind: "none" as const, + sessionKey: params.sessionKey, + }; + }, + getObservabilitySnapshot: () => ({ + runtimeCache: { + activeSessions: 0, + idleTtlMs: 0, + evictedTotal: 0, + }, + turns: { + active: 0, + queueDepth: 0, + completed: 0, + failed: 0, + averageLatencyMs: 0, + maxLatencyMs: 0, + }, + errorsByCode: {}, + }), + runTurn: vi.fn( + async (params: { + cfg: OpenClawConfig; + sessionKey: string; + text?: string; + attachments?: unknown[]; + mode: string; + requestId: string; + signal?: AbortSignal; + onEvent: (event: Record) => Promise; + }) => { + const entry = acpMocks.readAcpSessionEntry({ + cfg: params.cfg, + sessionKey: params.sessionKey, + }) as { + acp?: { + agent?: string; + mode?: string; + }; + } | null; + const runtimeBackend = acpMocks.requireAcpRuntimeBackend() as { + runtime?: ReturnType; + }; + if (!runtimeBackend.runtime) { + throw new Error("ACP runtime backend not mocked"); + } + await runtimeBackend.runtime.ensureSession({ + sessionKey: params.sessionKey, + mode: entry?.acp?.mode || "persistent", + agent: entry?.acp?.agent || "codex", + }); + const stream = runtimeBackend.runtime.runTurn({ + text: params.text, + attachments: params.attachments, + mode: params.mode, + requestId: params.requestId, + signal: params.signal, + }); + for await (const event of stream) { + await params.onEvent(event); + } + }, + ), + }; +} + function firstToolResultPayload(dispatcher: ReplyDispatcher): ReplyPayload | undefined { return (dispatcher.sendToolResult as ReturnType).mock.calls[0]?.[0] as | ReplyPayload @@ -286,9 +437,11 @@ async function dispatchTwiceWithFreshDispatchers(params: Omit { beforeAll(async () => { ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); + await import("./dispatch-acp.js"); + await import("./dispatch-acp-command-bypass.js"); + await import("./dispatch-acp-tts.runtime.js"); + await import("./dispatch-acp-session.runtime.js"); ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); - ({ __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js")); - ({ __testing: pluginBindingTesting } = await import("../../plugins/conversation-binding.js")); ({ AcpRuntimeError: AcpRuntimeErrorClass } = await import("../../acp/runtime/errors.js")); }); @@ -321,7 +474,8 @@ describe("dispatchReplyFromConfig", () => { }, ]), ); - acpManagerTesting.resetAcpSessionManagerForTests(); + acpManagerRuntimeMocks.getAcpSessionManager.mockReset(); + acpManagerRuntimeMocks.getAcpSessionManager.mockReturnValue(createMockAcpSessionManager()); resetInboundDedupe(); mocks.routeReply.mockReset(); mocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" }); @@ -354,7 +508,7 @@ describe("dispatchReplyFromConfig", () => { agentEventMocks.emitAgentEvent.mockReset(); sessionBindingMocks.listBySession.mockReset(); sessionBindingMocks.listBySession.mockReturnValue([]); - pluginBindingTesting.reset(); + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.clear(); sessionBindingMocks.resolveByConversation.mockReset(); sessionBindingMocks.resolveByConversation.mockReturnValue(null); sessionBindingMocks.touch.mockReset(); @@ -1065,7 +1219,7 @@ describe("dispatchReplyFromConfig", () => { { type: "text_delta", text: "world" }, { type: "done" }, ]); - acpMocks.readAcpSessionEntry.mockReturnValue({ + let currentAcpEntry = { sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, @@ -1079,6 +1233,28 @@ describe("dispatchReplyFromConfig", () => { state: "idle", lastActivityAt: Date.now(), }, + }; + acpMocks.readAcpSessionEntry.mockImplementation(() => currentAcpEntry); + acpMocks.upsertAcpSessionMeta.mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { + mutate: ( + current: Record | undefined, + entry: { acp?: Record } | undefined, + ) => Record | null | undefined; + }; + const nextMeta = params.mutate(currentAcpEntry.acp as Record, { + acp: currentAcpEntry.acp as Record, + }); + if (nextMeta === null) { + return null; + } + if (nextMeta) { + currentAcpEntry = { + ...currentAcpEntry, + acp: nextMeta as typeof currentAcpEntry.acp, + }; + } + return currentAcpEntry; }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 3cf334ec903..46b482b2b6f 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -7,7 +7,7 @@ import { } from "../../bindings/records.js"; import { shouldSuppressLocalExecApprovalPrompt } from "../../channels/plugins/exec-approval-local.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { parseSessionThreadInfo } from "../../config/sessions/delivery-info.js"; +import { parseSessionThreadInfo } from "../../config/sessions/thread-info.js"; import type { SessionEntry } from "../../config/sessions/types.js"; import { logVerbose } from "../../globals.js"; import { fireAndForgetHook } from "../../hooks/fire-and-forget.js"; @@ -496,7 +496,10 @@ export async function dispatchReplyFromConfig(params: { } const dispatchAcpRuntime = await loadDispatchAcpRuntime(); - const bypassAcpForCommand = dispatchAcpRuntime.shouldBypassAcpDispatchForCommand(ctx, cfg); + const bypassAcpForCommand = await dispatchAcpRuntime.shouldBypassAcpDispatchForCommand( + ctx, + cfg, + ); const sendPolicy = resolveSendPolicy({ cfg, diff --git a/src/config/sessions/delivery-info.ts b/src/config/sessions/delivery-info.ts index 43ea169599c..aa2ff768615 100644 --- a/src/config/sessions/delivery-info.ts +++ b/src/config/sessions/delivery-info.ts @@ -1,18 +1,8 @@ -import { resolveSessionThreadInfo } from "../../channels/plugins/session-conversation.js"; import { loadConfig } from "../io.js"; import { resolveStorePath } from "./paths.js"; import { loadSessionStore } from "./store.js"; - -/** - * Extract deliveryContext and threadId from a sessionKey. - * Supports generic :thread: suffixes plus plugin-owned thread/session grammars. - */ -export function parseSessionThreadInfo(sessionKey: string | undefined): { - baseSessionKey: string | undefined; - threadId: string | undefined; -} { - return resolveSessionThreadInfo(sessionKey); -} +export { parseSessionThreadInfo } from "./thread-info.js"; +import { parseSessionThreadInfo } from "./thread-info.js"; export function extractDeliveryInfo(sessionKey: string | undefined): { deliveryContext: diff --git a/src/config/sessions/thread-info.ts b/src/config/sessions/thread-info.ts new file mode 100644 index 00000000000..59b2a607baa --- /dev/null +++ b/src/config/sessions/thread-info.ts @@ -0,0 +1,12 @@ +import { resolveSessionThreadInfo } from "../../channels/plugins/session-conversation.js"; + +/** + * Extract deliveryContext and threadId from a sessionKey. + * Supports generic :thread: suffixes plus plugin-owned thread/session grammars. + */ +export function parseSessionThreadInfo(sessionKey: string | undefined): { + baseSessionKey: string | undefined; + threadId: string | undefined; +} { + return resolveSessionThreadInfo(sessionKey); +} diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index b9d70901962..c478c850bc8 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -2,7 +2,6 @@ import fs from "node:fs"; import path from "node:path"; import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; -import { parseSessionThreadInfo } from "./delivery-info.js"; import { resolveDefaultSessionStorePath, resolveSessionFilePath, @@ -11,6 +10,7 @@ import { } from "./paths.js"; import { resolveAndPersistSessionFile } from "./session-file.js"; import { loadSessionStore, normalizeStoreSessionKey } from "./store.js"; +import { parseSessionThreadInfo } from "./thread-info.js"; import { resolveMirroredTranscriptText } from "./transcript-mirror.js"; import type { SessionEntry } from "./types.js"; diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index 4e51e9875cf..72f2c881cf1 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -2,7 +2,7 @@ import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-help import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js"; import type { CliDeps } from "../cli/deps.js"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; -import { parseSessionThreadInfo } from "../config/sessions/delivery-info.js"; +import { parseSessionThreadInfo } from "../config/sessions/thread-info.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { ackDelivery, enqueueDelivery, failDelivery } from "../infra/outbound/delivery-queue.js";