perf(auto-reply): defer ACP runtime imports

This commit is contained in:
Peter Steinberger
2026-04-05 15:27:22 +01:00
parent 7b05253bed
commit 934641df86
15 changed files with 490 additions and 100 deletions

View File

@@ -0,0 +1,57 @@
import type { OpenClawConfig } from "../../config/config.js";
import {
isCommandEnabled,
maybeResolveTextAlias,
shouldHandleTextCommands,
} from "../commands-registry.js";
import type { FinalizedMsgContext } from "../templating.js";
function resolveFirstContextText(
ctx: FinalizedMsgContext,
keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">,
): string {
for (const key of keys) {
const value = ctx[key];
if (typeof value === "string") {
return value;
}
}
return "";
}
function resolveCommandCandidateText(ctx: FinalizedMsgContext): string {
return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim();
}
export function shouldBypassAcpDispatchForCommand(
ctx: FinalizedMsgContext,
cfg: OpenClawConfig,
): boolean {
const candidate = resolveCommandCandidateText(ctx);
if (!candidate) {
return false;
}
const allowTextCommands = shouldHandleTextCommands({
cfg,
surface: ctx.Surface ?? ctx.Provider ?? "",
commandSource: ctx.CommandSource,
});
if (maybeResolveTextAlias(candidate, cfg) != null) {
return allowTextCommands;
}
const normalized = candidate.trim();
if (!normalized.startsWith("!")) {
return false;
}
if (!ctx.CommandAuthorized) {
return false;
}
if (!isCommandEnabled(cfg, "bash")) {
return false;
}
return allowTextCommands;
}

View File

@@ -1,14 +1,41 @@
import { hasOutboundReplyContent } from "openclaw/plugin-sdk/reply-payload";
import { getChannelPlugin } from "../../channels/plugins/index.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { runMessageAction } from "../../infra/outbound/message-action-runner.js";
import { maybeApplyTtsToPayload } from "../../tts/tts.js";
import { resolveStatusTtsSnapshot } from "../../tts/status-config.js";
import { resolveConfiguredTtsMode } from "../../tts/tts-config.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { routeReply } from "./route-reply.js";
let routeReplyRuntimePromise: Promise<typeof import("./route-reply.runtime.js")> | null = null;
let dispatchAcpTtsRuntimePromise: Promise<typeof import("./dispatch-acp-tts.runtime.js")> | null =
null;
let channelPluginRuntimePromise: Promise<typeof import("../../channels/plugins/index.js")> | null =
null;
let messageActionRuntimePromise: Promise<
typeof import("../../infra/outbound/message-action-runner.js")
> | null = null;
function loadRouteReplyRuntime() {
routeReplyRuntimePromise ??= import("./route-reply.runtime.js");
return routeReplyRuntimePromise;
}
function loadDispatchAcpTtsRuntime() {
dispatchAcpTtsRuntimePromise ??= import("./dispatch-acp-tts.runtime.js");
return dispatchAcpTtsRuntimePromise;
}
function loadChannelPluginRuntime() {
channelPluginRuntimePromise ??= import("../../channels/plugins/index.js");
return channelPluginRuntimePromise;
}
function loadMessageActionRuntime() {
messageActionRuntimePromise ??= import("../../infra/outbound/message-action-runner.js");
return messageActionRuntimePromise;
}
export type AcpDispatchDeliveryMeta = {
toolCallId?: string;
@@ -51,11 +78,11 @@ function resolveDeliveryAccountId(params: {
: undefined;
}
function shouldTreatDeliveredTextAsVisible(params: {
async function shouldTreatDeliveredTextAsVisible(params: {
channel: string | undefined;
kind: ReplyDispatchKind;
text: string | undefined;
}): boolean {
}): Promise<boolean> {
if (!params.text?.trim()) {
return false;
}
@@ -66,6 +93,13 @@ function shouldTreatDeliveredTextAsVisible(params: {
if (!channelId) {
return false;
}
// Only Telegram currently overrides block/tool visibility via channel runtime.
// Keep other channels on the fast path so ACP local delivery does not pay the
// broader channel-registry import cost on every streamed turn.
if (channelId !== "telegram") {
return false;
}
const { getChannelPlugin } = await loadChannelPluginRuntime();
return (
getChannelPlugin(channelId)?.outbound?.shouldTreatRoutedTextAsVisible?.({
kind: params.kind,
@@ -74,6 +108,42 @@ function shouldTreatDeliveredTextAsVisible(params: {
);
}
async function maybeApplyAcpTts(params: {
payload: ReplyPayload;
cfg: OpenClawConfig;
channel?: string;
kind: ReplyDispatchKind;
inboundAudio: boolean;
ttsAuto?: TtsAutoMode;
skipTts?: boolean;
}): Promise<ReplyPayload> {
if (params.skipTts) {
return params.payload;
}
const ttsStatus = resolveStatusTtsSnapshot({
cfg: params.cfg,
sessionAuto: params.ttsAuto,
});
if (!ttsStatus) {
return params.payload;
}
if (ttsStatus.autoMode === "inbound" && !params.inboundAudio) {
return params.payload;
}
if (params.kind !== "final" && resolveConfiguredTtsMode(params.cfg) === "final") {
return params.payload;
}
const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime();
return await maybeApplyTtsToPayload({
payload: params.payload,
cfg: params.cfg,
channel: params.channel,
kind: params.kind,
inboundAudio: params.inboundAudio,
ttsAuto: params.ttsAuto,
});
}
type AcpDispatchDeliveryState = {
startedReplyLifecycle: boolean;
accumulatedBlockText: string;
@@ -182,6 +252,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
}
try {
const { runMessageAction } = await loadMessageActionRuntime();
await runMessageAction({
cfg: params.cfg,
action: "edit",
@@ -226,16 +297,15 @@ export function createAcpDispatchDeliveryCoordinator(params: {
return false;
}
const ttsPayload = meta?.skipTts
? payload
: await maybeApplyTtsToPayload({
payload,
cfg: params.cfg,
channel: params.ttsChannel,
kind,
inboundAudio: params.inboundAudio,
ttsAuto: params.sessionTtsAuto,
});
const ttsPayload = await maybeApplyAcpTts({
payload,
cfg: params.cfg,
channel: params.ttsChannel,
kind,
inboundAudio: params.inboundAudio,
ttsAuto: params.sessionTtsAuto,
skipTts: meta?.skipTts,
});
if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) {
const toolCallId = meta?.toolCallId?.trim();
@@ -246,11 +316,12 @@ export function createAcpDispatchDeliveryCoordinator(params: {
}
}
const tracksVisibleText = shouldTreatDeliveredTextAsVisible({
const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({
channel: routedChannel,
kind,
text: ttsPayload.text,
});
const { routeReply } = await loadRouteReplyRuntime();
const result = await routeReply({
payload: ttsPayload,
channel: params.originatingChannel,
@@ -288,7 +359,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
return true;
}
const tracksVisibleText = shouldTreatDeliveredTextAsVisible({
const tracksVisibleText = await shouldTreatDeliveredTextAsVisible({
channel: directChannel,
kind,
text: ttsPayload.text,

View File

@@ -0,0 +1,2 @@
export { getAcpSessionManager } from "../../acp/control-plane/manager.js";
export { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";

View File

@@ -0,0 +1,5 @@
export { applyMediaUnderstanding } from "../../media-understanding/apply.js";
export { MediaAttachmentCache } from "../../media-understanding/attachments.js";
export { normalizeAttachments } from "../../media-understanding/attachments.normalize.js";
export { isMediaUnderstandingSkipError } from "../../media-understanding/errors.js";
export { resolveMediaAttachmentLocalRoots } from "../../media-understanding/runner.js";

View File

@@ -0,0 +1 @@
export { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";

View File

@@ -0,0 +1 @@
export { maybeApplyTtsToPayload } from "../../tts/tts.runtime.js";

View File

@@ -1 +1,31 @@
export { shouldBypassAcpDispatchForCommand, tryDispatchAcpReply } from "./dispatch-acp.js";
import type { shouldBypassAcpDispatchForCommand as ShouldBypassAcpDispatchForCommand } from "./dispatch-acp-command-bypass.js";
import type { tryDispatchAcpReply as TryDispatchAcpReply } from "./dispatch-acp.js";
let dispatchAcpPromise: Promise<typeof import("./dispatch-acp.js")> | null = null;
let dispatchAcpCommandBypassPromise: Promise<
typeof import("./dispatch-acp-command-bypass.js")
> | null = null;
function loadDispatchAcp() {
dispatchAcpPromise ??= import("./dispatch-acp.js");
return dispatchAcpPromise;
}
function loadDispatchAcpCommandBypass() {
dispatchAcpCommandBypassPromise ??= import("./dispatch-acp-command-bypass.js");
return dispatchAcpCommandBypassPromise;
}
export async function shouldBypassAcpDispatchForCommand(
...args: Parameters<ShouldBypassAcpDispatchForCommand>
): Promise<ReturnType<ShouldBypassAcpDispatchForCommand>> {
const mod = await loadDispatchAcpCommandBypass();
return mod.shouldBypassAcpDispatchForCommand(...args);
}
export async function tryDispatchAcpReply(
...args: Parameters<TryDispatchAcpReply>
): ReturnType<TryDispatchAcpReply> {
const mod = await loadDispatchAcp();
return await mod.tryDispatchAcpReply(...args);
}

View File

@@ -241,6 +241,20 @@ describe("tryDispatchAcpReply", () => {
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg),
}));
vi.doMock("../../tts/tts.runtime.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
}));
vi.doMock("../../tts/status-config.js", () => ({
resolveStatusTtsSnapshot: () => ({
autoMode: "always",
provider: "auto",
maxLength: 1500,
summarize: true,
}),
}));
vi.doMock("./dispatch-acp-tts.runtime.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
}));
vi.doMock("../../media-understanding/apply.js", () => ({
applyMediaUnderstanding: (params: unknown) =>
mediaUnderstandingMocks.applyMediaUnderstanding(params),
@@ -249,6 +263,10 @@ describe("tryDispatchAcpReply", () => {
readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
sessionMetaMocks.readAcpSessionEntry(params),
}));
vi.doMock("./dispatch-acp-session.runtime.js", () => ({
readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
sessionMetaMocks.readAcpSessionEntry(params),
}));
vi.doMock("../../infra/outbound/session-binding-service.js", () => ({
getSessionBindingService: () => ({
listBySession: (targetSessionKey: string) =>
@@ -405,6 +423,17 @@ describe("tryDispatchAcpReply", () => {
expect(onReplyStart).not.toHaveBeenCalled();
});
it("skips media understanding for text-only ACP turns", async () => {
setReadyAcpResolution();
mockVisibleTextTurn("text only");
await runDispatch({
bodyForAgent: "plain text prompt",
});
expect(mediaUnderstandingMocks.applyMediaUnderstanding).not.toHaveBeenCalled();
});
it("forwards normalized image attachments into ACP turns", async () => {
setReadyAcpResolution();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));

View File

@@ -1,4 +1,3 @@
import { getAcpSessionManager } from "../../acp/control-plane/manager.js";
import type { AcpTurnAttachment } from "../../acp/control-plane/manager.types.js";
import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js";
import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js";
@@ -8,26 +7,15 @@ import {
isSessionIdentityPending,
resolveSessionIdentityFromMeta,
} from "../../acp/runtime/session-identity.js";
import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
import { MediaAttachmentCache } from "../../media-understanding/attachments.js";
import { normalizeAttachments } from "../../media-understanding/attachments.normalize.js";
import { isMediaUnderstandingSkipError } from "../../media-understanding/errors.js";
import { resolveMediaAttachmentLocalRoots } from "../../media-understanding/runner.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { maybeApplyTtsToPayload, resolveTtsConfig } from "../../tts/tts.js";
import {
isCommandEnabled,
maybeResolveTextAlias,
shouldHandleTextCommands,
} from "../commands-registry.js";
import { resolveStatusTtsSnapshot } from "../../tts/status-config.js";
import { resolveConfiguredTtsMode } from "../../tts/tts-config.js";
import type { FinalizedMsgContext } from "../templating.js";
import { createAcpReplyProjector } from "./acp-projector.js";
import {
@@ -36,6 +24,38 @@ import {
} from "./dispatch-acp-delivery.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
let dispatchAcpMediaRuntimePromise: Promise<
typeof import("./dispatch-acp-media.runtime.js")
> | null = null;
let dispatchAcpManagerRuntimePromise: Promise<
typeof import("./dispatch-acp-manager.runtime.js")
> | null = null;
let dispatchAcpSessionRuntimePromise: Promise<
typeof import("./dispatch-acp-session.runtime.js")
> | null = null;
let dispatchAcpTtsRuntimePromise: Promise<typeof import("./dispatch-acp-tts.runtime.js")> | null =
null;
function loadDispatchAcpMediaRuntime() {
dispatchAcpMediaRuntimePromise ??= import("./dispatch-acp-media.runtime.js");
return dispatchAcpMediaRuntimePromise;
}
function loadDispatchAcpManagerRuntime() {
dispatchAcpManagerRuntimePromise ??= import("./dispatch-acp-manager.runtime.js");
return dispatchAcpManagerRuntimePromise;
}
function loadDispatchAcpSessionRuntime() {
dispatchAcpSessionRuntimePromise ??= import("./dispatch-acp-session.runtime.js");
return dispatchAcpSessionRuntimePromise;
}
function loadDispatchAcpTtsRuntime() {
dispatchAcpTtsRuntimePromise ??= import("./dispatch-acp-tts.runtime.js");
return dispatchAcpTtsRuntimePromise;
}
type DispatchProcessedRecorder = (
outcome: "completed" | "skipped" | "error",
opts?: {
@@ -67,6 +87,18 @@ function resolveAcpPromptText(ctx: FinalizedMsgContext): string {
]).trim();
}
function hasInboundMediaForAcp(ctx: FinalizedMsgContext): boolean {
return Boolean(
ctx.StickerMediaIncluded ||
ctx.Sticker ||
ctx.MediaPath?.trim() ||
ctx.MediaUrl?.trim() ||
ctx.MediaPaths?.some((value) => value?.trim()) ||
ctx.MediaUrls?.some((value) => value?.trim()) ||
ctx.MediaTypes?.length,
);
}
const ACP_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024;
const ACP_ATTACHMENT_TIMEOUT_MS = 1_000;
@@ -74,6 +106,12 @@ async function resolveAcpAttachments(
ctx: FinalizedMsgContext,
cfg: OpenClawConfig,
): Promise<AcpTurnAttachment[]> {
const {
MediaAttachmentCache,
isMediaUnderstandingSkipError,
normalizeAttachments,
resolveMediaAttachmentLocalRoots,
} = await loadDispatchAcpMediaRuntime();
const mediaAttachments = normalizeAttachments(ctx).map((attachment) =>
attachment.path?.trim() ? { ...attachment, url: undefined } : attachment,
);
@@ -114,43 +152,6 @@ async function resolveAcpAttachments(
return results;
}
function resolveCommandCandidateText(ctx: FinalizedMsgContext): string {
return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim();
}
export function shouldBypassAcpDispatchForCommand(
ctx: FinalizedMsgContext,
cfg: OpenClawConfig,
): boolean {
const candidate = resolveCommandCandidateText(ctx);
if (!candidate) {
return false;
}
const allowTextCommands = shouldHandleTextCommands({
cfg,
surface: ctx.Surface ?? ctx.Provider ?? "",
commandSource: ctx.CommandSource,
});
if (maybeResolveTextAlias(candidate, cfg) != null) {
return allowTextCommands;
}
const normalized = candidate.trim();
if (!normalized.startsWith("!")) {
return false;
}
if (!ctx.CommandAuthorized) {
return false;
}
if (!isCommandEnabled(cfg, "bash")) {
return false;
}
return allowTextCommands;
}
function resolveAcpRequestId(ctx: FinalizedMsgContext): string {
const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
if (typeof id === "string" && id.trim()) {
@@ -162,12 +163,12 @@ function resolveAcpRequestId(ctx: FinalizedMsgContext): string {
return generateSecureUuid();
}
function hasBoundConversationForSession(params: {
async function hasBoundConversationForSession(params: {
cfg: OpenClawConfig;
sessionKey: string;
channelRaw: string | undefined;
accountIdRaw: string | undefined;
}): boolean {
}): Promise<boolean> {
const channel = String(params.channelRaw ?? "")
.trim()
.toLowerCase();
@@ -184,6 +185,7 @@ function hasBoundConversationForSession(params: {
(typeof configuredDefaultAccountId === "string" && configuredDefaultAccountId.trim()
? configuredDefaultAccountId.trim().toLowerCase()
: "default");
const { getSessionBindingService } = await loadDispatchAcpManagerRuntime();
const bindingService = getSessionBindingService();
const bindings = bindingService.listBySession(params.sessionKey);
return bindings.some((binding) => {
@@ -248,6 +250,7 @@ async function maybeUnbindStaleBoundConversations(params: {
return;
}
try {
const { getSessionBindingService } = await loadDispatchAcpManagerRuntime();
const removed = await getSessionBindingService().unbind({
targetSessionKey: params.targetSessionKey,
reason: ACP_STALE_BINDING_UNBIND_REASON,
@@ -276,13 +279,20 @@ async function finalizeAcpTurnOutput(params: {
await params.delivery.settleVisibleText();
let queuedFinal =
params.delivery.hasDeliveredVisibleText() && !params.delivery.hasFailedVisibleTextDelivery();
const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final";
const ttsMode = resolveConfiguredTtsMode(params.cfg);
const accumulatedBlockText = params.delivery.getAccumulatedBlockText();
const hasAccumulatedBlockText = accumulatedBlockText.trim().length > 0;
const ttsStatus = resolveStatusTtsSnapshot({
cfg: params.cfg,
sessionAuto: params.sessionTtsAuto,
});
const canAttemptFinalTts =
ttsStatus != null && !(ttsStatus.autoMode === "inbound" && !params.inboundAudio);
let finalMediaDelivered = false;
if (ttsMode === "final" && hasAccumulatedBlockText) {
if (ttsMode === "final" && hasAccumulatedBlockText && canAttemptFinalTts) {
try {
const { maybeApplyTtsToPayload } = await loadDispatchAcpTtsRuntime();
const ttsSyntheticReply = await maybeApplyTtsToPayload({
payload: { text: accumulatedBlockText },
cfg: params.cfg,
@@ -324,6 +334,7 @@ async function finalizeAcpTurnOutput(params: {
}
if (params.shouldEmitResolvedIdentityNotice) {
const { readAcpSessionEntry } = await loadDispatchAcpSessionRuntime();
const currentMeta = readAcpSessionEntry({
cfg: params.cfg,
sessionKey: params.sessionKey,
@@ -371,6 +382,7 @@ export async function tryDispatchAcpReply(params: {
return null;
}
const { getAcpSessionManager } = await loadDispatchAcpManagerRuntime();
const acpManager = getAcpSessionManager();
const acpResolution = acpManager.resolveSession({
cfg: params.cfg,
@@ -403,12 +415,12 @@ export async function tryDispatchAcpReply(params: {
!params.suppressUserDelivery &&
identityPendingBeforeTurn &&
(Boolean(params.ctx.MessageThreadId != null && String(params.ctx.MessageThreadId).trim()) ||
hasBoundConversationForSession({
(await hasBoundConversationForSession({
cfg: params.cfg,
sessionKey: canonicalSessionKey,
channelRaw: params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider,
accountIdRaw: params.ctx.AccountId,
}));
})));
const resolvedAcpAgent =
acpResolution.kind === "ready"
@@ -462,8 +474,9 @@ export async function tryDispatchAcpReply(params: {
if (agentPolicyError) {
throw agentPolicyError;
}
if (!params.ctx.MediaUnderstanding?.length) {
if (hasInboundMediaForAcp(params.ctx) && !params.ctx.MediaUnderstanding?.length) {
try {
const { applyMediaUnderstanding } = await loadDispatchAcpMediaRuntime();
await applyMediaUnderstanding({
ctx: params.ctx,
cfg: params.cfg,

View File

@@ -72,12 +72,18 @@ const sessionBindingMocks = vi.hoisted(() => ({
>(() => null),
touch: vi.fn(),
}));
const pluginConversationBindingMocks = vi.hoisted(() => ({
shownFallbackNoticeBindingIds: new Set<string>(),
}));
const sessionStoreMocks = vi.hoisted(() => ({
currentEntry: undefined as Record<string, unknown> | undefined,
loadSessionStore: vi.fn(() => ({})),
resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"),
resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })),
}));
const acpManagerRuntimeMocks = vi.hoisted(() => ({
getAcpSessionManager: vi.fn(),
}));
const agentEventMocks = vi.hoisted(() => ({
emitAgentEvent: vi.fn(),
}));
@@ -206,6 +212,48 @@ vi.mock("../../infra/outbound/session-binding-service.js", () => ({
vi.mock("../../infra/agent-events.js", () => ({
emitAgentEvent: (params: unknown) => agentEventMocks.emitAgentEvent(params),
}));
vi.mock("../../plugins/conversation-binding.js", () => ({
buildPluginBindingDeclinedText: () => "Plugin binding request was declined.",
buildPluginBindingErrorText: () => "Plugin binding request failed.",
buildPluginBindingUnavailableText: (binding: { pluginName?: string; pluginId: string }) =>
`${binding.pluginName ?? binding.pluginId} is not currently loaded.`,
hasShownPluginBindingFallbackNotice: (bindingId: string) =>
pluginConversationBindingMocks.shownFallbackNoticeBindingIds.has(bindingId),
isPluginOwnedSessionBindingRecord: (
record: SessionBindingRecord | null | undefined,
): record is SessionBindingRecord =>
record?.metadata != null &&
typeof record.metadata === "object" &&
(record.metadata as { pluginBindingOwner?: string }).pluginBindingOwner === "plugin",
markPluginBindingFallbackNoticeShown: (bindingId: string) => {
pluginConversationBindingMocks.shownFallbackNoticeBindingIds.add(bindingId);
},
toPluginConversationBinding: (record: SessionBindingRecord) => {
const metadata = (record.metadata ?? {}) as {
pluginId?: string;
pluginName?: string;
pluginRoot?: string;
};
return {
bindingId: record.bindingId,
pluginId: metadata.pluginId ?? "unknown-plugin",
pluginName: metadata.pluginName,
pluginRoot: metadata.pluginRoot ?? "",
channel: record.conversation.channel,
accountId: record.conversation.accountId,
conversationId: record.conversation.conversationId,
parentConversationId: record.conversation.parentConversationId,
};
},
}));
vi.mock("./dispatch-acp-manager.runtime.js", () => ({
getAcpSessionManager: () => acpManagerRuntimeMocks.getAcpSessionManager(),
getSessionBindingService: () => ({
listBySession: (targetSessionKey: string) =>
sessionBindingMocks.listBySession(targetSessionKey),
unbind: vi.fn(async () => []),
}),
}));
vi.mock("../../tts/tts.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value),
@@ -214,6 +262,21 @@ vi.mock("../../tts/tts.js", () => ({
vi.mock("../../tts/tts.runtime.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
}));
vi.mock("../../tts/status-config.js", () => ({
resolveStatusTtsSnapshot: () => ({
autoMode: "always",
provider: "auto",
maxLength: 1500,
summarize: true,
}),
}));
vi.mock("./dispatch-acp-tts.runtime.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
}));
vi.mock("./dispatch-acp-session.runtime.js", () => ({
readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
acpMocks.readAcpSessionEntry(params),
}));
vi.mock("../../tts/tts-config.js", () => ({
normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value),
resolveConfiguredTtsMode: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg).mode,
@@ -223,8 +286,6 @@ const noAbortResult = { handled: false, aborted: false } as const;
const emptyConfig = {} as OpenClawConfig;
let dispatchReplyFromConfig: typeof import("./dispatch-from-config.js").dispatchReplyFromConfig;
let resetInboundDedupe: typeof import("./inbound-dedupe.js").resetInboundDedupe;
let acpManagerTesting: typeof import("../../acp/control-plane/manager.js").__testing;
let pluginBindingTesting: typeof import("../../plugins/conversation-binding.js").__testing;
let AcpRuntimeErrorClass: typeof import("../../acp/runtime/errors.js").AcpRuntimeError;
type DispatchReplyArgs = Parameters<
typeof import("./dispatch-from-config.js").dispatchReplyFromConfig
@@ -266,6 +327,96 @@ function createAcpRuntime(events: Array<Record<string, unknown>>) {
};
}
function createMockAcpSessionManager() {
return {
resolveSession: (params: { cfg: OpenClawConfig; sessionKey: string }) => {
const entry = acpMocks.readAcpSessionEntry({
cfg: params.cfg,
sessionKey: params.sessionKey,
}) as { acp?: Record<string, unknown> } | null;
if (entry?.acp) {
return {
kind: "ready" as const,
sessionKey: params.sessionKey,
meta: entry.acp,
};
}
return String(params.sessionKey).startsWith("agent:")
? {
kind: "stale" as const,
sessionKey: params.sessionKey,
error: {
code: "ACP_SESSION_INIT_FAILED",
message: `ACP metadata is missing for ${params.sessionKey}.`,
},
}
: {
kind: "none" as const,
sessionKey: params.sessionKey,
};
},
getObservabilitySnapshot: () => ({
runtimeCache: {
activeSessions: 0,
idleTtlMs: 0,
evictedTotal: 0,
},
turns: {
active: 0,
queueDepth: 0,
completed: 0,
failed: 0,
averageLatencyMs: 0,
maxLatencyMs: 0,
},
errorsByCode: {},
}),
runTurn: vi.fn(
async (params: {
cfg: OpenClawConfig;
sessionKey: string;
text?: string;
attachments?: unknown[];
mode: string;
requestId: string;
signal?: AbortSignal;
onEvent: (event: Record<string, unknown>) => Promise<void>;
}) => {
const entry = acpMocks.readAcpSessionEntry({
cfg: params.cfg,
sessionKey: params.sessionKey,
}) as {
acp?: {
agent?: string;
mode?: string;
};
} | null;
const runtimeBackend = acpMocks.requireAcpRuntimeBackend() as {
runtime?: ReturnType<typeof createAcpRuntime>;
};
if (!runtimeBackend.runtime) {
throw new Error("ACP runtime backend not mocked");
}
await runtimeBackend.runtime.ensureSession({
sessionKey: params.sessionKey,
mode: entry?.acp?.mode || "persistent",
agent: entry?.acp?.agent || "codex",
});
const stream = runtimeBackend.runtime.runTurn({
text: params.text,
attachments: params.attachments,
mode: params.mode,
requestId: params.requestId,
signal: params.signal,
});
for await (const event of stream) {
await params.onEvent(event);
}
},
),
};
}
function firstToolResultPayload(dispatcher: ReplyDispatcher): ReplyPayload | undefined {
return (dispatcher.sendToolResult as ReturnType<typeof vi.fn>).mock.calls[0]?.[0] as
| ReplyPayload
@@ -286,9 +437,11 @@ async function dispatchTwiceWithFreshDispatchers(params: Omit<DispatchReplyArgs,
describe("dispatchReplyFromConfig", () => {
beforeAll(async () => {
({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js"));
await import("./dispatch-acp.js");
await import("./dispatch-acp-command-bypass.js");
await import("./dispatch-acp-tts.runtime.js");
await import("./dispatch-acp-session.runtime.js");
({ resetInboundDedupe } = await import("./inbound-dedupe.js"));
({ __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js"));
({ __testing: pluginBindingTesting } = await import("../../plugins/conversation-binding.js"));
({ AcpRuntimeError: AcpRuntimeErrorClass } = await import("../../acp/runtime/errors.js"));
});
@@ -321,7 +474,8 @@ describe("dispatchReplyFromConfig", () => {
},
]),
);
acpManagerTesting.resetAcpSessionManagerForTests();
acpManagerRuntimeMocks.getAcpSessionManager.mockReset();
acpManagerRuntimeMocks.getAcpSessionManager.mockReturnValue(createMockAcpSessionManager());
resetInboundDedupe();
mocks.routeReply.mockReset();
mocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" });
@@ -354,7 +508,7 @@ describe("dispatchReplyFromConfig", () => {
agentEventMocks.emitAgentEvent.mockReset();
sessionBindingMocks.listBySession.mockReset();
sessionBindingMocks.listBySession.mockReturnValue([]);
pluginBindingTesting.reset();
pluginConversationBindingMocks.shownFallbackNoticeBindingIds.clear();
sessionBindingMocks.resolveByConversation.mockReset();
sessionBindingMocks.resolveByConversation.mockReturnValue(null);
sessionBindingMocks.touch.mockReset();
@@ -1065,7 +1219,7 @@ describe("dispatchReplyFromConfig", () => {
{ type: "text_delta", text: "world" },
{ type: "done" },
]);
acpMocks.readAcpSessionEntry.mockReturnValue({
let currentAcpEntry = {
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
@@ -1079,6 +1233,28 @@ describe("dispatchReplyFromConfig", () => {
state: "idle",
lastActivityAt: Date.now(),
},
};
acpMocks.readAcpSessionEntry.mockImplementation(() => currentAcpEntry);
acpMocks.upsertAcpSessionMeta.mockImplementation(async (paramsUnknown: unknown) => {
const params = paramsUnknown as {
mutate: (
current: Record<string, unknown> | undefined,
entry: { acp?: Record<string, unknown> } | undefined,
) => Record<string, unknown> | null | undefined;
};
const nextMeta = params.mutate(currentAcpEntry.acp as Record<string, unknown>, {
acp: currentAcpEntry.acp as Record<string, unknown>,
});
if (nextMeta === null) {
return null;
}
if (nextMeta) {
currentAcpEntry = {
...currentAcpEntry,
acp: nextMeta as typeof currentAcpEntry.acp,
};
}
return currentAcpEntry;
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",

View File

@@ -7,7 +7,7 @@ import {
} from "../../bindings/records.js";
import { shouldSuppressLocalExecApprovalPrompt } from "../../channels/plugins/exec-approval-local.js";
import type { OpenClawConfig } from "../../config/config.js";
import { parseSessionThreadInfo } from "../../config/sessions/delivery-info.js";
import { parseSessionThreadInfo } from "../../config/sessions/thread-info.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import { logVerbose } from "../../globals.js";
import { fireAndForgetHook } from "../../hooks/fire-and-forget.js";
@@ -496,7 +496,10 @@ export async function dispatchReplyFromConfig(params: {
}
const dispatchAcpRuntime = await loadDispatchAcpRuntime();
const bypassAcpForCommand = dispatchAcpRuntime.shouldBypassAcpDispatchForCommand(ctx, cfg);
const bypassAcpForCommand = await dispatchAcpRuntime.shouldBypassAcpDispatchForCommand(
ctx,
cfg,
);
const sendPolicy = resolveSendPolicy({
cfg,

View File

@@ -1,18 +1,8 @@
import { resolveSessionThreadInfo } from "../../channels/plugins/session-conversation.js";
import { loadConfig } from "../io.js";
import { resolveStorePath } from "./paths.js";
import { loadSessionStore } from "./store.js";
/**
* Extract deliveryContext and threadId from a sessionKey.
* Supports generic :thread: suffixes plus plugin-owned thread/session grammars.
*/
export function parseSessionThreadInfo(sessionKey: string | undefined): {
baseSessionKey: string | undefined;
threadId: string | undefined;
} {
return resolveSessionThreadInfo(sessionKey);
}
export { parseSessionThreadInfo } from "./thread-info.js";
import { parseSessionThreadInfo } from "./thread-info.js";
export function extractDeliveryInfo(sessionKey: string | undefined): {
deliveryContext:

View File

@@ -0,0 +1,12 @@
import { resolveSessionThreadInfo } from "../../channels/plugins/session-conversation.js";
/**
* Extract deliveryContext and threadId from a sessionKey.
* Supports generic :thread: suffixes plus plugin-owned thread/session grammars.
*/
export function parseSessionThreadInfo(sessionKey: string | undefined): {
baseSessionKey: string | undefined;
threadId: string | undefined;
} {
return resolveSessionThreadInfo(sessionKey);
}

View File

@@ -2,7 +2,6 @@ import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import { parseSessionThreadInfo } from "./delivery-info.js";
import {
resolveDefaultSessionStorePath,
resolveSessionFilePath,
@@ -11,6 +10,7 @@ import {
} from "./paths.js";
import { resolveAndPersistSessionFile } from "./session-file.js";
import { loadSessionStore, normalizeStoreSessionKey } from "./store.js";
import { parseSessionThreadInfo } from "./thread-info.js";
import { resolveMirroredTranscriptText } from "./transcript-mirror.js";
import type { SessionEntry } from "./types.js";

View File

@@ -2,7 +2,7 @@ import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-help
import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js";
import type { CliDeps } from "../cli/deps.js";
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
import { parseSessionThreadInfo } from "../config/sessions/delivery-info.js";
import { parseSessionThreadInfo } from "../config/sessions/thread-info.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { ackDelivery, enqueueDelivery, failDelivery } from "../infra/outbound/delivery-queue.js";