refactor: consume acpx runtime library (#61495)

* refactor: consume acpx runtime library

* refactor: remove duplicated acpx runtime files

* fix: update acpx runtime dependency

* fix: preserve acp runtime error codes

* fix: migrate legacy acpx session files

* fix: update acpx runtime dependency

* fix: import Dirent from node fs

* ACPX: repin shared runtime engine

* ACPX: repin runtime semantics fixes

* ACPX: repin runtime contract cleanup

* Extensions: repin ACPX after layout refactor

* ACPX: drop legacy session migration

* ACPX: drop direct ACP SDK dependency

* Discord ACP: stop duplicate direct fallback replies

* ACP: rename delivered text visibility hook

* ACPX: pin extension to 0.5.0

* Deps: drop stale ACPX build-script allowlist

* ACPX: add local development guidance

* ACPX: document temporary pnpm exception flow

* SDK: preserve legacy ACP visibility hook

* ACP: keep reset commands on local path

* ACP: make in-place reset start fresh session

* ACP: recover broken bindings on fresh reset

* ACP: defer fresh reset marker until close succeeds

* ACP: reset bound sessions fresh again

* Discord: ensure ACP bindings before /new

* ACP: recover missing persistent sessions
This commit is contained in:
Onur Solmaz
2026-04-06 15:51:08 +02:00
committed by GitHub
parent 4b2d528345
commit 154a7edb7c
62 changed files with 1261 additions and 7216 deletions

View File

@@ -27,7 +27,7 @@ describe("shouldBypassAcpDispatchForCommand", () => {
expect(shouldBypassAcpDispatchForCommand(ctx, {} as OpenClawConfig)).toBe(false);
});
it("returns false for ACP reset-tail slash commands", () => {
it("returns true for ACP reset-tail slash commands", () => {
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
@@ -37,7 +37,19 @@ describe("shouldBypassAcpDispatchForCommand", () => {
BodyForAgent: "/new continue with deployment",
});
expect(shouldBypassAcpDispatchForCommand(ctx, {} as OpenClawConfig)).toBe(false);
expect(shouldBypassAcpDispatchForCommand(ctx, {} as OpenClawConfig)).toBe(true);
});
it("returns true for bare ACP reset slash commands", () => {
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
CommandBody: "/reset",
BodyForCommands: "/reset",
BodyForAgent: "/reset",
});
expect(shouldBypassAcpDispatchForCommand(ctx, {} as OpenClawConfig)).toBe(true);
});
it("returns false for slash commands when text commands are disabled", () => {

View File

@@ -23,6 +23,10 @@ function resolveCommandCandidateText(ctx: FinalizedMsgContext): string {
return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim();
}
function isResetCommandCandidate(text: string): boolean {
return /^\/(?:new|reset)(?:\s|$)/i.test(text);
}
export function shouldBypassAcpDispatchForCommand(
ctx: FinalizedMsgContext,
cfg: OpenClawConfig,
@@ -41,6 +45,10 @@ export function shouldBypassAcpDispatchForCommand(
return allowTextCommands;
}
if (isResetCommandCandidate(normalized)) {
return true;
}
if (!normalized.startsWith("!")) {
return false;
}

View File

@@ -17,6 +17,32 @@ const deliveryMocks = vi.hoisted(() => ({
runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })),
}));
const channelPluginMocks = vi.hoisted(() => ({
shouldTreatDeliveredTextAsVisible: (({
kind,
text,
}: {
kind: "tool" | "block" | "final";
text?: string;
}) => kind === "block" && typeof text === "string" && text.trim().length > 0) as
| ((params: { kind: "tool" | "block" | "final"; text?: string }) => boolean)
| undefined,
shouldTreatRoutedTextAsVisible: undefined as
| ((params: { kind: "tool" | "block" | "final"; text?: string }) => boolean)
| undefined,
getChannelPlugin: vi.fn((channelId: string) => {
if (channelId !== "discord" && channelId !== "telegram") {
return undefined;
}
return {
outbound: {
shouldTreatDeliveredTextAsVisible: channelPluginMocks.shouldTreatDeliveredTextAsVisible,
shouldTreatRoutedTextAsVisible: channelPluginMocks.shouldTreatRoutedTextAsVisible,
},
};
}),
}));
vi.mock("../../tts/tts.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
}));
@@ -25,6 +51,14 @@ vi.mock("./route-reply.js", () => ({
routeReply: (params: unknown) => deliveryMocks.routeReply(params),
}));
vi.mock("../../channels/plugins/index.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../channels/plugins/index.js")>();
return {
...actual,
getChannelPlugin: (channelId: string) => channelPluginMocks.getChannelPlugin(channelId),
};
});
vi.mock("../../infra/outbound/message-action-runner.js", () => ({
runMessageAction: (params: unknown) => deliveryMocks.runMessageAction(params),
}));
@@ -62,6 +96,15 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
deliveryMocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock-message" });
deliveryMocks.runMessageAction.mockClear();
deliveryMocks.runMessageAction.mockResolvedValue({ ok: true as const });
channelPluginMocks.getChannelPlugin.mockClear();
channelPluginMocks.shouldTreatDeliveredTextAsVisible = ({
kind,
text,
}: {
kind: "tool" | "block" | "final";
text?: string;
}) => kind === "block" && typeof text === "string" && text.trim().length > 0;
channelPluginMocks.shouldTreatRoutedTextAsVisible = undefined;
});
it("bypasses TTS when skipTts is requested", async () => {
@@ -143,8 +186,18 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
});
it("does not treat non-telegram direct block text as visible", async () => {
const coordinator = createCoordinator();
it("does not treat channels without a visibility override as visible for direct block delivery", async () => {
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "whatsapp",
Surface: "whatsapp",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher: createDispatcher(),
inboundAudio: false,
shouldRouteToOriginating: false,
});
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
@@ -155,6 +208,34 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(coordinator.getRoutedCounts().block).toBe(0);
});
it("treats direct discord block text as visible", async () => {
const coordinator = createCoordinator();
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
expect(coordinator.hasDeliveredVisibleText()).toBe(true);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
});
it("honors the legacy routed visibility hook name for plugin compatibility", async () => {
channelPluginMocks.shouldTreatDeliveredTextAsVisible = undefined;
channelPluginMocks.shouldTreatRoutedTextAsVisible = ({
kind,
text,
}: {
kind: "tool" | "block" | "final";
text?: string;
}) => kind === "block" && typeof text === "string" && text.trim().length > 0;
const coordinator = createCoordinator();
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
await coordinator.settleVisibleText();
expect(coordinator.hasDeliveredVisibleText()).toBe(true);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
});
it("tracks failed visible telegram block delivery separately", async () => {
const dispatcher: ReplyDispatcher = {
sendToolResult: vi.fn(() => true),
@@ -300,4 +381,26 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
}),
);
});
it("treats routed discord block text as visible", async () => {
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher: createDispatcher(),
inboundAudio: false,
shouldRouteToOriginating: true,
originatingChannel: "discord",
originatingTo: "channel:thread-1",
});
await coordinator.deliver("block", { text: "hello" }, { skipTts: true });
expect(coordinator.hasDeliveredVisibleText()).toBe(true);
expect(coordinator.hasFailedVisibleTextDelivery()).toBe(false);
expect(coordinator.getRoutedCounts().block).toBe(1);
});
});

View File

@@ -82,6 +82,7 @@ async function shouldTreatDeliveredTextAsVisible(params: {
channel: string | undefined;
kind: ReplyDispatchKind;
text: string | undefined;
routed: boolean;
}): Promise<boolean> {
if (!params.text?.trim()) {
return false;
@@ -93,19 +94,20 @@ async function shouldTreatDeliveredTextAsVisible(params: {
if (!channelId) {
return false;
}
// Only Telegram currently overrides block/tool visibility via channel runtime.
// Keep other channels on the fast path so ACP local delivery does not pay the
// broader channel-registry import cost on every streamed turn.
if (channelId !== "telegram") {
return false;
}
const { getChannelPlugin } = await loadChannelPluginRuntime();
return (
getChannelPlugin(channelId)?.outbound?.shouldTreatRoutedTextAsVisible?.({
const outbound = getChannelPlugin(channelId)?.outbound;
const visibilityOverride =
outbound?.shouldTreatDeliveredTextAsVisible ?? outbound?.shouldTreatRoutedTextAsVisible;
if (visibilityOverride) {
return visibilityOverride({
kind: params.kind,
text: params.text,
}) === true
);
});
}
if (!params.routed) {
return channelId === "telegram";
}
return false;
}
async function maybeApplyAcpTts(params: {
@@ -320,6 +322,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
channel: routedChannel,
kind,
text: ttsPayload.text,
routed: true,
});
const { routeReply } = await loadRouteReplyRuntime();
const result = await routeReply({
@@ -363,6 +366,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
channel: directChannel,
kind,
text: ttsPayload.text,
routed: false,
});
const delivered =
kind === "tool"

View File

@@ -33,6 +33,25 @@ const routeMocks = vi.hoisted(() => ({
routeReply: vi.fn(async (_params: unknown) => ({ ok: true, messageId: "mock" })),
}));
const channelPluginMocks = vi.hoisted(() => ({
getChannelPlugin: vi.fn((channelId: string) => {
if (channelId !== "discord" && channelId !== "telegram") {
return undefined;
}
return {
outbound: {
shouldTreatDeliveredTextAsVisible: ({
kind,
text,
}: {
kind: "tool" | "block" | "final";
text?: string;
}) => kind === "block" && typeof text === "string" && text.trim().length > 0,
},
};
}),
}));
const messageActionMocks = vi.hoisted(() => ({
runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })),
}));
@@ -109,6 +128,8 @@ async function runDispatch(params: {
cfg?: OpenClawConfig;
dispatcher?: ReplyDispatcher;
shouldRouteToOriginating?: boolean;
originatingChannel?: string;
originatingTo?: string;
onReplyStart?: () => void;
ctxOverrides?: Record<string, unknown>;
sessionKeyOverride?: string;
@@ -128,7 +149,10 @@ async function runDispatch(params: {
inboundAudio: false,
shouldRouteToOriginating: params.shouldRouteToOriginating ?? false,
...(params.shouldRouteToOriginating
? { originatingChannel: "telegram", originatingTo: "telegram:thread-1" }
? {
originatingChannel: params.originatingChannel ?? "telegram",
originatingTo: params.originatingTo ?? "telegram:thread-1",
}
: {}),
shouldSendToolSummaries: true,
bypassForCommand: false,
@@ -236,6 +260,13 @@ describe("tryDispatchAcpReply", () => {
vi.doMock("./route-reply.js", () => ({
routeReply: (params: unknown) => routeMocks.routeReply(params),
}));
vi.doMock("../../channels/plugins/index.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../channels/plugins/index.js")>();
return {
...actual,
getChannelPlugin: (channelId: string) => channelPluginMocks.getChannelPlugin(channelId),
};
});
vi.doMock("../../infra/outbound/message-action-runner.js", () => ({
runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params),
}));
@@ -295,6 +326,7 @@ describe("tryDispatchAcpReply", () => {
policyMocks.resolveAcpAgentPolicyError.mockReturnValue(null);
routeMocks.routeReply.mockReset();
routeMocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" });
channelPluginMocks.getChannelPlugin.mockClear();
messageActionMocks.runMessageAction.mockReset();
messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const });
ttsMocks.maybeApplyTtsToPayload.mockClear();
@@ -957,6 +989,36 @@ describe("tryDispatchAcpReply", () => {
expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
});
it("does not deliver final fallback text when routed discord block text was already visible", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies(
{ text: "Received your test message." },
{} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
);
mockRoutedTextTurn("Received your test message.");
const { dispatcher } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "run acp",
dispatcher,
shouldRouteToOriginating: true,
originatingChannel: "discord",
originatingTo: "channel:1478836151241412759",
});
expect(result?.counts.block).toBe(1);
expect(result?.counts.final).toBe(0);
expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
expect(routeMocks.routeReply).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
to: "channel:1478836151241412759",
payload: expect.objectContaining({ text: "Received your test message." }),
}),
);
});
it("does not deliver final fallback text when direct block text was already visible", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
@@ -983,6 +1045,35 @@ describe("tryDispatchAcpReply", () => {
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("does not deliver final fallback text when direct discord block text was already visible", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies(
{ text: "Received." },
{} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
);
mockVisibleTextTurn("Received.");
const { dispatcher, counts } = createDispatcher();
const result = await runDispatch({
bodyForAgent: "reply",
dispatcher,
ctxOverrides: {
Provider: "discord",
Surface: "discord",
},
});
expect(result?.counts.block).toBe(0);
expect(result?.counts.final).toBe(0);
expect(counts.block).toBe(0);
expect(counts.final).toBe(0);
expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
expect.objectContaining({ text: "Received." }),
);
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("treats visible telegram ACP block delivery as a successful final response", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
@@ -1006,7 +1097,7 @@ describe("tryDispatchAcpReply", () => {
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("preserves final fallback when direct block text is filtered by non-telegram channels", async () => {
it("preserves final fallback when direct block text is filtered by channels without a visibility override", async () => {
setReadyAcpResolution();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
@@ -1016,6 +1107,10 @@ describe("tryDispatchAcpReply", () => {
const result = await runDispatch({
bodyForAgent: "reply",
dispatcher,
ctxOverrides: {
Provider: "whatsapp",
Surface: "whatsapp",
},
});
expect(result?.counts.block).toBe(0);

View File

@@ -616,7 +616,7 @@ describe("initSessionState RawBody", () => {
expect(result.triggerBodyNormalized).toBe("/NEW KeepThisCase");
});
it("does not rotate local session state for /new on bound ACP sessions", async () => {
it("rotates local session state for /new on bound ACP sessions", async () => {
const root = await makeCaseDir("openclaw-rawbody-acp-reset-");
const storePath = path.join(root, "sessions.json");
const sessionKey = "agent:codex:acp:binding:discord:default:feedface";
@@ -667,9 +667,9 @@ describe("initSessionState RawBody", () => {
commandAuthorized: true,
});
expect(result.resetTriggered).toBe(false);
expect(result.sessionId).toBe(existingSessionId);
expect(result.isNewSession).toBe(false);
expect(result.resetTriggered).toBe(true);
expect(result.sessionId).not.toBe(existingSessionId);
expect(result.isNewSession).toBe(true);
});
it("rotates local session state for ACP /new when no matching conversation binding exists", async () => {

View File

@@ -1,6 +1,5 @@
import crypto from "node:crypto";
import path from "node:path";
import { normalizeConversationText } from "../../acp/conversation-id.js";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { clearBootstrapSnapshotOnSessionRollover } from "../../agents/bootstrap-cache.js";
import { disposeSessionMcpRuntime } from "../../agents/pi-bundle-mcp-tools.js";
@@ -38,7 +37,6 @@ import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"
import { isInternalMessageChannel } from "../../utils/message-channel.js";
import { resolveCommandAuthorization } from "../command-auth.js";
import type { MsgContext, TemplateContext } from "../templating.js";
import { resolveEffectiveResetTargetSessionKey } from "./acp-reset-target.js";
import { resolveConversationBindingContextFromMessage } from "./conversation-binding-input.js";
import { normalizeInboundTextNewlines } from "./inbound-text.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
@@ -180,32 +178,6 @@ function resolveSessionConversationBindingContext(
};
}
function resolveBoundAcpSessionForReset(params: {
cfg: OpenClawConfig;
ctx: MsgContext;
bindingContext?: {
channel: string;
accountId: string;
conversationId: string;
parentConversationId?: string;
} | null;
}): string | undefined {
const activeSessionKey = normalizeConversationText(params.ctx.SessionKey);
const bindingContext =
params.bindingContext ?? resolveSessionConversationBindingContext(params.cfg, params.ctx);
return resolveEffectiveResetTargetSessionKey({
cfg: params.cfg,
channel: bindingContext?.channel,
accountId: bindingContext?.accountId,
conversationId: bindingContext?.conversationId,
parentConversationId: bindingContext?.parentConversationId,
activeSessionKey,
allowNonAcpBindingSessionKey: false,
skipConfiguredFallbackWhenActiveSessionNonAcp: true,
fallbackToActiveAcpWhenUnbound: false,
});
}
function resolveBoundConversationSessionKey(params: {
cfg: OpenClawConfig;
ctx: MsgContext;
@@ -340,17 +312,6 @@ export async function initSessionState(params: {
const strippedForReset = isGroup
? stripMentions(triggerBodyNormalized, ctx, cfg, agentId)
: triggerBodyNormalized;
const shouldUseAcpInPlaceReset = Boolean(
resolveBoundAcpSessionForReset({
cfg,
ctx: sessionCtxForState,
bindingContext: conversationBindingContext,
}),
);
const shouldBypassAcpResetForTrigger = (triggerLower: string): boolean =>
shouldUseAcpInPlaceReset &&
DEFAULT_RESET_TRIGGERS.some((defaultTrigger) => defaultTrigger.toLowerCase() === triggerLower);
// Reset triggers are configured as lowercased commands (e.g. "/new"), but users may type
// "/NEW" etc. Match case-insensitively while keeping the original casing for any stripped body.
const trimmedBodyLower = trimmedBody.toLowerCase();
@@ -366,12 +327,6 @@ export async function initSessionState(params: {
}
const triggerLower = trigger.toLowerCase();
if (trimmedBodyLower === triggerLower || strippedForResetLower === triggerLower) {
if (shouldBypassAcpResetForTrigger(triggerLower)) {
// ACP-bound conversations handle /new and /reset in command handling
// so the bound ACP runtime can be reset in place without rotating the
// normal OpenClaw session/transcript.
break;
}
isNewSession = true;
bodyStripped = "";
resetTriggered = true;
@@ -383,9 +338,6 @@ export async function initSessionState(params: {
trimmedBodyLower.startsWith(triggerPrefixLower) ||
strippedForResetLower.startsWith(triggerPrefixLower)
) {
if (shouldBypassAcpResetForTrigger(triggerLower)) {
break;
}
isNewSession = true;
bodyStripped = strippedForReset.slice(trigger.length).trimStart();
resetTriggered = true;