From 0e96c82ce8d2a1a5d34185029d35c8549e217eec Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 6 Apr 2026 02:43:19 +0100 Subject: [PATCH] test(auto-reply): split ACP and reply-dispatch regressions --- .../dispatch-from-config.acp-abort.test.ts | 551 ++++++++++++++++++ ...ispatch-from-config.reply-dispatch.test.ts | 445 ++++++++++++++ .../reply/dispatch-from-config.test.ts | 528 +---------------- 3 files changed, 1010 insertions(+), 514 deletions(-) create mode 100644 src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts create mode 100644 src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts diff --git a/src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts b/src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts new file mode 100644 index 00000000000..4518e564d3b --- /dev/null +++ b/src/auto-reply/reply/dispatch-from-config.acp-abort.test.ts @@ -0,0 +1,551 @@ +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; +import type { + AcpRuntime, + AcpRuntimeEnsureInput, + AcpRuntimeEvent, + AcpRuntimeHandle, + AcpRuntimeTurnInput, +} from "../../plugin-sdk/acp-runtime.js"; +import type { + PluginHookBeforeDispatchResult, + PluginHookReplyDispatchResult, + PluginTargetedInboundClaimOutcome, +} from "../../plugins/hooks.js"; +import { setActivePluginRegistry } from "../../plugins/runtime.js"; +import { + createChannelTestPluginBase, + createTestRegistry, +} from "../../test-utils/channel-plugins.js"; +import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js"; +import type { ReplyPayload } from "../types.js"; +import type { ReplyDispatcher } from "./reply-dispatcher.js"; +import { buildTestCtx } from "./test-ctx.js"; + +type AbortResult = { handled: boolean; aborted: boolean; stoppedSubagents?: number }; + +const mocks = vi.hoisted(() => ({ + routeReply: vi.fn(async (_params: unknown) => ({ ok: true, messageId: "mock" })), + tryFastAbortFromMessage: vi.fn<() => Promise>(async () => ({ + handled: false, + aborted: false, + })), +})); +const diagnosticMocks = vi.hoisted(() => ({ + logMessageQueued: vi.fn(), + logMessageProcessed: vi.fn(), + logSessionStateChange: vi.fn(), +})); +const hookMocks = vi.hoisted(() => ({ + registry: { + plugins: [] as Array<{ id: string; status: "loaded" | "disabled" | "error" }>, + }, + runner: { + hasHooks: vi.fn<(hookName?: string) => boolean>(() => false), + runInboundClaim: vi.fn(async () => undefined), + runInboundClaimForPlugin: vi.fn(async () => undefined), + runInboundClaimForPluginOutcome: vi.fn<() => Promise>( + async () => ({ status: "no_handler" as const }), + ), + runMessageReceived: vi.fn(async () => {}), + runBeforeDispatch: vi.fn< + (_event: unknown, _ctx: unknown) => Promise + >(async () => undefined), + runReplyDispatch: vi.fn< + (_event: unknown, _ctx: unknown) => Promise + >(async () => undefined), + }, +})); +const internalHookMocks = vi.hoisted(() => ({ + createInternalHookEvent: vi.fn(), + triggerInternalHook: vi.fn(async () => {}), +})); +const acpMocks = vi.hoisted(() => ({ + listAcpSessionEntries: vi.fn(async () => []), + readAcpSessionEntry: vi.fn<(params: { sessionKey: string; cfg?: OpenClawConfig }) => unknown>( + () => null, + ), + upsertAcpSessionMeta: vi.fn< + (params: { + sessionKey: string; + cfg?: OpenClawConfig; + mutate: ( + current: Record | undefined, + entry: { acp?: Record } | undefined, + ) => Record | null | undefined; + }) => Promise + >(async () => null), + requireAcpRuntimeBackend: vi.fn<() => unknown>(), +})); +const sessionBindingMocks = vi.hoisted(() => ({ + listBySession: vi.fn<(targetSessionKey: string) => SessionBindingRecord[]>(() => []), + resolveByConversation: vi.fn< + (ref: { + channel: string; + accountId: string; + conversationId: string; + parentConversationId?: string; + }) => SessionBindingRecord | null + >(() => null), + touch: vi.fn(), +})); +const pluginConversationBindingMocks = vi.hoisted(() => ({ + shownFallbackNoticeBindingIds: new Set(), +})); +const sessionStoreMocks = vi.hoisted(() => ({ + currentEntry: undefined as Record | undefined, + loadSessionStore: vi.fn(() => ({})), + resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"), + resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })), +})); +const acpManagerRuntimeMocks = vi.hoisted(() => ({ + getAcpSessionManager: vi.fn(), +})); +const agentEventMocks = vi.hoisted(() => ({ + emitAgentEvent: vi.fn(), + onAgentEvent: vi.fn<(listener: unknown) => () => void>(() => () => {}), +})); +const ttsMocks = vi.hoisted(() => ({ + maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { payload: ReplyPayload }; + return params.payload; + }), + normalizeTtsAutoMode: vi.fn((value: unknown) => (typeof value === "string" ? value : undefined)), + resolveTtsConfig: vi.fn((_cfg: OpenClawConfig) => ({ mode: "final" })), +})); +const threadInfoMocks = vi.hoisted(() => ({ + parseSessionThreadInfo: vi.fn< + (sessionKey: string | undefined) => { + baseSessionKey: string | undefined; + threadId: string | undefined; + } + >(), +})); + +function parseGenericThreadSessionInfo(sessionKey: string | undefined) { + const trimmed = sessionKey?.trim(); + if (!trimmed) { + return { baseSessionKey: undefined, threadId: undefined }; + } + const threadMarker = ":thread:"; + const topicMarker = ":topic:"; + const marker = trimmed.includes(threadMarker) + ? threadMarker + : trimmed.includes(topicMarker) + ? topicMarker + : undefined; + if (!marker) { + return { baseSessionKey: trimmed, threadId: undefined }; + } + const index = trimmed.lastIndexOf(marker); + if (index < 0) { + return { baseSessionKey: trimmed, threadId: undefined }; + } + const baseSessionKey = trimmed.slice(0, index).trim() || undefined; + const threadId = trimmed.slice(index + marker.length).trim() || undefined; + return { baseSessionKey, threadId }; +} + +vi.mock("./route-reply.runtime.js", () => ({ + isRoutableChannel: () => true, + routeReply: mocks.routeReply, +})); +vi.mock("./route-reply.js", () => ({ + isRoutableChannel: () => true, + routeReply: mocks.routeReply, +})); +vi.mock("./abort.runtime.js", () => ({ + tryFastAbortFromMessage: mocks.tryFastAbortFromMessage, + formatAbortReplyText: () => "⚙️ Agent was aborted.", +})); +vi.mock("../../logging/diagnostic.js", () => ({ + logMessageQueued: diagnosticMocks.logMessageQueued, + logMessageProcessed: diagnosticMocks.logMessageProcessed, + logSessionStateChange: diagnosticMocks.logSessionStateChange, +})); +vi.mock("../../config/sessions/thread-info.js", () => ({ + parseSessionThreadInfo: (sessionKey: string | undefined) => + threadInfoMocks.parseSessionThreadInfo(sessionKey), +})); +vi.mock("./dispatch-from-config.runtime.js", () => ({ + createInternalHookEvent: internalHookMocks.createInternalHookEvent, + loadSessionStore: sessionStoreMocks.loadSessionStore, + resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry, + resolveStorePath: sessionStoreMocks.resolveStorePath, + triggerInternalHook: internalHookMocks.triggerInternalHook, +})); +vi.mock("../../plugins/hook-runner-global.js", () => ({ + getGlobalHookRunner: () => hookMocks.runner, + getGlobalPluginRegistry: () => hookMocks.registry, +})); +vi.mock("../../acp/runtime/session-meta.js", () => ({ + listAcpSessionEntries: acpMocks.listAcpSessionEntries, + readAcpSessionEntry: acpMocks.readAcpSessionEntry, + upsertAcpSessionMeta: acpMocks.upsertAcpSessionMeta, +})); +vi.mock("../../acp/runtime/registry.js", () => ({ + requireAcpRuntimeBackend: acpMocks.requireAcpRuntimeBackend, +})); +vi.mock("../../infra/outbound/session-binding-service.js", () => ({ + getSessionBindingService: () => ({ + bind: vi.fn(async () => { + throw new Error("bind not mocked"); + }), + getCapabilities: vi.fn(() => ({ + adapterAvailable: true, + bindSupported: true, + unbindSupported: true, + placements: ["current", "child"] as const, + })), + listBySession: (targetSessionKey: string) => + sessionBindingMocks.listBySession(targetSessionKey), + resolveByConversation: sessionBindingMocks.resolveByConversation, + touch: sessionBindingMocks.touch, + unbind: vi.fn(async () => []), + }), +})); +vi.mock("../../infra/agent-events.js", () => ({ + emitAgentEvent: (params: unknown) => agentEventMocks.emitAgentEvent(params), + onAgentEvent: (listener: unknown) => agentEventMocks.onAgentEvent(listener), +})); +vi.mock("../../plugins/conversation-binding.js", () => ({ + buildPluginBindingDeclinedText: () => "Plugin binding request was declined.", + buildPluginBindingErrorText: () => "Plugin binding request failed.", + buildPluginBindingUnavailableText: (binding: { pluginName?: string; pluginId: string }) => + `${binding.pluginName ?? binding.pluginId} is not currently loaded.`, + hasShownPluginBindingFallbackNotice: (bindingId: string) => + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.has(bindingId), + isPluginOwnedSessionBindingRecord: ( + record: SessionBindingRecord | null | undefined, + ): record is SessionBindingRecord => + record?.metadata != null && + typeof record.metadata === "object" && + (record.metadata as { pluginBindingOwner?: string }).pluginBindingOwner === "plugin", + markPluginBindingFallbackNoticeShown: (bindingId: string) => { + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.add(bindingId); + }, + toPluginConversationBinding: (record: SessionBindingRecord) => ({ + bindingId: record.bindingId, + pluginId: "unknown-plugin", + pluginName: undefined, + pluginRoot: "", + channel: record.conversation.channel, + accountId: record.conversation.accountId, + conversationId: record.conversation.conversationId, + parentConversationId: record.conversation.parentConversationId, + }), +})); +vi.mock("./dispatch-acp-manager.runtime.js", () => ({ + getAcpSessionManager: () => acpManagerRuntimeMocks.getAcpSessionManager(), + getSessionBindingService: () => ({ + listBySession: (targetSessionKey: string) => + sessionBindingMocks.listBySession(targetSessionKey), + unbind: vi.fn(async () => []), + }), +})); +vi.mock("../../tts/tts.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), + normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), + resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), +})); +vi.mock("../../tts/tts.runtime.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), +})); +vi.mock("../../tts/status-config.js", () => ({ + resolveStatusTtsSnapshot: () => ({ + autoMode: "always", + provider: "auto", + maxLength: 1500, + summarize: true, + }), +})); +vi.mock("./dispatch-acp-tts.runtime.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), +})); +vi.mock("./dispatch-acp-session.runtime.js", () => ({ + readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => + acpMocks.readAcpSessionEntry(params), +})); +vi.mock("../../tts/tts-config.js", () => ({ + normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), + resolveConfiguredTtsMode: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg).mode, +})); + +const noAbortResult = { handled: false, aborted: false } as const; +let dispatchReplyFromConfig: typeof import("./dispatch-from-config.js").dispatchReplyFromConfig; +let tryDispatchAcpReplyHook: typeof import("../../plugin-sdk/acp-runtime.js").tryDispatchAcpReplyHook; + +function createDispatcher(): ReplyDispatcher { + return { + sendToolResult: vi.fn(() => true), + sendBlockReply: vi.fn(() => true), + sendFinalReply: vi.fn(() => true), + waitForIdle: vi.fn(async () => {}), + getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + markComplete: vi.fn(), + }; +} + +function shouldUseAcpReplyDispatchHook(eventUnknown: unknown): boolean { + const event = eventUnknown as { + sessionKey?: string; + ctx?: { + SessionKey?: string; + CommandTargetSessionKey?: string; + AcpDispatchTailAfterReset?: boolean; + }; + }; + if (event.ctx?.AcpDispatchTailAfterReset) { + return true; + } + return [event.sessionKey, event.ctx?.SessionKey, event.ctx?.CommandTargetSessionKey].some( + (value) => { + const key = value?.trim(); + return Boolean(key && (key.includes("acp:") || key.includes(":acp") || key.includes("-acp"))); + }, + ); +} + +function setNoAbort() { + mocks.tryFastAbortFromMessage.mockResolvedValue(noAbortResult); +} + +function createMockAcpSessionManager() { + return { + resolveSession: (params: { cfg: OpenClawConfig; sessionKey: string }) => { + const entry = acpMocks.readAcpSessionEntry({ + cfg: params.cfg, + sessionKey: params.sessionKey, + }) as { acp?: Record } | null; + if (entry?.acp) { + return { + kind: "ready" as const, + sessionKey: params.sessionKey, + meta: entry.acp, + }; + } + return { kind: "none" as const, sessionKey: params.sessionKey }; + }, + getObservabilitySnapshot: () => ({ + runtimeCache: { activeSessions: 0, idleTtlMs: 0, evictedTotal: 0 }, + turns: { + active: 0, + queueDepth: 0, + completed: 0, + failed: 0, + averageLatencyMs: 0, + maxLatencyMs: 0, + }, + errorsByCode: {}, + }), + runTurn: vi.fn( + async (params: { + cfg: OpenClawConfig; + sessionKey: string; + text?: string; + attachments?: unknown[]; + mode: string; + requestId: string; + signal?: AbortSignal; + onEvent: (event: Record) => Promise; + }) => { + const entry = acpMocks.readAcpSessionEntry({ + cfg: params.cfg, + sessionKey: params.sessionKey, + }) as { + acp?: { agent?: string; mode?: string }; + } | null; + const runtimeBackend = acpMocks.requireAcpRuntimeBackend() as { + runtime?: AcpRuntime; + }; + if (!runtimeBackend.runtime) { + throw new Error("ACP runtime backend not mocked"); + } + const handle = await runtimeBackend.runtime.ensureSession({ + sessionKey: params.sessionKey, + mode: (entry?.acp?.mode || "persistent") as AcpRuntimeEnsureInput["mode"], + agent: entry?.acp?.agent || "codex", + }); + const stream = runtimeBackend.runtime.runTurn({ + handle, + text: params.text ?? "", + attachments: params.attachments as AcpRuntimeTurnInput["attachments"], + mode: params.mode as AcpRuntimeTurnInput["mode"], + requestId: params.requestId, + signal: params.signal, + }); + for await (const event of stream) { + await params.onEvent(event); + } + }, + ), + }; +} + +describe("dispatchReplyFromConfig ACP abort", () => { + beforeAll(async () => { + ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); + ({ tryDispatchAcpReplyHook } = await import("../../plugin-sdk/acp-runtime.js")); + }); + + beforeEach(() => { + const discordTestPlugin = { + ...createChannelTestPluginBase({ + id: "discord", + capabilities: { chatTypes: ["direct"], nativeCommands: true }, + }), + outbound: { + deliveryMode: "direct", + shouldSuppressLocalPayloadPrompt: () => false, + }, + }; + setActivePluginRegistry( + createTestRegistry([{ pluginId: "discord", source: "test", plugin: discordTestPlugin }]), + ); + acpManagerRuntimeMocks.getAcpSessionManager.mockReset(); + acpManagerRuntimeMocks.getAcpSessionManager.mockReturnValue(createMockAcpSessionManager()); + hookMocks.runner.hasHooks.mockReset(); + hookMocks.runner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "reply_dispatch", + ); + hookMocks.runner.runBeforeDispatch.mockReset(); + hookMocks.runner.runBeforeDispatch.mockResolvedValue(undefined); + hookMocks.runner.runReplyDispatch.mockReset(); + hookMocks.runner.runReplyDispatch.mockImplementation(async (event: unknown, ctx: unknown) => { + if (!shouldUseAcpReplyDispatchHook(event)) { + return undefined; + } + return (await tryDispatchAcpReplyHook(event as never, ctx as never)) ?? undefined; + }); + hookMocks.runner.runInboundClaim.mockReset(); + hookMocks.runner.runInboundClaim.mockResolvedValue(undefined); + hookMocks.runner.runInboundClaimForPlugin.mockReset(); + hookMocks.runner.runInboundClaimForPlugin.mockResolvedValue(undefined); + hookMocks.runner.runInboundClaimForPluginOutcome.mockReset(); + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "no_handler", + }); + hookMocks.runner.runMessageReceived.mockReset(); + internalHookMocks.createInternalHookEvent.mockReset(); + internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload); + internalHookMocks.triggerInternalHook.mockReset(); + sessionStoreMocks.currentEntry = undefined; + sessionStoreMocks.loadSessionStore.mockReset().mockReturnValue({}); + sessionStoreMocks.resolveStorePath.mockReset().mockReturnValue("/tmp/mock-sessions.json"); + sessionStoreMocks.resolveSessionStoreEntry.mockReset().mockReturnValue({ existing: undefined }); + acpMocks.listAcpSessionEntries.mockReset().mockResolvedValue([]); + acpMocks.readAcpSessionEntry.mockReset().mockReturnValue(null); + acpMocks.upsertAcpSessionMeta.mockReset().mockResolvedValue(null); + acpMocks.requireAcpRuntimeBackend.mockReset(); + sessionBindingMocks.listBySession.mockReset().mockReturnValue([]); + sessionBindingMocks.resolveByConversation.mockReset().mockReturnValue(null); + sessionBindingMocks.touch.mockReset(); + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.clear(); + ttsMocks.maybeApplyTtsToPayload + .mockReset() + .mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { payload: ReplyPayload }; + return params.payload; + }); + ttsMocks.normalizeTtsAutoMode + .mockReset() + .mockImplementation((value: unknown) => (typeof value === "string" ? value : undefined)); + ttsMocks.resolveTtsConfig.mockReset().mockReturnValue({ mode: "final" }); + threadInfoMocks.parseSessionThreadInfo + .mockReset() + .mockImplementation(parseGenericThreadSessionInfo); + diagnosticMocks.logMessageQueued.mockReset(); + diagnosticMocks.logMessageProcessed.mockReset(); + diagnosticMocks.logSessionStateChange.mockReset(); + agentEventMocks.emitAgentEvent.mockReset(); + agentEventMocks.onAgentEvent.mockReset().mockImplementation(() => () => {}); + setNoAbort(); + }); + + it("aborts ACP dispatch promptly when the caller abort signal fires", async () => { + let releaseTurn: (() => void) | undefined; + const releasePromise = new Promise((resolve) => { + releaseTurn = resolve; + }); + const runtime = { + ensureSession: vi.fn( + async (input: { sessionKey: string; mode: string; agent: string }) => + ({ + sessionKey: input.sessionKey, + backend: "acpx", + runtimeSessionName: `${input.sessionKey}:${input.mode}`, + }) as AcpRuntimeHandle, + ), + runTurn: vi.fn(async function* (params: { signal?: AbortSignal }) { + await new Promise((resolve) => { + if (params.signal?.aborted) { + resolve(); + return; + } + const onAbort = () => resolve(); + params.signal?.addEventListener("abort", onAbort, { once: true }); + void releasePromise.then(resolve); + }); + yield { type: "done" } as AcpRuntimeEvent; + }), + cancel: vi.fn(async () => {}), + close: vi.fn(async () => {}), + } satisfies AcpRuntime; + acpMocks.readAcpSessionEntry.mockReturnValue({ + sessionKey: "agent:codex-acp:session-1", + storeSessionKey: "agent:codex-acp:session-1", + cfg: {}, + storePath: "/tmp/mock-sessions.json", + entry: {}, + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime:1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + }, + }); + acpMocks.requireAcpRuntimeBackend.mockReturnValue({ + id: "acpx", + runtime, + }); + + const abortController = new AbortController(); + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "write a test", + }); + const dispatchPromise = dispatchReplyFromConfig({ + ctx, + cfg: { + acp: { + enabled: true, + dispatch: { enabled: true }, + }, + } as OpenClawConfig, + dispatcher, + replyOptions: { abortSignal: abortController.signal }, + }); + + await vi.waitFor(() => { + expect(runtime.runTurn).toHaveBeenCalledTimes(1); + }); + abortController.abort(); + const outcome = await Promise.race([ + dispatchPromise.then(() => "settled" as const), + new Promise<"pending">((resolve) => { + setTimeout(() => resolve("pending"), 100); + }), + ]); + releaseTurn?.(); + await dispatchPromise; + + expect(outcome).toBe("settled"); + }); +}); diff --git a/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts new file mode 100644 index 00000000000..d5d6da5d9fb --- /dev/null +++ b/src/auto-reply/reply/dispatch-from-config.reply-dispatch.test.ts @@ -0,0 +1,445 @@ +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; +import type { + PluginHookBeforeDispatchResult, + PluginHookReplyDispatchResult, + PluginTargetedInboundClaimOutcome, +} from "../../plugins/hooks.js"; +import { setActivePluginRegistry } from "../../plugins/runtime.js"; +import { + createChannelTestPluginBase, + createTestRegistry, +} from "../../test-utils/channel-plugins.js"; +import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js"; +import type { ReplyPayload } from "../types.js"; +import type { ReplyDispatcher } from "./reply-dispatcher.js"; +import { buildTestCtx } from "./test-ctx.js"; + +type AbortResult = { handled: boolean; aborted: boolean; stoppedSubagents?: number }; + +const mocks = vi.hoisted(() => ({ + routeReply: vi.fn(async () => ({ ok: true, messageId: "mock" })), + tryFastAbortFromMessage: vi.fn<() => Promise>(async () => ({ + handled: false, + aborted: false, + })), +})); +const diagnosticMocks = vi.hoisted(() => ({ + logMessageQueued: vi.fn(), + logMessageProcessed: vi.fn(), + logSessionStateChange: vi.fn(), +})); +const hookMocks = vi.hoisted(() => ({ + registry: { + plugins: [] as Array<{ id: string; status: "loaded" | "disabled" | "error" }>, + }, + runner: { + hasHooks: vi.fn<(hookName?: string) => boolean>(() => false), + runInboundClaim: vi.fn(async () => undefined), + runInboundClaimForPlugin: vi.fn(async () => undefined), + runInboundClaimForPluginOutcome: vi.fn<() => Promise>( + async () => ({ status: "no_handler" as const }), + ), + runMessageReceived: vi.fn(async () => {}), + runBeforeDispatch: vi.fn< + (_event: unknown, _ctx: unknown) => Promise + >(async () => undefined), + runReplyDispatch: vi.fn< + (_event: unknown, _ctx: unknown) => Promise + >(async () => undefined), + }, +})); +const internalHookMocks = vi.hoisted(() => ({ + createInternalHookEvent: vi.fn(), + triggerInternalHook: vi.fn(async () => {}), +})); +const acpMocks = vi.hoisted(() => ({ + listAcpSessionEntries: vi.fn(async () => []), + readAcpSessionEntry: vi.fn<(params: { sessionKey: string; cfg?: OpenClawConfig }) => unknown>( + () => null, + ), + upsertAcpSessionMeta: vi.fn(async () => null), + requireAcpRuntimeBackend: vi.fn<() => unknown>(), +})); +const sessionBindingMocks = vi.hoisted(() => ({ + listBySession: vi.fn<(targetSessionKey: string) => SessionBindingRecord[]>(() => []), + resolveByConversation: vi.fn< + (ref: { + channel: string; + accountId: string; + conversationId: string; + parentConversationId?: string; + }) => SessionBindingRecord | null + >(() => null), + touch: vi.fn(), +})); +const pluginConversationBindingMocks = vi.hoisted(() => ({ + shownFallbackNoticeBindingIds: new Set(), +})); +const sessionStoreMocks = vi.hoisted(() => ({ + currentEntry: undefined as Record | undefined, + loadSessionStore: vi.fn(() => ({})), + resolveStorePath: vi.fn(() => "/tmp/mock-sessions.json"), + resolveSessionStoreEntry: vi.fn(() => ({ existing: sessionStoreMocks.currentEntry })), +})); +const acpManagerRuntimeMocks = vi.hoisted(() => ({ + getAcpSessionManager: vi.fn(() => ({ + resolveSession: () => ({ kind: "none" as const }), + getObservabilitySnapshot: () => ({ + runtimeCache: { activeSessions: 0, idleTtlMs: 0, evictedTotal: 0 }, + turns: { + active: 0, + queueDepth: 0, + completed: 0, + failed: 0, + averageLatencyMs: 0, + maxLatencyMs: 0, + }, + errorsByCode: {}, + }), + runTurn: vi.fn(), + })), +})); +const agentEventMocks = vi.hoisted(() => ({ + emitAgentEvent: vi.fn(), + onAgentEvent: vi.fn<(listener: unknown) => () => void>(() => () => {}), +})); +const ttsMocks = vi.hoisted(() => ({ + maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { payload: ReplyPayload }; + return params.payload; + }), + normalizeTtsAutoMode: vi.fn((value: unknown) => (typeof value === "string" ? value : undefined)), + resolveTtsConfig: vi.fn((_cfg: OpenClawConfig) => ({ mode: "final" })), +})); +const threadInfoMocks = vi.hoisted(() => ({ + parseSessionThreadInfo: vi.fn< + (sessionKey: string | undefined) => { + baseSessionKey: string | undefined; + threadId: string | undefined; + } + >(), +})); + +function parseGenericThreadSessionInfo(sessionKey: string | undefined) { + const trimmed = sessionKey?.trim(); + if (!trimmed) { + return { baseSessionKey: undefined, threadId: undefined }; + } + const threadMarker = ":thread:"; + const topicMarker = ":topic:"; + const marker = trimmed.includes(threadMarker) + ? threadMarker + : trimmed.includes(topicMarker) + ? topicMarker + : undefined; + if (!marker) { + return { baseSessionKey: trimmed, threadId: undefined }; + } + const index = trimmed.lastIndexOf(marker); + if (index < 0) { + return { baseSessionKey: trimmed, threadId: undefined }; + } + const baseSessionKey = trimmed.slice(0, index).trim() || undefined; + const threadId = trimmed.slice(index + marker.length).trim() || undefined; + return { baseSessionKey, threadId }; +} + +vi.mock("./route-reply.runtime.js", () => ({ + isRoutableChannel: () => true, + routeReply: mocks.routeReply, +})); +vi.mock("./route-reply.js", () => ({ + isRoutableChannel: () => true, + routeReply: mocks.routeReply, +})); +vi.mock("./abort.runtime.js", () => ({ + tryFastAbortFromMessage: mocks.tryFastAbortFromMessage, + formatAbortReplyText: () => "⚙️ Agent was aborted.", +})); +vi.mock("../../logging/diagnostic.js", () => ({ + logMessageQueued: diagnosticMocks.logMessageQueued, + logMessageProcessed: diagnosticMocks.logMessageProcessed, + logSessionStateChange: diagnosticMocks.logSessionStateChange, +})); +vi.mock("../../config/sessions/thread-info.js", () => ({ + parseSessionThreadInfo: (sessionKey: string | undefined) => + threadInfoMocks.parseSessionThreadInfo(sessionKey), +})); +vi.mock("./dispatch-from-config.runtime.js", () => ({ + createInternalHookEvent: internalHookMocks.createInternalHookEvent, + loadSessionStore: sessionStoreMocks.loadSessionStore, + resolveSessionStoreEntry: sessionStoreMocks.resolveSessionStoreEntry, + resolveStorePath: sessionStoreMocks.resolveStorePath, + triggerInternalHook: internalHookMocks.triggerInternalHook, +})); +vi.mock("../../plugins/hook-runner-global.js", () => ({ + getGlobalHookRunner: () => hookMocks.runner, + getGlobalPluginRegistry: () => hookMocks.registry, +})); +vi.mock("../../acp/runtime/session-meta.js", () => ({ + listAcpSessionEntries: acpMocks.listAcpSessionEntries, + readAcpSessionEntry: acpMocks.readAcpSessionEntry, + upsertAcpSessionMeta: acpMocks.upsertAcpSessionMeta, +})); +vi.mock("../../acp/runtime/registry.js", () => ({ + requireAcpRuntimeBackend: acpMocks.requireAcpRuntimeBackend, +})); +vi.mock("../../infra/outbound/session-binding-service.js", () => ({ + getSessionBindingService: () => ({ + bind: vi.fn(async () => { + throw new Error("bind not mocked"); + }), + getCapabilities: vi.fn(() => ({ + adapterAvailable: true, + bindSupported: true, + unbindSupported: true, + placements: ["current", "child"] as const, + })), + listBySession: (targetSessionKey: string) => + sessionBindingMocks.listBySession(targetSessionKey), + resolveByConversation: sessionBindingMocks.resolveByConversation, + touch: sessionBindingMocks.touch, + unbind: vi.fn(async () => []), + }), +})); +vi.mock("../../infra/agent-events.js", () => ({ + emitAgentEvent: (params: unknown) => agentEventMocks.emitAgentEvent(params), + onAgentEvent: (listener: unknown) => agentEventMocks.onAgentEvent(listener), +})); +vi.mock("../../plugins/conversation-binding.js", () => ({ + buildPluginBindingDeclinedText: () => "Plugin binding request was declined.", + buildPluginBindingErrorText: () => "Plugin binding request failed.", + buildPluginBindingUnavailableText: (binding: { pluginName?: string; pluginId: string }) => + `${binding.pluginName ?? binding.pluginId} is not currently loaded.`, + hasShownPluginBindingFallbackNotice: (bindingId: string) => + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.has(bindingId), + isPluginOwnedSessionBindingRecord: ( + record: SessionBindingRecord | null | undefined, + ): record is SessionBindingRecord => + record?.metadata != null && + typeof record.metadata === "object" && + (record.metadata as { pluginBindingOwner?: string }).pluginBindingOwner === "plugin", + markPluginBindingFallbackNoticeShown: (bindingId: string) => { + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.add(bindingId); + }, + toPluginConversationBinding: (record: SessionBindingRecord) => ({ + bindingId: record.bindingId, + pluginId: "unknown-plugin", + pluginName: undefined, + pluginRoot: "", + channel: record.conversation.channel, + accountId: record.conversation.accountId, + conversationId: record.conversation.conversationId, + parentConversationId: record.conversation.parentConversationId, + }), +})); +vi.mock("./dispatch-acp-manager.runtime.js", () => ({ + getAcpSessionManager: () => acpManagerRuntimeMocks.getAcpSessionManager(), + getSessionBindingService: () => ({ + listBySession: (targetSessionKey: string) => + sessionBindingMocks.listBySession(targetSessionKey), + unbind: vi.fn(async () => []), + }), +})); +vi.mock("../../tts/tts.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), + normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), + resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), +})); +vi.mock("../../tts/tts.runtime.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), +})); +vi.mock("../../tts/status-config.js", () => ({ + resolveStatusTtsSnapshot: () => ({ + autoMode: "always", + provider: "auto", + maxLength: 1500, + summarize: true, + }), +})); +vi.mock("./dispatch-acp-tts.runtime.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), +})); +vi.mock("./dispatch-acp-session.runtime.js", () => ({ + readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) => + acpMocks.readAcpSessionEntry(params), +})); +vi.mock("../../tts/tts-config.js", () => ({ + normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), + resolveConfiguredTtsMode: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg).mode, +})); + +const emptyConfig = {} as OpenClawConfig; +let dispatchReplyFromConfig: typeof import("./dispatch-from-config.js").dispatchReplyFromConfig; +let resetInboundDedupe: typeof import("./inbound-dedupe.js").resetInboundDedupe; + +function createDispatcher(): ReplyDispatcher { + return { + sendToolResult: vi.fn(() => true), + sendBlockReply: vi.fn(() => true), + sendFinalReply: vi.fn(() => true), + waitForIdle: vi.fn(async () => {}), + getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + markComplete: vi.fn(), + }; +} + +function createHookCtx() { + return buildTestCtx({ + Body: "hello", + BodyForAgent: "hello", + BodyForCommands: "hello", + From: "user1", + Surface: "telegram", + ChatType: "private", + SessionKey: "agent:test:session", + }); +} + +describe("dispatchReplyFromConfig reply_dispatch hook", () => { + beforeAll(async () => { + ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); + ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); + }); + + beforeEach(() => { + const discordTestPlugin = { + ...createChannelTestPluginBase({ + id: "discord", + capabilities: { chatTypes: ["direct"], nativeCommands: true }, + }), + outbound: { + deliveryMode: "direct", + shouldSuppressLocalPayloadPrompt: () => false, + }, + }; + setActivePluginRegistry( + createTestRegistry([{ pluginId: "discord", source: "test", plugin: discordTestPlugin }]), + ); + resetInboundDedupe(); + mocks.routeReply.mockReset().mockResolvedValue({ ok: true, messageId: "mock" }); + mocks.tryFastAbortFromMessage.mockReset().mockResolvedValue({ + handled: false, + aborted: false, + }); + hookMocks.runner.hasHooks.mockReset(); + hookMocks.runner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "reply_dispatch", + ); + hookMocks.runner.runInboundClaim.mockReset().mockResolvedValue(undefined); + hookMocks.runner.runInboundClaimForPlugin.mockReset().mockResolvedValue(undefined); + hookMocks.runner.runInboundClaimForPluginOutcome.mockReset().mockResolvedValue({ + status: "no_handler", + }); + hookMocks.runner.runMessageReceived.mockReset().mockResolvedValue(undefined); + hookMocks.runner.runBeforeDispatch.mockReset().mockResolvedValue(undefined); + hookMocks.runner.runReplyDispatch.mockReset().mockResolvedValue(undefined); + internalHookMocks.createInternalHookEvent.mockReset(); + internalHookMocks.createInternalHookEvent.mockImplementation(createInternalHookEventPayload); + internalHookMocks.triggerInternalHook.mockReset().mockResolvedValue(undefined); + acpMocks.listAcpSessionEntries.mockReset().mockResolvedValue([]); + acpMocks.readAcpSessionEntry.mockReset().mockReturnValue(null); + acpMocks.upsertAcpSessionMeta.mockReset().mockResolvedValue(null); + acpMocks.requireAcpRuntimeBackend.mockReset(); + sessionBindingMocks.listBySession.mockReset().mockReturnValue([]); + sessionBindingMocks.resolveByConversation.mockReset().mockReturnValue(null); + sessionBindingMocks.touch.mockReset(); + pluginConversationBindingMocks.shownFallbackNoticeBindingIds.clear(); + sessionStoreMocks.currentEntry = undefined; + sessionStoreMocks.loadSessionStore.mockReset().mockReturnValue({}); + sessionStoreMocks.resolveStorePath.mockReset().mockReturnValue("/tmp/mock-sessions.json"); + sessionStoreMocks.resolveSessionStoreEntry.mockReset().mockReturnValue({ existing: undefined }); + acpManagerRuntimeMocks.getAcpSessionManager.mockReset(); + acpManagerRuntimeMocks.getAcpSessionManager.mockImplementation(() => ({ + resolveSession: () => ({ kind: "none" as const }), + getObservabilitySnapshot: () => ({ + runtimeCache: { activeSessions: 0, idleTtlMs: 0, evictedTotal: 0 }, + turns: { + active: 0, + queueDepth: 0, + completed: 0, + failed: 0, + averageLatencyMs: 0, + maxLatencyMs: 0, + }, + errorsByCode: {}, + }), + runTurn: vi.fn(), + })); + agentEventMocks.emitAgentEvent.mockReset(); + agentEventMocks.onAgentEvent.mockReset().mockImplementation(() => () => {}); + diagnosticMocks.logMessageQueued.mockReset(); + diagnosticMocks.logMessageProcessed.mockReset(); + diagnosticMocks.logSessionStateChange.mockReset(); + ttsMocks.maybeApplyTtsToPayload + .mockReset() + .mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { payload: ReplyPayload }; + return params.payload; + }); + ttsMocks.normalizeTtsAutoMode + .mockReset() + .mockImplementation((value: unknown) => (typeof value === "string" ? value : undefined)); + ttsMocks.resolveTtsConfig.mockReset().mockReturnValue({ mode: "final" }); + threadInfoMocks.parseSessionThreadInfo + .mockReset() + .mockImplementation(parseGenericThreadSessionInfo); + }); + + it("returns handled dispatch results from plugins", async () => { + hookMocks.runner.runReplyDispatch.mockResolvedValue({ + handled: true, + queuedFinal: true, + counts: { tool: 1, block: 2, final: 3 }, + }); + + const result = await dispatchReplyFromConfig({ + ctx: createHookCtx(), + cfg: emptyConfig, + dispatcher: createDispatcher(), + replyResolver: async () => ({ text: "model reply" }), + }); + + expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:test:session", + sendPolicy: "allow", + inboundAudio: false, + }), + expect.objectContaining({ + cfg: emptyConfig, + }), + ); + expect(result).toEqual({ + queuedFinal: true, + counts: { tool: 1, block: 2, final: 3 }, + }); + }); + + it("still applies send-policy deny after an unhandled plugin dispatch", async () => { + hookMocks.runner.runReplyDispatch.mockResolvedValue({ + handled: false, + }); + + const result = await dispatchReplyFromConfig({ + ctx: createHookCtx(), + cfg: { + ...emptyConfig, + session: { + sendPolicy: { default: "deny" }, + }, + }, + dispatcher: createDispatcher(), + replyResolver: async () => ({ text: "model reply" }), + }); + + expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalled(); + expect(result).toEqual({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + }); +}); diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 42c3aa0a57e..a00c6a8b112 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -352,6 +352,17 @@ type DispatchReplyArgs = Parameters< typeof import("./dispatch-from-config.js").dispatchReplyFromConfig >[0]; +beforeAll(async () => { + ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); + await import("./dispatch-acp.js"); + await import("./dispatch-acp-command-bypass.js"); + await import("./dispatch-acp-tts.runtime.js"); + await import("./dispatch-acp-session.runtime.js"); + ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); + ({ AcpRuntimeError: AcpRuntimeErrorClass } = await import("../../acp/runtime/errors.js")); + ({ tryDispatchAcpReplyHook } = await import("../../plugin-sdk/acp-runtime.js")); +}); + function createDispatcher(): ReplyDispatcher { return { sendToolResult: vi.fn(() => true), @@ -536,17 +547,6 @@ async function dispatchTwiceWithFreshDispatchers(params: Omit { - beforeAll(async () => { - ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); - await import("./dispatch-acp.js"); - await import("./dispatch-acp-command-bypass.js"); - await import("./dispatch-acp-tts.runtime.js"); - await import("./dispatch-acp-session.runtime.js"); - ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); - ({ AcpRuntimeError: AcpRuntimeErrorClass } = await import("../../acp/runtime/errors.js")); - ({ tryDispatchAcpReplyHook } = await import("../../plugin-sdk/acp-runtime.js")); - }); - beforeEach(() => { const discordTestPlugin = { ...createChannelTestPluginBase({ @@ -1475,92 +1475,6 @@ describe("dispatchReplyFromConfig", () => { ); }); - it("aborts ACP dispatch promptly when the caller abort signal fires", async () => { - setNoAbort(); - let releaseTurn: (() => void) | undefined; - const releasePromise = new Promise((resolve) => { - releaseTurn = resolve; - }); - const runtime = { - ensureSession: vi.fn( - async (input: { sessionKey: string; mode: string; agent: string }) => - ({ - sessionKey: input.sessionKey, - backend: "acpx", - runtimeSessionName: `${input.sessionKey}:${input.mode}`, - }) as { sessionKey: string; backend: string; runtimeSessionName: string }, - ), - runTurn: vi.fn(async function* (params: { signal?: AbortSignal }) { - await new Promise((resolve) => { - if (params.signal?.aborted) { - resolve(); - return; - } - const onAbort = () => resolve(); - params.signal?.addEventListener("abort", onAbort, { once: true }); - void releasePromise.then(resolve); - }); - yield { type: "done" }; - }), - cancel: vi.fn(async () => {}), - close: vi.fn(async () => {}), - }; - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - - const abortController = new AbortController(); - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - BodyForAgent: "write a test", - }); - const dispatchPromise = dispatchReplyFromConfig({ - ctx, - cfg, - dispatcher, - replyOptions: { abortSignal: abortController.signal }, - }); - await vi.waitFor(() => { - expect(runtime.runTurn).toHaveBeenCalledTimes(1); - }); - abortController.abort(); - const outcome = await Promise.race([ - dispatchPromise.then(() => "settled" as const), - new Promise<"pending">((resolve) => { - setTimeout(() => resolve("pending"), 100); - }), - ]); - releaseTurn?.(); - await dispatchPromise; - - expect(outcome).toBe("settled"); - }); - it("emits lifecycle end for ACP turns using the current run id", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "text_delta", text: "done" }, { type: "done" }]); @@ -1897,341 +1811,6 @@ describe("dispatchReplyFromConfig", () => { expect(replyResolver).toHaveBeenCalled(); }); - it("honors send-policy deny before ACP runtime dispatch", async () => { - setNoAbort(); - const runtime = createAcpRuntime([ - { type: "text_delta", text: "should-not-run" }, - { type: "done" }, - ]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - session: { - sendPolicy: { - default: "deny", - }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - BodyForAgent: "write a test", - }); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); - - expect(runtime.runTurn).not.toHaveBeenCalled(); - expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); - }); - - it("does not bypass send-policy deny for ACP slash commands", async () => { - setNoAbort(); - const runtime = createAcpRuntime([{ type: "done" }]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - session: { - sendPolicy: { - default: "deny", - }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - CommandBody: "/acp cancel", - BodyForCommands: "/acp cancel", - BodyForAgent: "/acp cancel", - }); - const replyResolver = vi.fn(async () => ({ text: "command output" }) as ReplyPayload); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); - - expect(replyResolver).not.toHaveBeenCalled(); - expect(runtime.runTurn).not.toHaveBeenCalled(); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); - }); - - it("does not bypass send-policy deny for ACP reset tails after command handling", async () => { - setNoAbort(); - const runtime = createAcpRuntime([ - { type: "text_delta", text: "tail accepted" }, - { type: "done" }, - ]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - session: { - sendPolicy: { - default: "deny", - }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - CommandSource: "native", - SessionKey: "discord:slash:owner", - CommandTargetSessionKey: "agent:codex-acp:session-1", - CommandBody: "/new continue with deployment", - BodyForCommands: "/new continue with deployment", - BodyForAgent: "/new continue with deployment", - }); - const replyResolver = vi.fn(async (resolverCtx: MsgContext) => { - resolverCtx.Body = "continue with deployment"; - resolverCtx.RawBody = "continue with deployment"; - resolverCtx.CommandBody = "continue with deployment"; - resolverCtx.BodyForCommands = "continue with deployment"; - resolverCtx.BodyForAgent = "continue with deployment"; - resolverCtx.AcpDispatchTailAfterReset = true; - return undefined; - }); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); - - expect(replyResolver).not.toHaveBeenCalled(); - expect(runtime.runTurn).not.toHaveBeenCalled(); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); - }); - - it("does not bypass ACP slash aliases when text commands are disabled on native surfaces", async () => { - setNoAbort(); - const runtime = createAcpRuntime([{ type: "done" }]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - commands: { - text: false, - }, - session: { - sendPolicy: { - default: "allow", - }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - CommandBody: "/acp cancel", - BodyForCommands: "/acp cancel", - BodyForAgent: "/acp cancel", - CommandSource: "text", - }); - const replyResolver = vi.fn(async () => ({ text: "should not bypass" }) as ReplyPayload); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); - - expect(runtime.runTurn).toHaveBeenCalledTimes(1); - expect(replyResolver).not.toHaveBeenCalled(); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); - }); - - it("does not bypass ACP dispatch for unauthorized bang-prefixed messages", async () => { - setNoAbort(); - const runtime = createAcpRuntime([{ type: "done" }]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - session: { - sendPolicy: { - default: "deny", - }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - CommandBody: "!poll", - BodyForCommands: "!poll", - BodyForAgent: "!poll", - CommandAuthorized: false, - }); - const replyResolver = vi.fn(async () => ({ text: "should not bypass" }) as ReplyPayload); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); - - expect(runtime.runTurn).not.toHaveBeenCalled(); - expect(replyResolver).not.toHaveBeenCalled(); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); - }); - - it("does not bypass ACP dispatch for bang-prefixed messages when text commands are disabled", async () => { - setNoAbort(); - const runtime = createAcpRuntime([{ type: "done" }]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - commands: { - text: false, - }, - session: { - sendPolicy: { - default: "deny", - }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - CommandBody: "!poll", - BodyForCommands: "!poll", - BodyForAgent: "!poll", - CommandAuthorized: true, - CommandSource: "text", - }); - const replyResolver = vi.fn(async () => ({ text: "should not bypass" }) as ReplyPayload); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); - - expect(runtime.runTurn).not.toHaveBeenCalled(); - expect(replyResolver).not.toHaveBeenCalled(); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); - }); - it("coalesces tiny ACP token deltas into normal Discord text spacing", async () => { setNoAbort(); const runtime = createAcpRuntime([ @@ -3456,13 +3035,12 @@ describe("before_dispatch hook", () => { ...overrides, }); - beforeEach(async () => { - vi.resetModules(); - ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); - ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); + beforeEach(() => { resetInboundDedupe(); mocks.routeReply.mockReset(); mocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" }); + threadInfoMocks.parseSessionThreadInfo.mockReset(); + threadInfoMocks.parseSessionThreadInfo.mockImplementation(parseGenericThreadSessionInfo); ttsMocks.state.synthesizeFinalAudio = false; ttsMocks.maybeApplyTtsToPayload.mockClear(); setNoAbort(); @@ -3562,81 +3140,3 @@ describe("before_dispatch hook", () => { expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "model reply" }); }); }); - -describe("reply_dispatch hook", () => { - const createHookCtx = (overrides: Partial = {}) => - buildTestCtx({ - Body: "hello", - BodyForAgent: "hello", - BodyForCommands: "hello", - From: "user1", - Surface: "telegram", - ChatType: "private", - SessionKey: "agent:test:session", - ...overrides, - }); - - beforeEach(async () => { - vi.resetModules(); - ({ dispatchReplyFromConfig } = await import("./dispatch-from-config.js")); - ({ resetInboundDedupe } = await import("./inbound-dedupe.js")); - resetInboundDedupe(); - hookMocks.runner.runBeforeDispatch.mockReset(); - hookMocks.runner.runBeforeDispatch.mockResolvedValue(undefined); - hookMocks.runner.runReplyDispatch.mockReset(); - hookMocks.runner.runReplyDispatch.mockResolvedValue(undefined); - hookMocks.runner.hasHooks.mockImplementation( - (hookName?: string) => hookName === "reply_dispatch", - ); - }); - - it("returns handled dispatch results from plugins", async () => { - hookMocks.runner.runReplyDispatch.mockResolvedValue({ - handled: true, - queuedFinal: true, - counts: { tool: 1, block: 2, final: 3 }, - }); - - const result = await dispatchReplyFromConfig({ - ctx: createHookCtx(), - cfg: emptyConfig, - dispatcher: createDispatcher(), - replyResolver: async () => ({ text: "model reply" }), - }); - - expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith( - expect.objectContaining({ - sessionKey: "agent:test:session", - sendPolicy: "allow", - inboundAudio: false, - }), - expect.objectContaining({ - cfg: emptyConfig, - }), - ); - expect(result).toEqual({ - queuedFinal: true, - counts: { tool: 1, block: 2, final: 3 }, - }); - }); - - it("still applies send-policy deny after an unhandled plugin dispatch", async () => { - const result = await dispatchReplyFromConfig({ - ctx: createHookCtx(), - cfg: { - ...emptyConfig, - session: { - sendPolicy: { default: "deny" }, - }, - }, - dispatcher: createDispatcher(), - replyResolver: async () => ({ text: "model reply" }), - }); - - expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalled(); - expect(result).toEqual({ - queuedFinal: false, - counts: { tool: 0, block: 0, final: 0 }, - }); - }); -});