import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; import type { PluginHookBeforeDispatchResult, PluginTargetedInboundClaimOutcome, } from "../../plugins/hooks.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; import { createChannelTestPluginBase, createTestRegistry, } from "../../test-utils/channel-plugins.js"; import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js"; import type { MsgContext } from "../templating.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { ReplyDispatcher } from "./reply-dispatcher.js"; import { buildTestCtx } from "./test-ctx.js"; type AbortResult = { handled: boolean; aborted: boolean; stoppedSubagents?: number }; const mocks = vi.hoisted(() => ({ routeReply: vi.fn(async (_params: unknown) => ({ ok: true, messageId: "mock" })), tryFastAbortFromMessage: vi.fn<() => Promise>(async () => ({ handled: false, aborted: false, })), })); const diagnosticMocks = vi.hoisted(() => ({ logMessageQueued: vi.fn(), logMessageProcessed: vi.fn(), logSessionStateChange: vi.fn(), })); const hookMocks = vi.hoisted(() => ({ registry: { plugins: [] as Array<{ id: string; status: "loaded" | "disabled" | "error"; }>, }, runner: { hasHooks: vi.fn<(hookName?: string) => boolean>(() => false), runInboundClaim: vi.fn(async () => undefined), runInboundClaimForPlugin: vi.fn(async () => undefined), runInboundClaimForPluginOutcome: vi.fn<() => Promise>( async () => ({ status: "no_handler" as const }), ), runMessageReceived: vi.fn(async () => {}), runBeforeDispatch: vi.fn< (_event: unknown, _ctx: unknown) => Promise >(async () => undefined), }, })); const internalHookMocks = vi.hoisted(() => ({ createInternalHookEvent: vi.fn(), triggerInternalHook: vi.fn(async () => {}), })); const acpMocks = vi.hoisted(() => ({ listAcpSessionEntries: vi.fn(async () => []), readAcpSessionEntry: vi.fn<() => unknown>(() => null), upsertAcpSessionMeta: vi.fn(async () => null), requireAcpRuntimeBackend: vi.fn<() => unknown>(), })); const sessionBindingMocks = vi.hoisted(() => ({ listBySession: vi.fn<(targetSessionKey: string) => SessionBindingRecord[]>(() => []), resolveByConversation: vi.fn< (ref: { channel: string; accountId: string; conversationId: string; parentConversationId?: string; }) => SessionBindingRecord | null >(() => null), touch: vi.fn(), })); const sessionStoreMocks = vi.hoisted(() => ({ currentEntry: undefined as Record | undefined, loadSessionStore: vi.fn(() => ({})), resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"), resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })), })); const ttsMocks = vi.hoisted(() => { const state = { synthesizeFinalAudio: false, }; return { state, maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => { const params = paramsUnknown as { payload: ReplyPayload; kind: "tool" | "block" | "final"; }; if ( state.synthesizeFinalAudio && params.kind === "final" && typeof params.payload?.text === "string" && params.payload.text.trim() ) { return { ...params.payload, mediaUrl: "https://example.com/tts-synth.opus", audioAsVoice: true, }; } return params.payload; }), normalizeTtsAutoMode: vi.fn((value: unknown) => typeof value === "string" ? value : undefined, ), resolveTtsConfig: vi.fn((_cfg: OpenClawConfig) => ({ mode: "final" })), }; }); vi.mock("./route-reply.runtime.js", () => ({ isRoutableChannel: (channel: string | undefined) => Boolean( channel && [ "telegram", "slack", "discord", "signal", "imessage", "whatsapp", "feishu", "mattermost", ].includes(channel), ), routeReply: mocks.routeReply, })); vi.mock("./route-reply.js", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, routeReply: mocks.routeReply, }; }); vi.mock("./abort.runtime.js", () => ({ tryFastAbortFromMessage: mocks.tryFastAbortFromMessage, formatAbortReplyText: (stoppedSubagents?: number) => { if (typeof stoppedSubagents !== "number" || stoppedSubagents <= 0) { return "⚙️ Agent was aborted."; } const label = stoppedSubagents === 1 ? "sub-agent" : "sub-agents"; return `⚙️ Agent was aborted. Stopped ${stoppedSubagents} ${label}.`; }, })); vi.mock("../../logging/diagnostic.js", () => ({ logMessageQueued: diagnosticMocks.logMessageQueued, logMessageProcessed: diagnosticMocks.logMessageProcessed, logSessionStateChange: diagnosticMocks.logSessionStateChange, })); vi.mock("../../config/sessions/store.js", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, loadSessionStore: sessionStoreMocks.loadSessionStore, resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry, }; }); vi.mock("../../config/sessions/paths.js", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, resolveStorePath: sessionStoreMocks.resolveStorePath, }; }); vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: () => hookMocks.runner, getGlobalPluginRegistry: () => hookMocks.registry, })); vi.mock("../../hooks/internal-hooks.js", () => ({ createInternalHookEvent: internalHookMocks.createInternalHookEvent, triggerInternalHook: internalHookMocks.triggerInternalHook, })); vi.mock("../../acp/runtime/session-meta.js", () => ({ listAcpSessionEntries: acpMocks.listAcpSessionEntries, readAcpSessionEntry: acpMocks.readAcpSessionEntry, upsertAcpSessionMeta: acpMocks.upsertAcpSessionMeta, })); vi.mock("../../acp/runtime/registry.js", () => ({ requireAcpRuntimeBackend: acpMocks.requireAcpRuntimeBackend, })); vi.mock("../../infra/outbound/session-binding-service.js", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, getSessionBindingService: () => ({ bind: vi.fn(async () => { throw new Error("bind not mocked"); }), getCapabilities: vi.fn(() => ({ adapterAvailable: true, bindSupported: true, unbindSupported: true, placements: ["current", "child"] as const, })), listBySession: (targetSessionKey: string) => sessionBindingMocks.listBySession(targetSessionKey), resolveByConversation: sessionBindingMocks.resolveByConversation, touch: sessionBindingMocks.touch, unbind: vi.fn(async () => []), }), }; }); vi.mock("../../tts/tts.js", () => ({ maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), })); vi.mock("../../tts/tts.runtime.js", () => ({ maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), })); vi.mock("../../tts/tts-config.js", () => ({ normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), resolveConfiguredTtsMode: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg).mode, })); const noAbortResult = { handled: false, aborted: false } as const; const emptyConfig = {} as OpenClawConfig; let dispatchReplyFromConfig: typeof import("./dispatch-from-config.js").dispatchReplyFromConfig; let resetInboundDedupe: typeof import("./inbound-dedupe.js").resetInboundDedupe; let acpManagerTesting: typeof import("../../acp/control-plane/manager.js").__testing; let pluginBindingTesting: typeof import("../../plugins/conversation-binding.js").__testing; let AcpRuntimeErrorClass: typeof import("../../acp/runtime/errors.js").AcpRuntimeError; type DispatchReplyArgs = Parameters< typeof import("./dispatch-from-config.js").dispatchReplyFromConfig >[0]; function createDispatcher(): ReplyDispatcher { return { sendToolResult: vi.fn(() => true), sendBlockReply: vi.fn(() => true), sendFinalReply: vi.fn(() => true), waitForIdle: vi.fn(async () => {}), getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), markComplete: vi.fn(), }; } function setNoAbort() { mocks.tryFastAbortFromMessage.mockResolvedValue(noAbortResult); } function createAcpRuntime(events: Array>) { return { ensureSession: vi.fn( async (input: { sessionKey: string; mode: string; agent: string }) => ({ sessionKey: input.sessionKey, backend: "acpx", runtimeSessionName: `${input.sessionKey}:${input.mode}`, }) as { sessionKey: string; backend: string; runtimeSessionName: string }, ), runTurn: vi.fn(async function* (_params: { text?: string }) { for (const event of events) { yield event; } }), cancel: vi.fn(async () => {}), close: vi.fn(async () => {}), }; } function firstToolResultPayload(dispatcher: ReplyDispatcher): ReplyPayload | undefined { return (dispatcher.sendToolResult as ReturnType).mock.calls[0]?.[0] as | ReplyPayload | undefined; } async function dispatchTwiceWithFreshDispatchers(params: Omit) { await dispatchReplyFromConfig({ ...params, dispatcher: createDispatcher(), }); await dispatchReplyFromConfig({ ...params, dispatcher: createDispatcher(), }); } describe("dispatchReplyFromConfig", () => { beforeEach(async () => { vi.resetModules(); ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); ({ __testing: acpManagerTesting } = await import("../../acp/control-plane/manager.js")); ({ __testing: pluginBindingTesting } = await import("../../plugins/conversation-binding.js")); ({ AcpRuntimeError: AcpRuntimeErrorClass } = await import("../../acp/runtime/errors.js")); const discordTestPlugin = { ...createChannelTestPluginBase({ id: "discord", capabilities: { chatTypes: ["direct"], nativeCommands: true, }, }), execApprovals: { shouldSuppressLocalPrompt: ({ payload }: { payload: ReplyPayload }) => Boolean( payload.channelData && typeof payload.channelData === "object" && !Array.isArray(payload.channelData) && payload.channelData.execApproval, ), }, }; setActivePluginRegistry( createTestRegistry([ { pluginId: "discord", source: "test", plugin: discordTestPlugin, }, ]), ); acpManagerTesting.resetAcpSessionManagerForTests(); resetInboundDedupe(); mocks.routeReply.mockReset(); mocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" }); acpMocks.listAcpSessionEntries.mockReset().mockResolvedValue([]); diagnosticMocks.logMessageQueued.mockClear(); diagnosticMocks.logMessageProcessed.mockClear(); diagnosticMocks.logSessionStateChange.mockClear(); hookMocks.runner.hasHooks.mockClear(); hookMocks.runner.hasHooks.mockReturnValue(false); hookMocks.runner.runInboundClaim.mockClear(); hookMocks.runner.runInboundClaim.mockResolvedValue(undefined); hookMocks.runner.runInboundClaimForPlugin.mockClear(); hookMocks.runner.runInboundClaimForPlugin.mockResolvedValue(undefined); hookMocks.runner.runInboundClaimForPluginOutcome.mockClear(); hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ status: "no_handler", }); hookMocks.runner.runMessageReceived.mockClear(); hookMocks.runner.runBeforeDispatch.mockClear(); hookMocks.runner.runBeforeDispatch.mockResolvedValue(undefined); hookMocks.registry.plugins = []; internalHookMocks.createInternalHookEvent.mockClear(); internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload); internalHookMocks.triggerInternalHook.mockClear(); acpMocks.readAcpSessionEntry.mockReset(); acpMocks.readAcpSessionEntry.mockReturnValue(null); acpMocks.upsertAcpSessionMeta.mockReset(); acpMocks.upsertAcpSessionMeta.mockResolvedValue(null); acpMocks.requireAcpRuntimeBackend.mockReset(); sessionBindingMocks.listBySession.mockReset(); sessionBindingMocks.listBySession.mockReturnValue([]); pluginBindingTesting.reset(); sessionBindingMocks.resolveByConversation.mockReset(); sessionBindingMocks.resolveByConversation.mockReturnValue(null); sessionBindingMocks.touch.mockReset(); sessionStoreMocks.currentEntry = undefined; sessionStoreMocks.loadSessionStore.mockClear(); sessionStoreMocks.resolveStorePath.mockClear(); sessionStoreMocks.resolveSessionStoreEntry.mockClear(); ttsMocks.state.synthesizeFinalAudio = false; ttsMocks.maybeApplyTtsToPayload.mockClear(); ttsMocks.normalizeTtsAutoMode.mockClear(); ttsMocks.resolveTtsConfig.mockClear(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final", }); }); it("does not route when Provider matches OriginatingChannel (even if Surface is missing)", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "slack", Surface: undefined, OriginatingChannel: "slack", OriginatingTo: "channel:C123", }); const replyResolver = async ( _ctx: MsgContext, _opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(mocks.routeReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("routes when OriginatingChannel differs from Provider", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "slack", AccountId: "acc-1", MessageThreadId: 123, GroupChannel: "ops-room", OriginatingChannel: "telegram", OriginatingTo: "telegram:999", }); const replyResolver = async ( _ctx: MsgContext, _opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(mocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", to: "telegram:999", accountId: "acc-1", threadId: 123, isGroup: true, groupId: "telegram:999", }), ); }); it("falls back to thread-scoped session key when current ctx has no MessageThreadId", async () => { setNoAbort(); mocks.routeReply.mockClear(); sessionStoreMocks.currentEntry = { deliveryContext: { channel: "mattermost", to: "channel:CHAN1", accountId: "default", }, origin: { threadId: "stale-origin-root", }, lastThreadId: "stale-origin-root", }; const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "webchat", Surface: "webchat", SessionKey: "agent:main:mattermost:channel:CHAN1:thread:post-root", AccountId: "default", MessageThreadId: undefined, OriginatingChannel: "mattermost", OriginatingTo: "channel:CHAN1", ExplicitDeliverRoute: true, }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(mocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "mattermost", to: "channel:CHAN1", threadId: "post-root", }), ); }); it("does not resurrect a cleared route thread from origin metadata", async () => { setNoAbort(); mocks.routeReply.mockClear(); // Simulate the real store: lastThreadId and deliveryContext.threadId may be normalised from // origin.threadId on read, but a non-thread session key must still route to channel root. sessionStoreMocks.currentEntry = { deliveryContext: { channel: "mattermost", to: "channel:CHAN1", accountId: "default", threadId: "stale-root", }, lastThreadId: "stale-root", origin: { threadId: "stale-root", }, }; const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "webchat", Surface: "webchat", SessionKey: "agent:main:mattermost:channel:CHAN1", AccountId: "default", MessageThreadId: undefined, OriginatingChannel: "mattermost", OriginatingTo: "channel:CHAN1", ExplicitDeliverRoute: true, }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); const routeCall = mocks.routeReply.mock.calls[0]?.[0] as | { channel?: string; to?: string; threadId?: string | number } | undefined; expect(routeCall).toMatchObject({ channel: "mattermost", to: "channel:CHAN1", }); expect(routeCall?.threadId).toBeUndefined(); }); it("forces suppressTyping when routing to a different originating channel", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "slack", OriginatingChannel: "telegram", OriginatingTo: "telegram:999", }); const replyResolver = async (_ctx: MsgContext, opts?: GetReplyOptions) => { expect(opts?.suppressTyping).toBe(true); expect(opts?.typingPolicy).toBe("system_event"); return { text: "hi" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); }); it("forces suppressTyping for internal webchat turns", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "webchat", Surface: "webchat", OriginatingChannel: "webchat", OriginatingTo: "session:abc", }); const replyResolver = async (_ctx: MsgContext, opts?: GetReplyOptions) => { expect(opts?.suppressTyping).toBe(true); expect(opts?.typingPolicy).toBe("internal_webchat"); return { text: "hi" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); }); it("routes when provider is webchat but surface carries originating channel metadata", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "webchat", Surface: "telegram", OriginatingChannel: "telegram", OriginatingTo: "telegram:999", }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(mocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", to: "telegram:999", }), ); }); it("routes Feishu replies when provider is webchat and origin metadata points to Feishu", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "webchat", Surface: "feishu", OriginatingChannel: "feishu", OriginatingTo: "ou_feishu_direct_123", }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(mocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "feishu", to: "ou_feishu_direct_123", }), ); }); it("does not route when provider already matches originating channel", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", Surface: "webchat", OriginatingChannel: "telegram", OriginatingTo: "telegram:999", }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(mocks.routeReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("does not route external origin replies when current surface is internal webchat without explicit delivery", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "webchat", Surface: "webchat", OriginatingChannel: "imessage", OriginatingTo: "imessage:+15550001111", }); const replyResolver = async ( _ctx: MsgContext, _opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(mocks.routeReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("routes external origin replies for internal webchat turns when explicit delivery is set", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "webchat", Surface: "webchat", OriginatingChannel: "imessage", OriginatingTo: "imessage:+15550001111", ExplicitDeliverRoute: true, }); const replyResolver = async ( _ctx: MsgContext, _opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(mocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "imessage", to: "imessage:+15550001111", }), ); }); it("routes media-only tool results when summaries are suppressed", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "slack", ChatType: "group", AccountId: "acc-1", OriginatingChannel: "telegram", OriginatingTo: "telegram:999", }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { expect(opts?.onToolResult).toBeDefined(); await opts?.onToolResult?.({ text: "NO_REPLY", mediaUrls: ["https://example.com/tts-routed.opus"], }); return undefined; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(mocks.routeReply).toHaveBeenCalledTimes(1); const routed = mocks.routeReply.mock.calls[0]?.[0] as { payload?: ReplyPayload } | undefined; expect(routed?.payload?.mediaUrls).toEqual(["https://example.com/tts-routed.opus"]); expect(routed?.payload?.text).toBeUndefined(); }); it("provides onToolResult in DM sessions", async () => { setNoAbort(); mocks.routeReply.mockClear(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", ChatType: "direct", }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { expect(opts?.onToolResult).toBeDefined(); expect(typeof opts?.onToolResult).toBe("function"); return { text: "hi" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("suppresses group tool summaries but still forwards tool media", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", ChatType: "group", }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { expect(opts?.onToolResult).toBeDefined(); await opts?.onToolResult?.({ text: "🔧 exec: ls" }); await opts?.onToolResult?.({ text: "NO_REPLY", mediaUrls: ["https://example.com/tts-group.opus"], }); return { text: "hi" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(1); const sent = firstToolResultPayload(dispatcher); expect(sent?.mediaUrls).toEqual(["https://example.com/tts-group.opus"]); expect(sent?.text).toBeUndefined(); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("delivers tool summaries in forum topic sessions (group + IsForum)", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", ChatType: "group", IsForum: true, MessageThreadId: 99, }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { await opts?.onToolResult?.({ text: "🔧 exec: ls" }); return { text: "done" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).toHaveBeenCalledWith( expect.objectContaining({ text: "🔧 exec: ls" }), ); expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(1); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("delivers deterministic exec approval tool payloads in groups", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", ChatType: "group", }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { await opts?.onToolResult?.({ text: "Approval required.\n\n```txt\n/approve 117ba06d allow-once\n```", channelData: { execApproval: { approvalId: "117ba06d-1111-2222-3333-444444444444", approvalSlug: "117ba06d", allowedDecisions: ["allow-once", "allow-always", "deny"], }, }, }); return { text: "NO_REPLY" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(1); expect(firstToolResultPayload(dispatcher)).toEqual( expect.objectContaining({ text: "Approval required.\n\n```txt\n/approve 117ba06d allow-once\n```", channelData: { execApproval: { approvalId: "117ba06d-1111-2222-3333-444444444444", approvalSlug: "117ba06d", allowedDecisions: ["allow-once", "allow-always", "deny"], }, }, }), ); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "NO_REPLY" }); }); it("sends tool results via dispatcher in DM sessions", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", ChatType: "direct", }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { // Simulate tool result emission await opts?.onToolResult?.({ text: "🔧 exec: ls" }); return { text: "done" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).toHaveBeenCalledWith( expect.objectContaining({ text: "🔧 exec: ls" }), ); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("suppresses native tool summaries but still forwards tool media", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", ChatType: "direct", CommandSource: "native", }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { expect(opts?.onToolResult).toBeDefined(); await opts?.onToolResult?.({ text: "🔧 tools/sessions_send" }); await opts?.onToolResult?.({ mediaUrl: "https://example.com/tts-native.opus", }); return { text: "hi" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(1); const sent = firstToolResultPayload(dispatcher); expect(sent?.mediaUrl).toBe("https://example.com/tts-native.opus"); expect(sent?.text).toBeUndefined(); expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); }); it("delivers deterministic exec approval tool payloads for native commands", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", CommandSource: "native", }); const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig, ) => { await opts?.onToolResult?.({ text: "Approval required.\n\n```txt\n/approve 117ba06d allow-once\n```", channelData: { execApproval: { approvalId: "117ba06d-1111-2222-3333-444444444444", approvalSlug: "117ba06d", allowedDecisions: ["allow-once", "allow-always", "deny"], }, }, }); return { text: "NO_REPLY" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(1); expect(firstToolResultPayload(dispatcher)).toEqual( expect.objectContaining({ channelData: { execApproval: { approvalId: "117ba06d-1111-2222-3333-444444444444", approvalSlug: "117ba06d", allowedDecisions: ["allow-once", "allow-always", "deny"], }, }, }), ); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "NO_REPLY" }); }); it("fast-aborts without calling the reply resolver", async () => { mocks.tryFastAbortFromMessage.mockResolvedValue({ handled: true, aborted: true, }); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", Body: "/stop", }); const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(replyResolver).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "⚙️ Agent was aborted.", }); }); it("fast-abort reply includes stopped subagent count when provided", async () => { mocks.tryFastAbortFromMessage.mockResolvedValue({ handled: true, aborted: true, stoppedSubagents: 2, }); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", Body: "/stop", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn(async () => ({ text: "hi" }) as ReplyPayload), }); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "⚙️ Agent was aborted. Stopped 2 sub-agents.", }); }); it("routes ACP sessions through the runtime branch and streams block replies", async () => { setNoAbort(); const runtime = createAcpRuntime([ { type: "text_delta", text: "hello " }, { type: "text_delta", text: "world" }, { type: "done" }, ]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, stream: { coalesceIdleMs: 0, maxChunkChars: 128 }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "write a test", }); const replyResolver = vi.fn(async () => ({ text: "fallback" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(replyResolver).not.toHaveBeenCalled(); expect(runtime.ensureSession).toHaveBeenCalledWith( expect.objectContaining({ sessionKey: "agent:codex-acp:session-1", agent: "codex", mode: "persistent", }), ); const blockCalls = (dispatcher.sendBlockReply as ReturnType).mock.calls; expect(blockCalls.length).toBeGreaterThan(0); const streamedText = blockCalls.map((call) => (call[0] as ReplyPayload).text ?? "").join(""); expect(streamedText).toContain("hello"); expect(streamedText).toContain("world"); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "hello world" }), ); }); it("aborts ACP dispatch promptly when the caller abort signal fires", async () => { setNoAbort(); let releaseTurn: (() => void) | undefined; const releasePromise = new Promise((resolve) => { releaseTurn = resolve; }); const runtime = { ensureSession: vi.fn( async (input: { sessionKey: string; mode: string; agent: string }) => ({ sessionKey: input.sessionKey, backend: "acpx", runtimeSessionName: `${input.sessionKey}:${input.mode}`, }) as { sessionKey: string; backend: string; runtimeSessionName: string }, ), runTurn: vi.fn(async function* (params: { signal?: AbortSignal }) { await new Promise((resolve) => { if (params.signal?.aborted) { resolve(); return; } const onAbort = () => resolve(); params.signal?.addEventListener("abort", onAbort, { once: true }); void releasePromise.then(resolve); }); yield { type: "done" }; }), cancel: vi.fn(async () => {}), close: vi.fn(async () => {}), }; acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const abortController = new AbortController(); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "write a test", }); const dispatchPromise = dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyOptions: { abortSignal: abortController.signal }, }); await vi.waitFor(() => { expect(runtime.runTurn).toHaveBeenCalledTimes(1); }); abortController.abort(); const outcome = await Promise.race([ dispatchPromise.then(() => "settled" as const), new Promise<"pending">((resolve) => { setTimeout(() => resolve("pending"), 100); }), ]); releaseTurn?.(); await dispatchPromise; expect(outcome).toBe("settled"); }); it("posts a one-time resolved-session-id notice in thread after the first ACP turn", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "text_delta", text: "hello" }, { type: "done" }]); const pendingAcp = { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", identity: { state: "pending" as const, source: "ensure" as const, lastUpdatedAt: Date.now(), acpxSessionId: "acpx-123", agentSessionId: "inner-123", }, mode: "persistent" as const, state: "idle" as const, lastActivityAt: Date.now(), }; const resolvedAcp = { ...pendingAcp, identity: { ...pendingAcp.identity, state: "resolved" as const, source: "status" as const, }, }; acpMocks.readAcpSessionEntry.mockImplementation(() => { const runTurnStarted = runtime.runTurn.mock.calls.length > 0; return { sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: runTurnStarted ? resolvedAcp : pendingAcp, }; }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", MessageThreadId: "thread-1", BodyForAgent: "show ids", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() }); const finalCalls = (dispatcher.sendFinalReply as ReturnType).mock.calls; expect(finalCalls.length).toBe(2); const noticePayload = finalCalls[1]?.[0] as ReplyPayload | undefined; expect(noticePayload?.text).toContain("Session ids resolved"); expect(noticePayload?.text).toContain("agent session id: inner-123"); expect(noticePayload?.text).toContain("acpx session id: acpx-123"); expect(noticePayload?.text).toContain("codex resume inner-123"); }); it("posts resolved-session-id notice when ACP session is bound even without MessageThreadId", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "text_delta", text: "hello" }, { type: "done" }]); const pendingAcp = { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", identity: { state: "pending" as const, source: "ensure" as const, lastUpdatedAt: Date.now(), acpxSessionId: "acpx-123", agentSessionId: "inner-123", }, mode: "persistent" as const, state: "idle" as const, lastActivityAt: Date.now(), }; const resolvedAcp = { ...pendingAcp, identity: { ...pendingAcp.identity, state: "resolved" as const, source: "status" as const, }, }; acpMocks.readAcpSessionEntry.mockImplementation(() => { const runTurnStarted = runtime.runTurn.mock.calls.length > 0; return { sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: runTurnStarted ? resolvedAcp : pendingAcp, }; }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); sessionBindingMocks.listBySession.mockReturnValue([ { bindingId: "default:thread-1", targetSessionKey: "agent:codex-acp:session-1", targetKind: "session", conversation: { channel: "discord", accountId: "default", conversationId: "thread-1", }, status: "active", boundAt: Date.now(), }, ]); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", AccountId: "default", SessionKey: "agent:codex-acp:session-1", MessageThreadId: undefined, BodyForAgent: "show ids", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver: vi.fn() }); const finalCalls = (dispatcher.sendFinalReply as ReturnType).mock.calls; expect(finalCalls.length).toBe(2); const noticePayload = finalCalls[1]?.[0] as ReplyPayload | undefined; expect(noticePayload?.text).toContain("Session ids resolved"); expect(noticePayload?.text).toContain("agent session id: inner-123"); expect(noticePayload?.text).toContain("acpx session id: acpx-123"); }); it("honors send-policy deny before ACP runtime dispatch", async () => { setNoAbort(); const runtime = createAcpRuntime([ { type: "text_delta", text: "should-not-run" }, { type: "done" }, ]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, session: { sendPolicy: { default: "deny", }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "write a test", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); expect(runtime.runTurn).not.toHaveBeenCalled(); expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); it("routes ACP slash commands through the normal command pipeline", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "done" }]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, session: { sendPolicy: { default: "deny", }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", CommandBody: "/acp cancel", BodyForCommands: "/acp cancel", BodyForAgent: "/acp cancel", }); const replyResolver = vi.fn(async () => ({ text: "command output" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(replyResolver).toHaveBeenCalledTimes(1); expect(runtime.runTurn).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "command output", }); }); it("routes ACP reset tails through ACP after command handling", async () => { setNoAbort(); const runtime = createAcpRuntime([ { type: "text_delta", text: "tail accepted" }, { type: "done" }, ]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, session: { sendPolicy: { default: "deny", }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", CommandSource: "native", SessionKey: "discord:slash:owner", CommandTargetSessionKey: "agent:codex-acp:session-1", CommandBody: "/new continue with deployment", BodyForCommands: "/new continue with deployment", BodyForAgent: "/new continue with deployment", }); const replyResolver = vi.fn(async (resolverCtx: MsgContext) => { resolverCtx.Body = "continue with deployment"; resolverCtx.RawBody = "continue with deployment"; resolverCtx.CommandBody = "continue with deployment"; resolverCtx.BodyForCommands = "continue with deployment"; resolverCtx.BodyForAgent = "continue with deployment"; resolverCtx.AcpDispatchTailAfterReset = true; return undefined; }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(replyResolver).toHaveBeenCalledTimes(1); expect(runtime.runTurn).toHaveBeenCalledTimes(1); expect(runtime.runTurn.mock.calls[0]?.[0]).toMatchObject({ text: "continue with deployment", }); }); it("does not bypass ACP slash aliases when text commands are disabled on native surfaces", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "done" }]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, commands: { text: false, }, session: { sendPolicy: { default: "allow", }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", CommandBody: "/acp cancel", BodyForCommands: "/acp cancel", BodyForAgent: "/acp cancel", CommandSource: "text", }); const replyResolver = vi.fn(async () => ({ text: "should not bypass" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(runtime.runTurn).toHaveBeenCalledTimes(1); expect(replyResolver).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); it("does not bypass ACP dispatch for unauthorized bang-prefixed messages", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "done" }]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, session: { sendPolicy: { default: "deny", }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", CommandBody: "!poll", BodyForCommands: "!poll", BodyForAgent: "!poll", CommandAuthorized: false, }); const replyResolver = vi.fn(async () => ({ text: "should not bypass" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(runtime.runTurn).not.toHaveBeenCalled(); expect(replyResolver).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); it("does not bypass ACP dispatch for bang-prefixed messages when text commands are disabled", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "done" }]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, commands: { text: false, }, session: { sendPolicy: { default: "deny", }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", CommandBody: "!poll", BodyForCommands: "!poll", BodyForAgent: "!poll", CommandAuthorized: true, CommandSource: "text", }); const replyResolver = vi.fn(async () => ({ text: "should not bypass" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(runtime.runTurn).not.toHaveBeenCalled(); expect(replyResolver).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); it("coalesces tiny ACP token deltas into normal Discord text spacing", async () => { setNoAbort(); const runtime = createAcpRuntime([ { type: "text_delta", text: "What" }, { type: "text_delta", text: " do" }, { type: "text_delta", text: " you" }, { type: "text_delta", text: " want" }, { type: "text_delta", text: " to" }, { type: "text_delta", text: " work" }, { type: "text_delta", text: " on?" }, { type: "done" }, ]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, stream: { coalesceIdleMs: 0, maxChunkChars: 256 }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "test spacing", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); const blockTexts = (dispatcher.sendBlockReply as ReturnType).mock.calls .map((call) => ((call[0] as ReplyPayload).text ?? "").trim()) .filter(Boolean); expect(blockTexts).toEqual(["What do you want to work on?"]); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "What do you want to work on?" }), ); }); it("generates final-mode TTS audio after ACP block streaming completes", async () => { setNoAbort(); ttsMocks.state.synthesizeFinalAudio = true; const runtime = createAcpRuntime([ { type: "text_delta", text: "Hello from ACP streaming." }, { type: "done" }, ]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, stream: { coalesceIdleMs: 0, maxChunkChars: 256 }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "stream this", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); const finalPayload = (dispatcher.sendFinalReply as ReturnType).mock .calls[0]?.[0] as ReplyPayload | undefined; expect(finalPayload?.mediaUrl).toBe("https://example.com/tts-synth.opus"); expect(finalPayload?.text).toBeUndefined(); }); it("routes ACP block output to originating channel without parent dispatcher duplicates", async () => { setNoAbort(); mocks.routeReply.mockClear(); const runtime = createAcpRuntime([ { type: "text_delta", text: "thread chunk" }, { type: "done" }, ]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, stream: { coalesceIdleMs: 0, maxChunkChars: 128 }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "telegram", OriginatingTo: "telegram:thread-1", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "write a test", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); expect(mocks.routeReply).toHaveBeenCalled(); expect(mocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", to: "telegram:thread-1", }), ); expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); it("closes oneshot ACP sessions after the turn completes", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "done" }]); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:oneshot-1", storeSessionKey: "agent:codex-acp:oneshot-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:oneshot", mode: "oneshot", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockReturnValue({ id: "acpx", runtime, }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:oneshot-1", BodyForAgent: "run once", }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); expect(runtime.close).toHaveBeenCalledWith( expect.objectContaining({ reason: "oneshot-complete", }), ); }); it("emits an explicit ACP policy error when dispatch is disabled", async () => { setNoAbort(); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); const cfg = { acp: { enabled: true, dispatch: { enabled: false }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "write a test", }); const replyResolver = vi.fn(async () => ({ text: "fallback" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(replyResolver).not.toHaveBeenCalled(); expect(acpMocks.requireAcpRuntimeBackend).not.toHaveBeenCalled(); const finalPayload = (dispatcher.sendFinalReply as ReturnType).mock .calls[0]?.[0] as ReplyPayload | undefined; expect(finalPayload?.text).toContain("ACP dispatch is disabled by policy"); }); it("fails closed when ACP metadata is missing for an ACP session key", async () => { setNoAbort(); acpMocks.readAcpSessionEntry.mockReturnValue(null); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex:acp:session-1", BodyForAgent: "hello", }); const replyResolver = vi.fn(async () => ({ text: "fallback" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(replyResolver).not.toHaveBeenCalled(); expect(acpMocks.requireAcpRuntimeBackend).not.toHaveBeenCalled(); const finalPayload = (dispatcher.sendFinalReply as ReturnType).mock .calls[0]?.[0] as ReplyPayload | undefined; expect(finalPayload?.text).toContain("ACP metadata is missing"); }); it("surfaces backend-missing ACP errors in-thread without falling back", async () => { setNoAbort(); acpMocks.readAcpSessionEntry.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", storeSessionKey: "agent:codex-acp:session-1", cfg: {}, storePath: "/tmp/mock-sessions.json", entry: {}, acp: { backend: "acpx", agent: "codex", runtimeSessionName: "runtime:1", mode: "persistent", state: "idle", lastActivityAt: Date.now(), }, }); acpMocks.requireAcpRuntimeBackend.mockImplementation(() => { throw new AcpRuntimeErrorClass( "ACP_BACKEND_MISSING", "ACP runtime backend is not configured. Install and enable the acpx runtime plugin.", ); }); const cfg = { acp: { enabled: true, dispatch: { enabled: true }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", SessionKey: "agent:codex-acp:session-1", BodyForAgent: "write a test", }); const replyResolver = vi.fn(async () => ({ text: "fallback" }) as ReplyPayload); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(replyResolver).not.toHaveBeenCalled(); const finalPayload = (dispatcher.sendFinalReply as ReturnType).mock .calls[0]?.[0] as ReplyPayload | undefined; expect(finalPayload?.text).toContain("ACP error (ACP_BACKEND_MISSING)"); expect(finalPayload?.text).toContain("Install and enable the acpx runtime plugin"); }); it("deduplicates inbound messages by MessageSid and origin", async () => { setNoAbort(); const cfg = emptyConfig; const ctx = buildTestCtx({ Provider: "whatsapp", OriginatingChannel: "whatsapp", OriginatingTo: "whatsapp:+15555550123", MessageSid: "msg-1", }); const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload); await dispatchTwiceWithFreshDispatchers({ ctx, cfg, replyResolver, }); expect(replyResolver).toHaveBeenCalledTimes(1); }); it("suppresses local discord exec approval tool prompts when discord approvals are enabled", async () => { setNoAbort(); const cfg = { channels: { discord: { enabled: true, execApprovals: { enabled: true, approvers: ["123"], }, }, }, } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", AccountId: "default", }); const replyResolver = vi.fn(async (_ctx: MsgContext, options?: GetReplyOptions) => { await options?.onToolResult?.({ text: "Approval required.", channelData: { execApproval: { approvalId: "12345678-1234-1234-1234-123456789012", approvalSlug: "12345678", allowedDecisions: ["allow-once", "allow-always", "deny"], }, }, }); return { text: "done" } as ReplyPayload; }); await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(dispatcher.sendToolResult).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "done" }), ); }); it("deduplicates same-agent inbound replies across main and direct session keys", async () => { setNoAbort(); const cfg = emptyConfig; const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload); const baseCtx = buildTestCtx({ Provider: "telegram", Surface: "telegram", OriginatingChannel: "telegram", OriginatingTo: "telegram:7463849194", MessageSid: "msg-1", SessionKey: "agent:main:main", }); await dispatchReplyFromConfig({ ctx: baseCtx, cfg, dispatcher: createDispatcher(), replyResolver, }); await dispatchReplyFromConfig({ ctx: { ...baseCtx, SessionKey: "agent:main:telegram:direct:7463849194", }, cfg, dispatcher: createDispatcher(), replyResolver, }); expect(replyResolver).toHaveBeenCalledTimes(1); }); it("emits message_received hook with originating channel metadata", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockReturnValue(true); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "slack", Surface: "slack", OriginatingChannel: "Telegram", OriginatingTo: "telegram:999", CommandBody: "/search hello", RawBody: "raw text", Body: "body text", Timestamp: 1710000000000, MessageSidFull: "sid-full", SenderId: "user-1", SenderName: "Alice", SenderUsername: "alice", SenderE164: "+15555550123", AccountId: "acc-1", GroupSpace: "guild-123", GroupChannel: "alerts", }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(hookMocks.runner.runMessageReceived).toHaveBeenCalledWith( expect.objectContaining({ from: ctx.From, content: "/search hello", timestamp: 1710000000000, metadata: expect.objectContaining({ originatingChannel: "Telegram", originatingTo: "telegram:999", messageId: "sid-full", senderId: "user-1", senderName: "Alice", senderUsername: "alice", senderE164: "+15555550123", guildId: "guild-123", channelName: "alerts", }), }), expect.objectContaining({ channelId: "telegram", accountId: "acc-1", conversationId: "telegram:999", }), ); }); it("does not broadcast inbound claims without a core-owned plugin binding", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); hookMocks.runner.runInboundClaim.mockResolvedValue({ handled: true } as never); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", Surface: "telegram", OriginatingChannel: "telegram", OriginatingTo: "telegram:-10099", To: "telegram:-10099", AccountId: "default", SenderId: "user-9", SenderUsername: "ada", MessageThreadId: 77, CommandAuthorized: true, WasMentioned: true, CommandBody: "who are you", RawBody: "who are you", Body: "who are you", MessageSid: "msg-claim-1", SessionKey: "agent:main:telegram:group:-10099:77", }); const replyResolver = vi.fn(async () => ({ text: "core reply" }) satisfies ReplyPayload); const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(result).toEqual({ queuedFinal: true, counts: { tool: 0, block: 0, final: 0 } }); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); expect(hookMocks.runner.runMessageReceived).toHaveBeenCalledWith( expect.objectContaining({ from: ctx.From, content: "who are you", metadata: expect.objectContaining({ messageId: "msg-claim-1", originatingChannel: "telegram", originatingTo: "telegram:-10099", senderId: "user-9", senderUsername: "ada", threadId: 77, }), }), expect.objectContaining({ channelId: "telegram", accountId: "default", conversationId: "telegram:-10099", }), ); expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledWith( expect.objectContaining({ type: "message", action: "received", sessionKey: "agent:main:telegram:group:-10099:77", }), ); expect(replyResolver).toHaveBeenCalledTimes(1); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "core reply" }), ); }); it("emits internal message:received hook when a session key is available", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", Surface: "telegram", SessionKey: "agent:main:main", CommandBody: "/help", MessageSid: "msg-42", GroupSpace: "guild-456", GroupChannel: "ops-room", }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith( "message", "received", "agent:main:main", expect.objectContaining({ from: ctx.From, content: "/help", channelId: "telegram", messageId: "msg-42", metadata: expect.objectContaining({ guildId: "guild-456", channelName: "ops-room", }), }), ); expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1); }); it("skips internal message:received hook when session key is unavailable", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", Surface: "telegram", CommandBody: "/help", }); (ctx as MsgContext).SessionKey = undefined; const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(internalHookMocks.createInternalHookEvent).not.toHaveBeenCalled(); expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled(); }); it("emits diagnostics when enabled", async () => { setNoAbort(); const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "slack", Surface: "slack", SessionKey: "agent:main:main", MessageSid: "msg-1", To: "slack:C123", }); const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(diagnosticMocks.logMessageQueued).toHaveBeenCalledTimes(1); expect(diagnosticMocks.logSessionStateChange).toHaveBeenCalledWith({ sessionKey: "agent:main:main", state: "processing", reason: "message_start", }); expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith( expect.objectContaining({ channel: "slack", outcome: "completed", sessionKey: "agent:main:main", }), ); }); it("routes plugin-owned bindings to the owning plugin before generic inbound claim broadcast", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ status: "handled", result: { handled: true }, }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-1", targetSessionKey: "plugin-binding:codex:abc123", targetKind: "session", conversation: { channel: "discord", accountId: "default", conversationId: "channel:1481858418548412579", }, status: "active", boundAt: 1710000000000, metadata: { pluginBindingOwner: "plugin", pluginId: "openclaw-codex-app-server", pluginRoot: "/Users/huntharo/github/openclaw-app-server", }, } satisfies SessionBindingRecord); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "discord", OriginatingTo: "discord:channel:1481858418548412579", To: "discord:channel:1481858418548412579", AccountId: "default", SenderId: "user-9", SenderUsername: "ada", CommandAuthorized: true, WasMentioned: false, CommandBody: "who are you", RawBody: "who are you", Body: "who are you", MessageSid: "msg-claim-plugin-1", SessionKey: "agent:main:discord:channel:1481858418548412579", }); const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-1"); expect(hookMocks.runner.runInboundClaimForPluginOutcome).toHaveBeenCalledWith( "openclaw-codex-app-server", expect.objectContaining({ channel: "discord", accountId: "default", conversationId: "channel:1481858418548412579", content: "who are you", }), expect.objectContaining({ channelId: "discord", accountId: "default", conversationId: "channel:1481858418548412579", }), ); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); expect(replyResolver).not.toHaveBeenCalled(); }); it("routes plugin-owned Discord DM bindings to the owning plugin before generic inbound claim broadcast", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ status: "handled", result: { handled: true }, }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-dm-1", targetSessionKey: "plugin-binding:codex:dm123", targetKind: "session", conversation: { channel: "discord", accountId: "default", conversationId: "user:1177378744822943744", }, status: "active", boundAt: 1710000000000, metadata: { pluginBindingOwner: "plugin", pluginId: "openclaw-codex-app-server", pluginRoot: "/Users/huntharo/github/openclaw-app-server", }, } satisfies SessionBindingRecord); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "discord", From: "discord:1177378744822943744", OriginatingTo: "channel:1480574946919846079", To: "channel:1480574946919846079", AccountId: "default", SenderId: "user-9", SenderUsername: "ada", CommandAuthorized: true, WasMentioned: false, CommandBody: "who are you", RawBody: "who are you", Body: "who are you", MessageSid: "msg-claim-plugin-dm-1", SessionKey: "agent:main:discord:user:1177378744822943744", }); const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); const result = await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(result).toEqual({ queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } }); expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-dm-1"); expect(hookMocks.runner.runInboundClaimForPluginOutcome).toHaveBeenCalledWith( "openclaw-codex-app-server", expect.objectContaining({ channel: "discord", accountId: "default", conversationId: "user:1177378744822943744", content: "who are you", }), expect.objectContaining({ channelId: "discord", accountId: "default", conversationId: "user:1177378744822943744", }), ); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); expect(replyResolver).not.toHaveBeenCalled(); }); it("falls back to OpenClaw once per startup when a bound plugin is missing", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ status: "missing_plugin", }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-missing-1", targetSessionKey: "plugin-binding:codex:missing123", targetKind: "session", conversation: { channel: "discord", accountId: "default", conversationId: "channel:missing-plugin", }, status: "active", boundAt: 1710000000000, metadata: { pluginBindingOwner: "plugin", pluginId: "openclaw-codex-app-server", pluginName: "Codex App Server", pluginRoot: "/Users/huntharo/github/openclaw-app-server", detachHint: "/codex_detach", }, } satisfies SessionBindingRecord); const replyResolver = vi.fn(async () => ({ text: "openclaw fallback" }) satisfies ReplyPayload); const firstDispatcher = createDispatcher(); await dispatchReplyFromConfig({ ctx: buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "discord", OriginatingTo: "discord:channel:missing-plugin", To: "discord:channel:missing-plugin", AccountId: "default", MessageSid: "msg-missing-plugin-1", SessionKey: "agent:main:discord:channel:missing-plugin", CommandBody: "hello", RawBody: "hello", Body: "hello", }), cfg: emptyConfig, dispatcher: firstDispatcher, replyResolver, }); const firstNotice = (firstDispatcher.sendToolResult as ReturnType).mock .calls[0]?.[0] as ReplyPayload | undefined; expect(firstNotice?.text).toContain("Routing this message to OpenClaw instead."); expect(firstNotice?.text).toContain("/codex_detach"); expect(replyResolver).toHaveBeenCalledTimes(1); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); replyResolver.mockClear(); hookMocks.runner.runInboundClaim.mockClear(); const secondDispatcher = createDispatcher(); await dispatchReplyFromConfig({ ctx: buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "discord", OriginatingTo: "discord:channel:missing-plugin", To: "discord:channel:missing-plugin", AccountId: "default", MessageSid: "msg-missing-plugin-2", SessionKey: "agent:main:discord:channel:missing-plugin", CommandBody: "still there?", RawBody: "still there?", Body: "still there?", }), cfg: emptyConfig, dispatcher: secondDispatcher, replyResolver, }); expect(secondDispatcher.sendToolResult).not.toHaveBeenCalled(); expect(replyResolver).toHaveBeenCalledTimes(1); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); }); it("falls back to OpenClaw when the bound plugin is loaded but has no inbound_claim handler", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ status: "no_handler", }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-no-handler-1", targetSessionKey: "plugin-binding:codex:nohandler123", targetKind: "session", conversation: { channel: "discord", accountId: "default", conversationId: "channel:no-handler", }, status: "active", boundAt: 1710000000000, metadata: { pluginBindingOwner: "plugin", pluginId: "openclaw-codex-app-server", pluginName: "Codex App Server", pluginRoot: "/Users/huntharo/github/openclaw-app-server", }, } satisfies SessionBindingRecord); const dispatcher = createDispatcher(); const replyResolver = vi.fn(async () => ({ text: "openclaw fallback" }) satisfies ReplyPayload); await dispatchReplyFromConfig({ ctx: buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "discord", OriginatingTo: "discord:channel:no-handler", To: "discord:channel:no-handler", AccountId: "default", MessageSid: "msg-no-handler-1", SessionKey: "agent:main:discord:channel:no-handler", CommandBody: "hello", RawBody: "hello", Body: "hello", }), cfg: emptyConfig, dispatcher, replyResolver, }); const notice = (dispatcher.sendToolResult as ReturnType).mock.calls[0]?.[0] as | ReplyPayload | undefined; expect(notice?.text).toContain("Routing this message to OpenClaw instead."); expect(replyResolver).toHaveBeenCalledTimes(1); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); }); it("notifies the user when a bound plugin declines the turn and keeps the binding attached", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ status: "declined", }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-declined-1", targetSessionKey: "plugin-binding:codex:declined123", targetKind: "session", conversation: { channel: "discord", accountId: "default", conversationId: "channel:declined", }, status: "active", boundAt: 1710000000000, metadata: { pluginBindingOwner: "plugin", pluginId: "openclaw-codex-app-server", pluginName: "Codex App Server", pluginRoot: "/Users/huntharo/github/openclaw-app-server", detachHint: "/codex_detach", }, } satisfies SessionBindingRecord); const dispatcher = createDispatcher(); const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); await dispatchReplyFromConfig({ ctx: buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "discord", OriginatingTo: "discord:channel:declined", To: "discord:channel:declined", AccountId: "default", MessageSid: "msg-declined-1", SessionKey: "agent:main:discord:channel:declined", CommandBody: "hello", RawBody: "hello", Body: "hello", }), cfg: emptyConfig, dispatcher, replyResolver, }); const finalNotice = (dispatcher.sendFinalReply as ReturnType).mock .calls[0]?.[0] as ReplyPayload | undefined; expect(finalNotice?.text).toContain("did not handle this message"); expect(finalNotice?.text).toContain("/codex_detach"); expect(replyResolver).not.toHaveBeenCalled(); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); }); it("notifies the user when a bound plugin errors and keeps raw details out of the reply", async () => { setNoAbort(); hookMocks.runner.hasHooks.mockImplementation( ((hookName?: string) => hookName === "inbound_claim" || hookName === "message_received") as () => boolean, ); hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ status: "error", error: "boom", }); sessionBindingMocks.resolveByConversation.mockReturnValue({ bindingId: "binding-error-1", targetSessionKey: "plugin-binding:codex:error123", targetKind: "session", conversation: { channel: "discord", accountId: "default", conversationId: "channel:error", }, status: "active", boundAt: 1710000000000, metadata: { pluginBindingOwner: "plugin", pluginId: "openclaw-codex-app-server", pluginName: "Codex App Server", pluginRoot: "/Users/huntharo/github/openclaw-app-server", }, } satisfies SessionBindingRecord); const dispatcher = createDispatcher(); const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); await dispatchReplyFromConfig({ ctx: buildTestCtx({ Provider: "discord", Surface: "discord", OriginatingChannel: "discord", OriginatingTo: "discord:channel:error", To: "discord:channel:error", AccountId: "default", MessageSid: "msg-error-1", SessionKey: "agent:main:discord:channel:error", CommandBody: "hello", RawBody: "hello", Body: "hello", }), cfg: emptyConfig, dispatcher, replyResolver, }); const finalNotice = (dispatcher.sendFinalReply as ReturnType).mock .calls[0]?.[0] as ReplyPayload | undefined; expect(finalNotice?.text).toContain("hit an error handling this message"); expect(finalNotice?.text).not.toContain("boom"); expect(replyResolver).not.toHaveBeenCalled(); expect(hookMocks.runner.runInboundClaim).not.toHaveBeenCalled(); }); it("marks diagnostics skipped for duplicate inbound messages", async () => { setNoAbort(); const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; const ctx = buildTestCtx({ Provider: "whatsapp", OriginatingChannel: "whatsapp", OriginatingTo: "whatsapp:+15555550123", MessageSid: "msg-dup", }); const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload); await dispatchTwiceWithFreshDispatchers({ ctx, cfg, replyResolver, }); expect(replyResolver).toHaveBeenCalledTimes(1); expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith( expect.objectContaining({ channel: "whatsapp", outcome: "skipped", reason: "duplicate", }), ); }); it("passes configOverride to replyResolver when provided", async () => { setNoAbort(); const cfg = emptyConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "msteams", Surface: "msteams" }); const overrideCfg = { agents: { defaults: { userTimezone: "America/New_York" } }, } as OpenClawConfig; let receivedCfg: OpenClawConfig | undefined; const replyResolver = async ( _ctx: MsgContext, _opts?: GetReplyOptions, cfgArg?: OpenClawConfig, ) => { receivedCfg = cfgArg; return { text: "hi" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver, configOverride: overrideCfg, }); expect(receivedCfg).toBe(overrideCfg); }); it("does not pass cfg as implicit configOverride when configOverride is not provided", async () => { setNoAbort(); const cfg = { agents: { defaults: { userTimezone: "UTC" } } } as OpenClawConfig; const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "telegram", Surface: "telegram" }); let receivedCfg: OpenClawConfig | undefined; const replyResolver = async ( _ctx: MsgContext, _opts?: GetReplyOptions, cfgArg?: OpenClawConfig, ) => { receivedCfg = cfgArg; return { text: "hi" } satisfies ReplyPayload; }; await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); expect(receivedCfg).toBeUndefined(); }); it("suppresses isReasoning payloads from final replies (WhatsApp channel)", async () => { setNoAbort(); const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "whatsapp" }); const replyResolver = async () => [ { text: "Reasoning:\n_thinking..._", isReasoning: true }, { text: "The answer is 42" }, ] satisfies ReplyPayload[]; await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver }); const finalCalls = (dispatcher.sendFinalReply as ReturnType).mock.calls; expect(finalCalls).toHaveLength(1); expect(finalCalls[0][0]).toMatchObject({ text: "The answer is 42" }); }); it("suppresses isReasoning payloads from block replies (generic dispatch path)", async () => { setNoAbort(); const dispatcher = createDispatcher(); const ctx = buildTestCtx({ Provider: "whatsapp" }); const blockReplySentTexts: string[] = []; const replyResolver = async ( _ctx: MsgContext, opts?: GetReplyOptions, ): Promise => { // Simulate block reply with reasoning payload await opts?.onBlockReply?.({ text: "Reasoning:\n_thinking..._", isReasoning: true }); await opts?.onBlockReply?.({ text: "The answer is 42" }); return { text: "The answer is 42" }; }; // Capture what actually gets dispatched as block replies (dispatcher.sendBlockReply as ReturnType).mockImplementation( (payload: ReplyPayload) => { if (payload.text) { blockReplySentTexts.push(payload.text); } return true; }, ); await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver }); expect(blockReplySentTexts).not.toContain("Reasoning:\n_thinking..._"); expect(blockReplySentTexts).toContain("The answer is 42"); }); }); describe("before_dispatch hook", () => { const createHookCtx = (overrides: Partial = {}) => buildTestCtx({ Body: "hello", BodyForAgent: "hello", BodyForCommands: "hello", From: "user1", Surface: "telegram", ChatType: "private", ...overrides, }); beforeEach(async () => { vi.resetModules(); ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); resetInboundDedupe(); mocks.routeReply.mockReset(); mocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" }); ttsMocks.state.synthesizeFinalAudio = false; ttsMocks.maybeApplyTtsToPayload.mockClear(); setNoAbort(); hookMocks.runner.runBeforeDispatch.mockClear(); hookMocks.runner.runBeforeDispatch.mockResolvedValue(undefined); hookMocks.runner.hasHooks.mockImplementation( (hookName?: string) => hookName === "before_dispatch", ); }); it("skips model dispatch when hook returns handled", async () => { hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: true, text: "Blocked" }); const dispatcher = createDispatcher(); const result = await dispatchReplyFromConfig({ ctx: createHookCtx(), cfg: emptyConfig, dispatcher, }); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "Blocked" }); expect(result.queuedFinal).toBe(true); }); it("silently short-circuits when hook returns handled without text", async () => { hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: true }); const dispatcher = createDispatcher(); const result = await dispatchReplyFromConfig({ ctx: createHookCtx(), cfg: emptyConfig, dispatcher, }); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(result.queuedFinal).toBe(false); }); it("uses canonical hook metadata and shared routed final delivery", async () => { ttsMocks.state.synthesizeFinalAudio = true; hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: true, text: "Blocked" }); const dispatcher = createDispatcher(); const ctx = createHookCtx({ Body: "raw body", BodyForAgent: "agent body", BodyForCommands: "command body", Provider: "slack", Surface: "slack", OriginatingChannel: "telegram", OriginatingTo: "telegram:999", From: "signal:group:ops-room", SenderId: "signal:user:alice", GroupChannel: "ops-room", ChatType: "direct", Timestamp: 123, }); const result = await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher }); expect(hookMocks.runner.runBeforeDispatch).toHaveBeenCalledWith( expect.objectContaining({ content: "command body", body: "agent body", channel: "telegram", senderId: "signal:user:alice", isGroup: true, timestamp: 123, }), expect.objectContaining({ channelId: "telegram", senderId: "signal:user:alice", }), ); expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); expect(mocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", to: "telegram:999", payload: expect.objectContaining({ text: "Blocked", mediaUrl: "https://example.com/tts-synth.opus", audioAsVoice: true, }), }), ); expect(result.queuedFinal).toBe(true); }); it("continues default dispatch when hook returns not handled", async () => { hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: false }); const dispatcher = createDispatcher(); await dispatchReplyFromConfig({ ctx: createHookCtx(), cfg: emptyConfig, dispatcher, replyResolver: async () => ({ text: "model reply" }), }); expect(hookMocks.runner.runBeforeDispatch).toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "model reply" }); }); });