diff --git a/src/auto-reply/reply/commands-core.send-policy.test.ts b/src/auto-reply/reply/commands-core.send-policy.test.ts index 9ea844c3b4e..29d5de8f50c 100644 --- a/src/auto-reply/reply/commands-core.send-policy.test.ts +++ b/src/auto-reply/reply/commands-core.send-policy.test.ts @@ -78,9 +78,12 @@ describe("handleCommands send policy", () => { vi.clearAllMocks(); }); - it("prefers the target session entry from sessionStore for send policy checks", async () => { + it("allows processing to continue even when send policy is deny (#53328)", async () => { + // sendPolicy deny now only suppresses outbound delivery, not inbound processing. + // The deny gate moved to dispatch-from-config.ts where it suppresses delivery + // after the agent has processed the message. const result = await handleCommands(makeParams()); - expect(result).toEqual({ shouldContinue: false }); + expect(result).toEqual({ shouldContinue: true }); }); }); diff --git a/src/auto-reply/reply/commands-core.ts b/src/auto-reply/reply/commands-core.ts index f495a6f2208..90106aefd62 100644 --- a/src/auto-reply/reply/commands-core.ts +++ b/src/auto-reply/reply/commands-core.ts @@ -1,5 +1,3 @@ -import { logVerbose } from "../../globals.js"; -import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { shouldHandleTextCommands } from "../commands-registry.js"; import { emitResetCommandHooks } from "./commands-reset-hooks.js"; import { maybeHandleResetCommand } from "./commands-reset.js"; @@ -41,18 +39,8 @@ export async function handleCommands(params: HandleCommandsParams): Promise { expect(onReplyStart).not.toHaveBeenCalled(); }); + it("does not fire onReplyStart when user delivery is suppressed", async () => { + const onReplyStart = vi.fn(async () => {}); + const dispatcher = createDispatcher(); + const coordinator = createAcpDispatchDeliveryCoordinator({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + }), + dispatcher, + inboundAudio: false, + suppressUserDelivery: true, + shouldRouteToOriginating: false, + onReplyStart, + }); + + // Directly invoking the lifecycle (e.g. from dispatch-acp.ts before the + // first deliver call) must not fire the typing indicator when delivery is + // suppressed by sendPolicy: "deny". + await coordinator.startReplyLifecycle(); + const delivered = await coordinator.deliver("final", { text: "hello" }); + + expect(delivered).toBe(false); + expect(onReplyStart).not.toHaveBeenCalled(); + }); + it("keeps parent-owned background ACP child delivery silent while preserving accumulated output", async () => { const dispatcher = createDispatcher(); const coordinator = createAcpDispatchDeliveryCoordinator({ diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index d3bed6fad39..0403e2098f2 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -215,6 +215,13 @@ export function createAcpDispatchDeliveryCoordinator(params: { return; } state.startedReplyLifecycle = true; + // When delivery is suppressed (e.g. sendPolicy: "deny"), do not fire the + // onReplyStart callback — channels wire it to typing indicators / lifecycle + // notifications that should not leak outbound events while the session is + // under a deny policy. See #53328. + if (params.suppressUserDelivery) { + return; + } void Promise.resolve(params.onReplyStart?.()).catch((error) => { logVerbose( `dispatch-acp: reply lifecycle start failed: ${error instanceof Error ? error.message : String(error)}`, diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 4e48b95a541..244cf120682 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -3004,6 +3004,28 @@ describe("before_dispatch hook", () => { expect(result.queuedFinal).toBe(true); }); + it("suppresses before_dispatch handled reply when sendPolicy is deny", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: true, text: "Blocked" }); + const dispatcher = createDispatcher(); + const result = await dispatchReplyFromConfig({ + ctx: createHookCtx({ SessionKey: "test:session" }), + cfg: emptyConfig, + dispatcher, + }); + // Hook handled the message (no model dispatch) + expect(hookMocks.runner.runBeforeDispatch).toHaveBeenCalled(); + // But delivery must be suppressed + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(mocks.routeReply).not.toHaveBeenCalled(); + expect(result.queuedFinal).toBe(false); + }); + it("continues default dispatch when hook returns not handled", async () => { hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: false }); const dispatcher = createDispatcher(); @@ -3017,3 +3039,333 @@ describe("before_dispatch hook", () => { expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "model reply" }); }); }); + +describe("sendPolicy deny — suppress delivery, not processing (#53328)", () => { + beforeEach(() => { + hookMocks.runner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "reply_dispatch", + ); + hookMocks.runner.runReplyDispatch.mockResolvedValue(undefined); + hookMocks.runner.runBeforeDispatch.mockResolvedValue(undefined); + }); + + it("still calls the replyResolver when sendPolicy is deny", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + // The agent MUST process the message (replyResolver called) + expect(replyResolver).toHaveBeenCalledTimes(1); + }); + + it("passes suppressUserDelivery to tail reply_dispatch when sendPolicy is deny", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + hookMocks.runner.runReplyDispatch.mockImplementation(async (event: unknown) => { + const candidate = event as { isTailDispatch?: boolean }; + if (candidate.isTailDispatch) { + return { + handled: true, + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }; + } + return undefined; + }); + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + SessionKey: "test:session", + AcpDispatchTailAfterReset: true, + }); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver: async () => ({ text: "agent reply" }), + }); + + expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith( + expect.objectContaining({ + isTailDispatch: true, + sendPolicy: "deny", + suppressUserDelivery: true, + }), + expect.any(Object), + ); + }); + + it("suppresses final reply delivery when sendPolicy is deny", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + const result = await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + // Delivery MUST be suppressed + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(result.queuedFinal).toBe(false); + }); + + it("suppresses tool result delivery when sendPolicy is deny", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const dispatcher = createDispatcher(); + let capturedOnToolResult: ((payload: ReplyPayload) => Promise) | undefined; + const replyResolver = vi.fn( + async (_ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig) => { + capturedOnToolResult = opts?.onToolResult as + | ((payload: ReplyPayload) => Promise) + | undefined; + return { text: "reply" } satisfies ReplyPayload; + }, + ); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + // Trigger a tool result — delivery should be suppressed + expect(capturedOnToolResult).toBeDefined(); + await capturedOnToolResult!({ text: "tool output" }); + expect(dispatcher.sendToolResult).not.toHaveBeenCalled(); + }); + + it("suppresses block reply delivery when sendPolicy is deny", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const dispatcher = createDispatcher(); + let capturedOnBlockReply: + | ((payload: ReplyPayload, context?: unknown) => Promise) + | undefined; + const replyResolver = vi.fn( + async (_ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig) => { + capturedOnBlockReply = opts?.onBlockReply as + | ((payload: ReplyPayload, context?: unknown) => Promise) + | undefined; + return [] as ReplyPayload[]; + }, + ); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + // Trigger a block reply — delivery should be suppressed + expect(capturedOnBlockReply).toBeDefined(); + await capturedOnBlockReply!({ text: "streaming chunk" }); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + }); + + it("delivers replies normally when sendPolicy is allow", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "allow", + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); + }); + + it("delivers replies normally when sendPolicy is unset (defaults to allow)", async () => { + setNoAbort(); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload); + const ctx = buildTestCtx({ SessionKey: "test:session" }); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1); + }); + + it("suppresses the fast-abort reply under sendPolicy deny", async () => { + // Fast-abort runs before sendPolicy in the old code, so the abort reply + // leaked. Under the guard, the abort is still recorded but no reply is + // dispatched. See #53328. + mocks.tryFastAbortFromMessage.mockResolvedValue({ + handled: true, + aborted: true, + }); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload); + const ctx = buildTestCtx({ + Provider: "telegram", + Body: "/stop", + SessionKey: "test:session", + }); + + const result = await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + expect(replyResolver).not.toHaveBeenCalled(); + expect(result.queuedFinal).toBe(false); + }); + + it("delivers the fast-abort reply normally when sendPolicy is allow (regression guard)", async () => { + mocks.tryFastAbortFromMessage.mockResolvedValue({ + handled: true, + aborted: true, + }); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "allow", + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "hi" }) satisfies ReplyPayload); + const ctx = buildTestCtx({ + Provider: "telegram", + Body: "/stop", + SessionKey: "test:session", + }); + + await dispatchReplyFromConfig({ + ctx, + cfg: emptyConfig, + dispatcher, + replyResolver, + }); + + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ + text: "⚙️ Agent was aborted.", + }); + }); + + it("skips plugin-bound claim hook under deny and falls through to suppressed agent dispatch", async () => { + // Plugin-bound inbound handlers can emit outbound replies we cannot + // rewind. Under deny, skip the plugin claim entirely and let the agent + // process the message with delivery suppressed. See #53328. + setNoAbort(); + hookMocks.runner.hasHooks.mockImplementation( + ((hookName?: string) => + hookName === "inbound_claim" || hookName === "message_received") as () => boolean, + ); + hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }]; + hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({ + status: "handled", + result: { handled: true }, + }); + sessionBindingMocks.resolveByConversation.mockReturnValue({ + bindingId: "binding-deny", + targetSessionKey: "plugin-binding:codex:abc123", + targetKind: "session", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "channel:deny-test", + }, + status: "active", + boundAt: 1710000000000, + metadata: { + pluginBindingOwner: "plugin", + pluginId: "openclaw-codex-app-server", + pluginRoot: "/tmp/plugin", + }, + } satisfies SessionBindingRecord); + sessionStoreMocks.currentEntry = { + sessionId: "s1", + updatedAt: 0, + sendPolicy: "deny", + }; + const dispatcher = createDispatcher(); + const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload); + const ctx = buildTestCtx({ + Provider: "discord", + Surface: "discord", + OriginatingChannel: "discord", + OriginatingTo: "discord:channel:deny-test", + To: "discord:channel:deny-test", + AccountId: "default", + SessionKey: "agent:main:discord:channel:deny-test", + Body: "observed message", + }); + + await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver }); + + // Binding is still tracked (touch runs before the gate)... + expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-deny"); + // ...but the plugin claim hook MUST NOT be invoked under deny — the + // plugin can't be trusted to honor suppressDelivery on its outbound path. + expect(hookMocks.runner.runInboundClaimForPluginOutcome).not.toHaveBeenCalled(); + // Agent still processes the message (the whole point of the PR)... + expect(replyResolver).toHaveBeenCalledTimes(1); + // ...but no final reply is delivered. + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); + }); +}); diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index dcfcdab25f9..79049eea8ea 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -427,6 +427,25 @@ export async function dispatchReplyFromConfig( ? toPluginConversationBinding(pluginOwnedBindingRecord) : null; + // Resolve sendPolicy early so every outbound path below (plugin-binding + // notices, fast-abort, normal dispatch) honors suppressDelivery. Under + // sendPolicy: "deny" the agent still processes inbound, but no outbound + // reply/notice/indicator is allowed. See #53328. + const sendPolicy = resolveSendPolicy({ + cfg, + entry: sessionStoreEntry.entry, + sessionKey: sessionStoreEntry.sessionKey ?? sessionKey, + channel: + sessionStoreEntry.entry?.channel ?? + ctx.OriginatingChannel ?? + ctx.Surface ?? + ctx.Provider ?? + undefined, + chatType: sessionStoreEntry.entry?.chatType, + }); + const suppressDelivery = sendPolicy === "deny"; + const suppressHookUserDelivery = suppressAcpChildUserDelivery || suppressDelivery; + let pluginFallbackReason: | "plugin-bound-fallback-missing-plugin" | "plugin-bound-fallback-no-handler" @@ -434,68 +453,78 @@ export async function dispatchReplyFromConfig( if (pluginOwnedBinding) { touchConversationBindingRecord(pluginOwnedBinding.bindingId); - logVerbose( - `plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`, - ); - const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome - ? await hookRunner.runInboundClaimForPluginOutcome( - pluginOwnedBinding.pluginId, - inboundClaimEvent, - inboundClaimContext, - ) - : (() => { - const pluginLoaded = - getGlobalPluginRegistry()?.plugins.some( - (plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded", - ) ?? false; - return pluginLoaded - ? ({ status: "no_handler" } as const) - : ({ status: "missing_plugin" } as const); - })(); + if (suppressDelivery) { + // Plugin-bound inbound handlers typically emit outbound replies we + // cannot rewind. Under deny, skip the plugin claim entirely and fall + // through to normal (suppressed) agent processing so no delivery leaks + // via the plugin path. See #53328. + logVerbose( + `plugin-bound inbound skipped under sendPolicy: deny (plugin=${pluginOwnedBinding.pluginId} session=${sessionKey ?? "unknown"}); falling through to suppressed agent processing`, + ); + } else { + logVerbose( + `plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`, + ); + const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome + ? await hookRunner.runInboundClaimForPluginOutcome( + pluginOwnedBinding.pluginId, + inboundClaimEvent, + inboundClaimContext, + ) + : (() => { + const pluginLoaded = + getGlobalPluginRegistry()?.plugins.some( + (plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded", + ) ?? false; + return pluginLoaded + ? ({ status: "no_handler" } as const) + : ({ status: "missing_plugin" } as const); + })(); - switch (targetedClaimOutcome.status) { - case "handled": { - markIdle("plugin_binding_dispatch"); - recordProcessed("completed", { reason: "plugin-bound-handled" }); - return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; - } - case "missing_plugin": - case "no_handler": { - pluginFallbackReason = - targetedClaimOutcome.status === "missing_plugin" - ? "plugin-bound-fallback-missing-plugin" - : "plugin-bound-fallback-no-handler"; - if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) { - const didSendNotice = await sendBindingNotice( - { text: buildPluginBindingUnavailableText(pluginOwnedBinding) }, - "additive", - ); - if (didSendNotice) { - markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId); - } + switch (targetedClaimOutcome.status) { + case "handled": { + markIdle("plugin_binding_dispatch"); + recordProcessed("completed", { reason: "plugin-bound-handled" }); + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; + } + case "missing_plugin": + case "no_handler": { + pluginFallbackReason = + targetedClaimOutcome.status === "missing_plugin" + ? "plugin-bound-fallback-missing-plugin" + : "plugin-bound-fallback-no-handler"; + if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) { + const didSendNotice = await sendBindingNotice( + { text: buildPluginBindingUnavailableText(pluginOwnedBinding) }, + "additive", + ); + if (didSendNotice) { + markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId); + } + } + break; + } + case "declined": { + await sendBindingNotice( + { text: buildPluginBindingDeclinedText(pluginOwnedBinding) }, + "terminal", + ); + markIdle("plugin_binding_declined"); + recordProcessed("completed", { reason: "plugin-bound-declined" }); + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; + } + case "error": { + logVerbose( + `plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`, + ); + await sendBindingNotice( + { text: buildPluginBindingErrorText(pluginOwnedBinding) }, + "terminal", + ); + markIdle("plugin_binding_error"); + recordProcessed("completed", { reason: "plugin-bound-error" }); + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; } - break; - } - case "declined": { - await sendBindingNotice( - { text: buildPluginBindingDeclinedText(pluginOwnedBinding) }, - "terminal", - ); - markIdle("plugin_binding_declined"); - recordProcessed("completed", { reason: "plugin-bound-declined" }); - return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; - } - case "error": { - logVerbose( - `plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`, - ); - await sendBindingNotice( - { text: buildPluginBindingErrorText(pluginOwnedBinding) }, - "terminal", - ); - markIdle("plugin_binding_error"); - recordProcessed("completed", { reason: "plugin-bound-error" }); - return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; } } } @@ -536,24 +565,30 @@ export async function dispatchReplyFromConfig( } const fastAbort = await fastAbortResolver({ ctx, cfg }); if (fastAbort.handled) { - const payload = { - text: formatAbortReplyTextResolver(fastAbort.stoppedSubagents), - } satisfies ReplyPayload; let queuedFinal = false; let routedFinalCount = 0; - const result = await routeReplyToOriginating(payload); - if (result) { - queuedFinal = result.ok; - if (result.ok) { - routedFinalCount += 1; - } - if (!result.ok) { - logVerbose( - `dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`, - ); + if (!suppressDelivery) { + const payload = { + text: formatAbortReplyTextResolver(fastAbort.stoppedSubagents), + } satisfies ReplyPayload; + const result = await routeReplyToOriginating(payload); + if (result) { + queuedFinal = result.ok; + if (result.ok) { + routedFinalCount += 1; + } + if (!result.ok) { + logVerbose( + `dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`, + ); + } + } else { + queuedFinal = dispatcher.sendFinalReply(payload); } } else { - queuedFinal = dispatcher.sendFinalReply(payload); + logVerbose( + `dispatch-from-config: fast_abort reply suppressed by sendPolicy: deny (session=${sessionKey ?? "unknown"})`, + ); } const counts = dispatcher.getQueuedCounts(); counts.final += routedFinalCount; @@ -562,19 +597,6 @@ export async function dispatchReplyFromConfig( return { queuedFinal, counts }; } - const sendPolicy = resolveSendPolicy({ - cfg, - entry: sessionStoreEntry.entry, - sessionKey: sessionStoreEntry.sessionKey ?? sessionKey, - channel: - sessionStoreEntry.entry?.channel ?? - ctx.OriginatingChannel ?? - ctx.Surface ?? - ctx.Provider ?? - undefined, - chatType: sessionStoreEntry.entry?.chatType, - }); - const shouldSendToolSummaries = ctx.ChatType !== "group" || ctx.IsForum === true; const shouldSendToolStartStatuses = ctx.ChatType !== "group" || ctx.IsForum === true; const sendFinalPayload = async ( @@ -630,7 +652,7 @@ export async function dispatchReplyFromConfig( const text = beforeDispatchResult.text; let queuedFinal = false; let routedFinalCount = 0; - if (text) { + if (text && !suppressDelivery) { const handledReply = await sendFinalPayload({ text }); queuedFinal = handledReply.queuedFinal; routedFinalCount += handledReply.routedFinalCount; @@ -652,7 +674,7 @@ export async function dispatchReplyFromConfig( inboundAudio, sessionTtsAuto, ttsChannel, - suppressUserDelivery: suppressAcpChildUserDelivery, + suppressUserDelivery: suppressHookUserDelivery, shouldRouteToOriginating, originatingChannel, originatingTo, @@ -676,14 +698,12 @@ export async function dispatchReplyFromConfig( } } - if (sendPolicy === "deny") { + // When sendPolicy is "deny", we still let the agent process the inbound message + // (context, memory, tool calls) but suppress all outbound delivery. + if (suppressDelivery) { logVerbose( - `Send blocked by policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown"}`, + `Delivery suppressed by send policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown"} — agent will still process the message`, ); - const counts = dispatcher.getQueuedCounts(); - recordProcessed("completed", { reason: "send_policy_deny" }); - markIdle("message_completed"); - return { queuedFinal: false, counts }; } const toolStartStatusesSent = new Set(); @@ -710,6 +730,9 @@ export async function dispatchReplyFromConfig( return parts.join("\n\n").trim() || "Planning next steps."; }; const maybeSendWorkingStatus = async (label: string): Promise => { + if (suppressDelivery) { + return; + } const normalizedLabel = normalizeWorkingLabel(label); if ( !shouldEmitVerboseProgress() || @@ -735,7 +758,7 @@ export async function dispatchReplyFromConfig( explanation?: string; steps?: string[]; }): Promise => { - if (!shouldEmitVerboseProgress()) { + if (suppressDelivery || !shouldEmitVerboseProgress()) { return; } const replyPayload: ReplyPayload = { @@ -818,7 +841,8 @@ export async function dispatchReplyFromConfig( }; const typing = resolveRunTypingPolicy({ requestedPolicy: params.replyOptions?.typingPolicy, - suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping, + suppressTyping: + suppressDelivery || params.replyOptions?.suppressTyping === true || shouldSuppressTyping, originatingChannel, systemEvent: shouldRouteToOriginating, }); @@ -833,6 +857,9 @@ export async function dispatchReplyFromConfig( suppressTyping: typing.suppressTyping, onToolResult: (payload: ReplyPayload) => { const run = async () => { + if (suppressDelivery) { + return; + } const ttsPayload = await maybeApplyTtsToReplyPayload({ payload, cfg, @@ -881,6 +908,9 @@ export async function dispatchReplyFromConfig( }, onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => { const run = async () => { + if (suppressDelivery) { + return; + } // Suppress reasoning payloads — channels using this generic dispatch // path (WhatsApp, web, etc.) do not have a dedicated reasoning lane. // Telegram has its own dispatch path that handles reasoning splitting. @@ -942,11 +972,12 @@ export async function dispatchReplyFromConfig( inboundAudio, sessionTtsAuto, ttsChannel, + suppressUserDelivery: suppressHookUserDelivery, shouldRouteToOriginating, originatingChannel, originatingTo, shouldSendToolSummaries, - sendPolicy: "allow", + sendPolicy, isTailDispatch: true, }, { @@ -971,63 +1002,65 @@ export async function dispatchReplyFromConfig( let queuedFinal = false; let routedFinalCount = 0; - for (const reply of replies) { - // Suppress reasoning payloads from channel delivery — channels using this - // generic dispatch path do not have a dedicated reasoning lane. - if (reply.isReasoning === true) { - continue; - } - const finalReply = await sendFinalPayload(reply); - queuedFinal = finalReply.queuedFinal || queuedFinal; - routedFinalCount += finalReply.routedFinalCount; - } - - const ttsMode = resolveConfiguredTtsMode(cfg); - // Generate TTS-only reply after block streaming completes (when there's no final reply). - // This handles the case where block streaming succeeds and drops final payloads, - // but we still want TTS audio to be generated from the accumulated block content. - if ( - ttsMode === "final" && - replies.length === 0 && - blockCount > 0 && - accumulatedBlockText.trim() - ) { - try { - const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({ - payload: { text: accumulatedBlockText }, - cfg, - channel: ttsChannel, - kind: "final", - inboundAudio, - ttsAuto: sessionTtsAuto, - }); - // Only send if TTS was actually applied (mediaUrl exists) - if (ttsSyntheticReply.mediaUrl) { - // Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content - const ttsOnlyPayload: ReplyPayload = { - mediaUrl: ttsSyntheticReply.mediaUrl, - audioAsVoice: ttsSyntheticReply.audioAsVoice, - }; - const result = await routeReplyToOriginating(ttsOnlyPayload); - if (result) { - queuedFinal = result.ok || queuedFinal; - if (result.ok) { - routedFinalCount += 1; - } - if (!result.ok) { - logVerbose( - `dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`, - ); - } - } else { - const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload); - queuedFinal = didQueue || queuedFinal; - } + if (!suppressDelivery) { + for (const reply of replies) { + // Suppress reasoning payloads from channel delivery — channels using this + // generic dispatch path do not have a dedicated reasoning lane. + if (reply.isReasoning === true) { + continue; + } + const finalReply = await sendFinalPayload(reply); + queuedFinal = finalReply.queuedFinal || queuedFinal; + routedFinalCount += finalReply.routedFinalCount; + } + + const ttsMode = resolveConfiguredTtsMode(cfg); + // Generate TTS-only reply after block streaming completes (when there's no final reply). + // This handles the case where block streaming succeeds and drops final payloads, + // but we still want TTS audio to be generated from the accumulated block content. + if ( + ttsMode === "final" && + replies.length === 0 && + blockCount > 0 && + accumulatedBlockText.trim() + ) { + try { + const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({ + payload: { text: accumulatedBlockText }, + cfg, + channel: ttsChannel, + kind: "final", + inboundAudio, + ttsAuto: sessionTtsAuto, + }); + // Only send if TTS was actually applied (mediaUrl exists) + if (ttsSyntheticReply.mediaUrl) { + // Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content + const ttsOnlyPayload: ReplyPayload = { + mediaUrl: ttsSyntheticReply.mediaUrl, + audioAsVoice: ttsSyntheticReply.audioAsVoice, + }; + const result = await routeReplyToOriginating(ttsOnlyPayload); + if (result) { + queuedFinal = result.ok || queuedFinal; + if (result.ok) { + routedFinalCount += 1; + } + if (!result.ok) { + logVerbose( + `dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`, + ); + } + } else { + const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload); + queuedFinal = didQueue || queuedFinal; + } + } + } catch (err) { + logVerbose( + `dispatch-from-config: accumulated block TTS failed: ${formatErrorMessage(err)}`, + ); } - } catch (err) { - logVerbose( - `dispatch-from-config: accumulated block TTS failed: ${formatErrorMessage(err)}`, - ); } } diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 1c24175fea7..3a0b04456c5 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -350,7 +350,12 @@ export async function runPreparedReply( sessionCtx.MediaPath || (sessionCtx.MediaPaths && sessionCtx.MediaPaths.length > 0), ); if (!baseBodyTrimmed && !hasMediaAttachment) { - await typing.onReplyStart(); + // Skip onReplyStart when typing is suppressed (e.g. sendPolicy deny) — + // otherwise channels that wire onReplyStart to typing indicators leak + // visible signals even though outbound delivery is suppressed. + if (!suppressTyping) { + await typing.onReplyStart(); + } logVerbose("Inbound body empty after normalization; skipping agent run"); typing.cleanup(); return { diff --git a/src/plugin-sdk/acp-runtime.test.ts b/src/plugin-sdk/acp-runtime.test.ts index 8fd38fb4199..4f25d35e09c 100644 --- a/src/plugin-sdk/acp-runtime.test.ts +++ b/src/plugin-sdk/acp-runtime.test.ts @@ -117,6 +117,74 @@ describe("tryDispatchAcpReplyHook", () => { expect(dispatchMock).toHaveBeenCalledOnce(); }); + it("dispatches non-tail ACP turn under deny when suppressUserDelivery is set", async () => { + bypassMock.mockResolvedValue(false); + dispatchMock.mockResolvedValue({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + + const result = await tryDispatchAcpReplyHook( + { + ...event, + sendPolicy: "deny", + suppressUserDelivery: true, + ctx: buildTestCtx({ + SessionKey: "agent:test:session", + BodyForCommands: "write a test", + BodyForAgent: "write a test", + }), + }, + ctx, + ); + + // Non-tail, non-command ACP turns under deny must still flow through ACP + // runtime so session/tool state stays consistent — delivery suppression is + // handled inside the ACP delivery path via suppressUserDelivery. + expect(dispatchMock).toHaveBeenCalledOnce(); + expect(dispatchMock).toHaveBeenCalledWith( + expect.objectContaining({ + suppressUserDelivery: true, + bypassForCommand: false, + }), + ); + expect(result).toEqual({ + handled: true, + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + }); + + it("allows tail dispatch through when sendPolicy is deny", async () => { + bypassMock.mockResolvedValue(false); + dispatchMock.mockResolvedValue({ + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + + const result = await tryDispatchAcpReplyHook( + { + ...event, + sendPolicy: "deny", + isTailDispatch: true, + ctx: buildTestCtx({ + SessionKey: "agent:test:session", + BodyForCommands: "continue after reset", + BodyForAgent: "continue after reset", + }), + }, + ctx, + ); + + // Tail dispatch should proceed despite deny — delivery suppression is handled downstream + expect(dispatchMock).toHaveBeenCalledOnce(); + expect(result).toEqual({ + handled: true, + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }); + }); + it("does not let ACP claim reset commands before local command handling", async () => { bypassMock.mockResolvedValue(true); dispatchMock.mockResolvedValue(undefined); diff --git a/src/plugin-sdk/acp-runtime.ts b/src/plugin-sdk/acp-runtime.ts index e7ae343cb48..269e288fdda 100644 --- a/src/plugin-sdk/acp-runtime.ts +++ b/src/plugin-sdk/acp-runtime.ts @@ -60,13 +60,31 @@ export async function tryDispatchAcpReplyHook( event: PluginHookReplyDispatchEvent, ctx: PluginHookReplyDispatchContext, ): Promise { - if (event.sendPolicy === "deny" && !hasExplicitCommandCandidate(event.ctx)) { + // Under sendPolicy: "deny", ACP-bound sessions still need their turns to flow + // through acpManager.runTurn so session state, tool calls, and memory stay + // consistent — only outbound delivery should be suppressed. The ACP delivery + // path (dispatch-acp-delivery.ts) honors event.suppressUserDelivery to drop + // user-facing sends. If suppressUserDelivery is not set under deny, we cannot + // safely route through ACP (delivery would leak), so fall back to the + // embedded reply path unless an explicit command candidate or tail dispatch + // warrants going through ACP anyway. + if ( + event.sendPolicy === "deny" && + !event.suppressUserDelivery && + !hasExplicitCommandCandidate(event.ctx) && + !event.isTailDispatch + ) { return; } const runtime = await loadDispatchAcpRuntime(); const bypassForCommand = await runtime.shouldBypassAcpDispatchForCommand(event.ctx, ctx.cfg); - if (event.sendPolicy === "deny" && !bypassForCommand) { + if ( + event.sendPolicy === "deny" && + !event.suppressUserDelivery && + !bypassForCommand && + !event.isTailDispatch + ) { return; }