From 4c613fbfe05f4dc7ee66dfffa5bc48dc3f370816 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 18 May 2026 02:39:42 +0100 Subject: [PATCH] refactor(cron): centralize source delivery plan --- src/agents/pi-embedded-messaging.types.ts | 2 + ...i-embedded-subscribe.tools.extract.test.ts | 64 +++++ src/agents/pi-embedded-subscribe.tools.ts | 17 +- .../delivery-dispatch.double-announce.test.ts | 19 +- .../delivery-dispatch.named-agent.test.ts | 35 +-- src/cron/isolated-agent/delivery-dispatch.ts | 46 +--- .../isolated-agent/run-delivery.runtime.ts | 6 +- src/cron/isolated-agent/run-executor.ts | 40 ++-- .../run.message-tool-policy.test.ts | 123 ++++++++-- src/cron/isolated-agent/run.test-harness.ts | 10 +- src/cron/isolated-agent/run.ts | 206 ++++++---------- .../outbound/source-delivery-plan.test.ts | 210 +++++++++++++++++ src/infra/outbound/source-delivery-plan.ts | 221 ++++++++++++++++++ 13 files changed, 770 insertions(+), 229 deletions(-) create mode 100644 src/infra/outbound/source-delivery-plan.test.ts create mode 100644 src/infra/outbound/source-delivery-plan.ts diff --git a/src/agents/pi-embedded-messaging.types.ts b/src/agents/pi-embedded-messaging.types.ts index d85eae9eaac..4e6fe037e62 100644 --- a/src/agents/pi-embedded-messaging.types.ts +++ b/src/agents/pi-embedded-messaging.types.ts @@ -6,6 +6,8 @@ export type MessagingToolSend = { accountId?: string; to?: string; threadId?: string; + threadImplicit?: boolean; + threadSuppressed?: boolean; text?: string; mediaUrls?: string[]; }; diff --git a/src/agents/pi-embedded-subscribe.tools.extract.test.ts b/src/agents/pi-embedded-subscribe.tools.extract.test.ts index 1af32519995..b1f00f8954e 100644 --- a/src/agents/pi-embedded-subscribe.tools.extract.test.ts +++ b/src/agents/pi-embedded-subscribe.tools.extract.test.ts @@ -17,6 +17,7 @@ describe("extractMessagingToolSend", () => { plugin: { ...createChannelTestPluginBase({ id: "telegram" }), messaging: { normalizeTarget: normalizeTelegramMessagingTargetForTest }, + threading: { resolveAutoThreadId: () => "456" }, }, source: "test", }, @@ -25,6 +26,18 @@ describe("extractMessagingToolSend", () => { plugin: { ...createChannelTestPluginBase({ id: "slack" }), messaging: { normalizeTarget: (raw: string) => raw.trim().toLowerCase() }, + actions: { + extractToolSend: (params: { args: Record }) => { + const { args } = params; + return args.action === "sendMessage" && typeof args.to === "string" + ? { + to: args.to, + accountId: typeof args.accountId === "string" ? args.accountId : undefined, + threadId: typeof args.threadId === "string" ? args.threadId : undefined, + } + : null; + }, + }, }, source: "test", }, @@ -119,4 +132,55 @@ describe("extractMessagingToolSend", () => { expect(result?.to).toBe("channel:123"); expect(result?.threadId).toBe("456"); }); + + it("records when message sends can inherit the current thread", () => { + const result = extractMessagingToolSend("message", { + action: "send", + provider: "telegram", + to: "123", + content: "done", + }); + + expect(result?.threadImplicit).toBe(true); + }); + + it("keeps provider-tool extracted thread id evidence", () => { + const result = extractMessagingToolSend("slack", { + action: "sendMessage", + to: " Channel:C1 ", + threadId: "171.222", + accountId: "bot-a", + content: "done", + }); + + expect(result).toMatchObject({ + tool: "slack", + provider: "slack", + accountId: "bot-a", + to: "channel:c1", + threadId: "171.222", + }); + }); + + it("records when message sends explicitly suppress implicit thread delivery", () => { + const topLevel = extractMessagingToolSend("message", { + action: "send", + provider: "telegram", + to: "123", + topLevel: true, + content: "done", + }); + const nullThread = extractMessagingToolSend("message", { + action: "send", + provider: "telegram", + to: "123", + threadId: null, + content: "done", + }); + + expect(topLevel?.threadSuppressed).toBe(true); + expect(topLevel?.threadImplicit).toBeUndefined(); + expect(nullThread?.threadSuppressed).toBe(true); + expect(nullThread?.threadImplicit).toBeUndefined(); + }); }); diff --git a/src/agents/pi-embedded-subscribe.tools.ts b/src/agents/pi-embedded-subscribe.tools.ts index f1871d9e9e3..63444f6ae2a 100644 --- a/src/agents/pi-embedded-subscribe.tools.ts +++ b/src/agents/pi-embedded-subscribe.tools.ts @@ -579,8 +579,21 @@ export function extractMessagingToolSend( const provider = providerId ?? normalizeOptionalLowercaseString(providerHint) ?? "message"; const to = normalizeTargetForProvider(provider, toRaw); const threadId = normalizeOptionalString(args.threadId); + const threadSuppressed = args.topLevel === true || args.threadId === null; + const threadImplicit = + !threadId && + !threadSuppressed && + Boolean(providerId && getChannelPlugin(providerId)?.threading?.resolveAutoThreadId); return to - ? { tool: toolName, provider, accountId, to, ...(threadId ? { threadId } : {}) } + ? { + tool: toolName, + provider, + accountId, + to, + ...(threadId ? { threadId } : {}), + ...(threadImplicit ? { threadImplicit: true } : {}), + ...(threadSuppressed ? { threadSuppressed: true } : {}), + } : undefined; } const providerId = normalizeChannelId(toolName); @@ -593,12 +606,14 @@ export function extractMessagingToolSend( return undefined; } const to = normalizeTargetForProvider(providerId, extracted.to); + const threadId = normalizeOptionalString(extracted.threadId); return to ? { tool: toolName, provider: providerId, accountId: extracted.accountId ?? accountId, to, + ...(threadId ? { threadId } : {}), } : undefined; } diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index 8d48d8a9602..f2e90469d1c 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -231,6 +231,12 @@ function makeBaseParams(overrides: { resolvedDelivery, deliveryRequested: overrides.deliveryRequested ?? true, skipHeartbeatDelivery: false, + sourceDeliveryOutcome: { + visibleDeliveries: [], + verifiedMessageToolDelivery: false, + satisfiesSourceDelivery: false, + unverifiedMessageToolDelivery: false, + }, deliveryBestEffort: overrides.deliveryBestEffort ?? false, deliveryPayloadHasStructuredContent: false, deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [], @@ -1392,7 +1398,18 @@ describe("dispatchCronDelivery — double-announce guard", () => { mode: "implicit", error: new Error("sessionKey is required to resolve delivery.channel=last"), }; - params.unverifiedMessagingToolDelivery = true; + params.sourceDeliveryOutcome = { + visibleDeliveries: [ + { + via: "message_tool", + target: { tool: "message", provider: "messagechat", to: "123" }, + verifiedTarget: false, + }, + ], + verifiedMessageToolDelivery: false, + satisfiesSourceDelivery: false, + unverifiedMessageToolDelivery: true, + }; const state = await dispatchCronDelivery(params); diff --git a/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts index f1e903b7f51..c940663a235 100644 --- a/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from "vitest"; -import { matchesMessagingToolDeliveryTarget } from "./delivery-dispatch.js"; +import { sourceDeliveryTargetsMatch } from "../../infra/outbound/source-delivery-plan.js"; // Mock the announce flow dependencies to test the fallback behavior. vi.mock("../../agents/subagent-announce.js", () => ({ @@ -9,10 +9,10 @@ vi.mock("../../agents/subagent-registry-read.js", () => ({ countActiveDescendantRuns: vi.fn().mockReturnValue(0), })); -describe("matchesMessagingToolDeliveryTarget", () => { +describe("sourceDeliveryTargetsMatch", () => { it("matches when channel and to agree", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "telegram", to: "123456" }, { channel: "telegram", to: "123456" }, ), @@ -21,7 +21,7 @@ describe("matchesMessagingToolDeliveryTarget", () => { it("rejects when channel differs", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "whatsapp", to: "123456" }, { channel: "telegram", to: "123456" }, ), @@ -30,7 +30,7 @@ describe("matchesMessagingToolDeliveryTarget", () => { it("rejects when to is missing from delivery", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "telegram", to: "123456" }, { channel: "telegram", to: undefined }, ), @@ -39,25 +39,34 @@ describe("matchesMessagingToolDeliveryTarget", () => { it("rejects when channel is missing from delivery", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "telegram", to: "123456" }, { channel: undefined, to: "123456" }, ), ).toBe(false); }); - it("strips :topic:NNN suffix from target.to before comparing", () => { + it("matches topic suffixes against the resolved delivery thread", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "telegram", to: "-1003597428309:topic:462" }, - { channel: "telegram", to: "-1003597428309" }, + { channel: "telegram", to: "-1003597428309", threadId: 462 }, ), ).toBe(true); }); + it("rejects matching room targets when thread ids differ", () => { + expect( + sourceDeliveryTargetsMatch( + { provider: "telegram", to: "-1003597428309", threadId: "111" }, + { channel: "telegram", to: "-1003597428309", threadId: 462 }, + ), + ).toBe(false); + }); + it("matches when provider is 'message' (generic)", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "message", to: "123456" }, { channel: "telegram", to: "123456" }, ), @@ -66,7 +75,7 @@ describe("matchesMessagingToolDeliveryTarget", () => { it("rejects when accountIds differ", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "telegram", to: "123456", accountId: "bot-a" }, { channel: "telegram", to: "123456", accountId: "bot-b" }, ), @@ -75,7 +84,7 @@ describe("matchesMessagingToolDeliveryTarget", () => { it("matches when delivery has accountId and target omits it (tool fills accountId at exec)", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "message", to: "123456" }, { channel: "telegram", to: "123456", accountId: "bot-a" }, ), @@ -84,7 +93,7 @@ describe("matchesMessagingToolDeliveryTarget", () => { it("matches when delivery and target carry the same accountId", () => { expect( - matchesMessagingToolDeliveryTarget( + sourceDeliveryTargetsMatch( { provider: "telegram", to: "123456", accountId: "bot-a" }, { channel: "telegram", to: "123456", accountId: "bot-a" }, ), diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index 6bebc1ac6f3..1addaa06c0c 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -27,6 +27,7 @@ import { createOutboundPayloadPlan, projectOutboundPayloadPlanForMirror, } from "../../infra/outbound/payloads.js"; +import type { SourceDeliveryOutcome } from "../../infra/outbound/source-delivery-plan.js"; import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js"; import { hasReplyPayloadContent } from "../../interactive/payload.js"; import { isAudioFileName } from "../../media/mime.js"; @@ -37,11 +38,7 @@ import { resolveAgentIdFromSessionKey, } from "../../routing/session-key.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; -import { - normalizeLowercaseStringOrEmpty, - normalizeOptionalLowercaseString, - normalizeOptionalString, -} from "../../shared/string-coerce.js"; +import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { shouldAttemptTtsPayload } from "../../tts/tts-config.js"; import { createCronExecutionId } from "../run-id.js"; import { hasScheduledNextRunAtMs } from "../service/jobs.js"; @@ -89,28 +86,6 @@ function normalizeSilentReplyText(text: string | undefined): NormalizedSilentRep return { text: next, strippedTrailingSilentToken }; } -export function matchesMessagingToolDeliveryTarget( - target: { provider?: string; to?: string; accountId?: string }, - delivery: { channel?: string; to?: string; accountId?: string }, -): boolean { - if (!delivery.channel || !delivery.to || !target.to) { - return false; - } - const channel = normalizeLowercaseStringOrEmpty(delivery.channel); - const provider = normalizeOptionalLowercaseString(target.provider); - if (provider && provider !== "message" && provider !== channel) { - return false; - } - if (delivery.accountId && target.accountId && target.accountId !== delivery.accountId) { - return false; - } - // Strip :topic:NNN from message targets and normalize Feishu/Lark prefixes on - // both sides so cron duplicate suppression compares canonical IDs. - const normalizedTargetTo = normalizeDeliveryTarget(channel, target.to.replace(/:topic:\d+$/, "")); - const normalizedDeliveryTo = normalizeDeliveryTarget(channel, delivery.to); - return normalizedTargetTo === normalizedDeliveryTo; -} - export function resolveCronDeliveryBestEffort(job: CronJob): boolean { return job.delivery?.bestEffort === true; } @@ -132,8 +107,7 @@ type DispatchCronDeliveryParams = { resolvedDelivery: DeliveryTargetResolution; deliveryRequested: boolean; skipHeartbeatDelivery: boolean; - skipMessagingToolDelivery?: boolean; - unverifiedMessagingToolDelivery?: boolean; + sourceDeliveryOutcome: SourceDeliveryOutcome; deliveryBestEffort: boolean; deliveryPayloadHasStructuredContent: boolean; deliveryPayloads: ReplyPayload[]; @@ -748,20 +722,18 @@ async function retryTransientDirectCronDelivery(params: { export async function dispatchCronDelivery( params: DispatchCronDeliveryParams, ): Promise { - const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true; + const sourceDeliverySatisfied = params.sourceDeliveryOutcome.satisfiesSourceDelivery; + const verifiedMessageToolDelivery = params.sourceDeliveryOutcome.verifiedMessageToolDelivery; let summary = params.summary; let outputText = params.outputText; let synthesizedText = params.synthesizedText; let deliveryPayloads = params.deliveryPayloads; - // Shared callers can treat a matching message-tool send as the completed - // delivery path. Cron-owned callers keep this false so direct cron delivery - // remains the only source of delivered state. - let delivered = skipMessagingToolDelivery; - let deliveryAttempted = skipMessagingToolDelivery; + let delivered = verifiedMessageToolDelivery; + let deliveryAttempted = verifiedMessageToolDelivery; let directCronSessionDeleted = false; const formatDeliveryTargetError = (error: string) => - params.unverifiedMessagingToolDelivery === true + params.sourceDeliveryOutcome.unverifiedMessageToolDelivery ? `${error}; the agent used the message tool, but OpenClaw could not verify that message matched the cron delivery target` : error; const failDeliveryTarget = (error: string) => @@ -1212,7 +1184,7 @@ export async function dispatchCronDelivery( return await deliverViaDirectAndCleanup(delivery, { retryTransient: true }); }; - if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) { + if (params.deliveryRequested && !params.skipHeartbeatDelivery && !sourceDeliverySatisfied) { if (!params.resolvedDelivery.ok) { if (!params.deliveryBestEffort) { return { diff --git a/src/cron/isolated-agent/run-delivery.runtime.ts b/src/cron/isolated-agent/run-delivery.runtime.ts index d1601a6ec4d..2c0746b2b3c 100644 --- a/src/cron/isolated-agent/run-delivery.runtime.ts +++ b/src/cron/isolated-agent/run-delivery.runtime.ts @@ -1,6 +1,2 @@ export { resolveDeliveryTarget } from "./delivery-target.js"; -export { - dispatchCronDelivery, - matchesMessagingToolDeliveryTarget, - resolveCronDeliveryBestEffort, -} from "./delivery-dispatch.js"; +export { dispatchCronDelivery, resolveCronDeliveryBestEffort } from "./delivery-dispatch.js"; diff --git a/src/cron/isolated-agent/run-executor.ts b/src/cron/isolated-agent/run-executor.ts index b00d2546b5c..c71c24cd2c0 100644 --- a/src/cron/isolated-agent/run-executor.ts +++ b/src/cron/isolated-agent/run-executor.ts @@ -2,10 +2,10 @@ import type { BootstrapContextMode } from "../../agents/bootstrap-files.js"; import { resolveCliRuntimeExecutionProvider } from "../../agents/model-runtime-aliases.js"; import type { SkillSnapshot } from "../../agents/skills.js"; import { normalizeToolList } from "../../agents/tool-policy.js"; -import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js"; import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js"; import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import type { SourceDeliveryPlan } from "../../infra/outbound/source-delivery-plan.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; import type { CronAgentExecutionPhaseUpdate, CronJob } from "../types.js"; import { @@ -111,19 +111,14 @@ export function createCronPromptExecutor(params: { /** Set when the cron payload's `timeoutSeconds` was explicitly configured. */ runTimeoutOverrideMs?: number; senderIsOwner: boolean; - messageChannel: string | undefined; suppressExecNotifyOnExit: boolean; resolvedDelivery: { + channel?: string; accountId?: string; to?: string; threadId?: string | number; }; - sourceReplyDeliveryMode?: SourceReplyDeliveryMode; - toolPolicy: { - requireExplicitMessageTarget: boolean; - disableMessageTool: boolean; - forceMessageTool: boolean; - }; + sourceDelivery: SourceDeliveryPlan; skillsSnapshot: SkillSnapshot; agentPayload: AgentTurnPayload; useSubagentFallbacks: boolean; @@ -158,6 +153,8 @@ export function createCronPromptExecutor(params: { params.cronSession.sessionEntry.systemPromptReport, ); const bootstrapContextMode = resolveCronBootstrapContextMode(params.agentPayload); + const sourceReplyDeliveryMode = params.sourceDelivery.sourceReplyDeliveryMode; + const messageChannel = params.sourceDelivery.target.channel ?? params.resolvedDelivery.channel; const runPrompt = async (promptText: string) => { const fallbackResult = await runWithModelFallback({ @@ -217,8 +214,8 @@ export function createCronPromptExecutor(params: { lane: resolveCronAgentLane(params.lane), cliSessionId, skillsSnapshot: params.skillsSnapshot, - messageChannel: params.messageChannel, - sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, + messageChannel, + sourceReplyDeliveryMode, abortSignal: params.abortSignal, onExecutionStarted: params.onExecutionStarted, onExecutionPhase: params.onExecutionPhase, @@ -235,7 +232,7 @@ export function createCronPromptExecutor(params: { } const { resolveFastModeState, runEmbeddedPiAgent } = await loadCronEmbeddedRuntime(); const currentChannelId = await resolveCurrentChannelTarget({ - channel: params.messageChannel, + channel: messageChannel, to: params.resolvedDelivery.to, threadId: params.resolvedDelivery.threadId, }); @@ -251,7 +248,7 @@ export function createCronPromptExecutor(params: { ownerOnlyToolAllowlist: resolveCronOwnerOnlyToolAllowlist( params.agentPayload?.toolsAllow, ), - messageChannel: params.messageChannel, + messageChannel, agentAccountId: params.resolvedDelivery.accountId, messageTo: params.resolvedDelivery.to, messageThreadId: params.resolvedDelivery.threadId, @@ -290,11 +287,11 @@ export function createCronPromptExecutor(params: { notifyOnExitEmptySuccess: false, } : undefined, - sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, + sourceReplyDeliveryMode, runId: params.cronSession.sessionEntry.sessionId, - requireExplicitMessageTarget: params.toolPolicy.requireExplicitMessageTarget, - disableMessageTool: params.toolPolicy.disableMessageTool, - forceMessageTool: params.toolPolicy.forceMessageTool, + requireExplicitMessageTarget: params.sourceDelivery.messageTool.requireExplicitTarget, + disableMessageTool: !params.sourceDelivery.messageTool.enabled, + forceMessageTool: params.sourceDelivery.messageTool.force, allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, abortSignal: params.abortSignal, onExecutionStarted: params.onExecutionStarted, @@ -344,12 +341,7 @@ export async function executeCronRun(params: { to?: string; threadId?: string | number; }; - sourceReplyDeliveryMode?: SourceReplyDeliveryMode; - toolPolicy: { - requireExplicitMessageTarget: boolean; - disableMessageTool: boolean; - forceMessageTool: boolean; - }; + sourceDelivery: SourceDeliveryPlan; skillsSnapshot: SkillSnapshot; agentPayload: AgentTurnPayload; useSubagentFallbacks: boolean; @@ -396,11 +388,9 @@ export async function executeCronRun(params: { thinkLevel: params.thinkLevel, timeoutMs: params.timeoutMs, runTimeoutOverrideMs: params.runTimeoutOverrideMs, - messageChannel: params.resolvedDelivery.channel, suppressExecNotifyOnExit: params.suppressExecNotifyOnExit, resolvedDelivery: params.resolvedDelivery, - sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, - toolPolicy: params.toolPolicy, + sourceDelivery: params.sourceDelivery, skillsSnapshot: params.skillsSnapshot, agentPayload: params.agentPayload, useSubagentFallbacks: params.useSubagentFallbacks, diff --git a/src/cron/isolated-agent/run.message-tool-policy.test.ts b/src/cron/isolated-agent/run.message-tool-policy.test.ts index 2470af29a23..69c32d00f0a 100644 --- a/src/cron/isolated-agent/run.message-tool-policy.test.ts +++ b/src/cron/isolated-agent/run.message-tool-policy.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; import type { SkillSnapshot } from "../../agents/skills.js"; +import { createSourceDeliveryPlan } from "../../infra/outbound/source-delivery-plan.js"; import type { CronDeliveryMode } from "../types.js"; import type { MutableCronSession } from "./run-session-state.js"; import { @@ -249,7 +250,18 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1); expectDispatchFields({ deliveryRequested: true, - skipMessagingToolDelivery: true, + sourceDeliveryOutcome: { + visibleDeliveries: [ + { + via: "message_tool", + verifiedTarget: true, + target: { tool: "message", provider: "messagechat", to: "123" }, + }, + ], + verifiedMessageToolDelivery: true, + satisfiesSourceDelivery: true, + unverifiedMessageToolDelivery: false, + }, }); expectDeliveryFields(result.delivery, { intended: { channel: "messagechat", to: "123", source: "explicit" }, @@ -320,13 +332,20 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { thinkLevel: undefined, timeoutMs: 60_000, senderIsOwner: true, - messageChannel: "messagechat", suppressExecNotifyOnExit: true, - toolPolicy: { - requireExplicitMessageTarget: false, - disableMessageTool: false, - forceMessageTool: true, - }, + sourceDelivery: createSourceDeliveryPlan({ + owner: "message_tool_then_direct_fallback", + reason: "cron_announce", + target: { + channel: resolvedDelivery.channel ?? "messagechat", + to: resolvedDelivery.to, + accountId: resolvedDelivery.accountId, + threadId: resolvedDelivery.threadId, + }, + messageToolEnabled: true, + messageToolForced: true, + directFallback: true, + }), skillsSnapshot: emptySkillsSnapshot, agentPayload: null, useSubagentFallbacks: false, @@ -466,6 +485,64 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { }); }); + it("marks delivery.mode none delivered when the message tool sends to the explicit target", async () => { + mockRunCronFallbackPassthrough(); + resolveCronDeliveryPlanMock.mockReturnValue({ + requested: false, + mode: "none", + channel: "topicchat", + to: "room#42", + threadId: 42, + }); + resolveDeliveryTargetMock.mockResolvedValue({ + ok: true, + channel: "topicchat", + to: "room#42", + threadId: 42, + accountId: undefined, + error: undefined, + }); + runEmbeddedPiAgentMock.mockResolvedValue( + makeMessageToolRunResult([ + { tool: "message", provider: "topicchat", to: "room#42", threadId: "42" }, + ]), + ); + + const result = await runCronIsolatedAgentTurn({ + ...makeParams(), + job: makeMessageToolPolicyJob({ + mode: "none", + channel: "topicchat", + to: "room#42", + threadId: 42, + }), + }); + + expectDispatchFields({ + deliveryRequested: false, + sourceDeliveryOutcome: { + visibleDeliveries: [ + { + via: "message_tool", + verifiedTarget: true, + target: { tool: "message", provider: "topicchat", to: "room#42", threadId: "42" }, + }, + ], + verifiedMessageToolDelivery: true, + satisfiesSourceDelivery: false, + unverifiedMessageToolDelivery: false, + }, + }); + expect(result.delivered).toBe(true); + expect(result.deliveryAttempted).toBe(true); + expectDeliveryFields(result.delivery, { + intended: { channel: "topicchat", to: "room#42", threadId: 42, source: "explicit" }, + messageToolSentTo: [{ channel: "topicchat", to: "room#42", threadId: "42" }], + fallbackUsed: false, + delivered: true, + }); + }); + it('does not resolve implicit "last" context for bare delivery.mode none', async () => { mockRunCronFallbackPassthrough(); resolveCronDeliveryPlanMock.mockReturnValue({ @@ -513,8 +590,8 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { it("forwards explicit message targets into the embedded run", async () => { mockRunCronFallbackPassthrough(); const executor = createMessageToolExecutor({ - messageChannel: "topicchat", resolvedDelivery: { + channel: "topicchat", accountId: "ops", to: "room#42", threadId: 42, @@ -536,8 +613,8 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { it("lets channels build currentChannelId from split delivery fields", async () => { mockRunCronFallbackPassthrough(); const executor = createMessageToolExecutor({ - messageChannel: "topicchat", resolvedDelivery: { + channel: "topicchat", accountId: "ops", to: "room", threadId: 42, @@ -787,8 +864,18 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1); expectDispatchFields({ deliveryRequested: true, - skipMessagingToolDelivery: false, - unverifiedMessagingToolDelivery: true, + sourceDeliveryOutcome: { + visibleDeliveries: [ + { + via: "message_tool", + verifiedTarget: false, + target: { tool: "message", provider: "messagechat", to: "123" }, + }, + ], + verifiedMessageToolDelivery: false, + satisfiesSourceDelivery: false, + unverifiedMessageToolDelivery: true, + }, }); const delivery = expectDeliveryFields(result.delivery, { intended: { channel: "last", to: null, source: "last" }, @@ -823,8 +910,18 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1); expectDispatchFields({ deliveryRequested: false, - skipMessagingToolDelivery: false, - unverifiedMessagingToolDelivery: true, + sourceDeliveryOutcome: { + visibleDeliveries: [ + { + via: "message_tool", + verifiedTarget: false, + target: { tool: "message", provider: "messagechat", to: "123" }, + }, + ], + verifiedMessageToolDelivery: false, + satisfiesSourceDelivery: false, + unverifiedMessageToolDelivery: true, + }, }); expect(result.delivered).toBe(false); expect(result.deliveryAttempted).toBe(false); diff --git a/src/cron/isolated-agent/run.test-harness.ts b/src/cron/isolated-agent/run.test-harness.ts index dad00c87887..9fba621f2b6 100644 --- a/src/cron/isolated-agent/run.test-harness.ts +++ b/src/cron/isolated-agent/run.test-harness.ts @@ -519,22 +519,22 @@ function resetRunOutcomeMocks(): void { synthesizedText, deliveryRequested, skipHeartbeatDelivery, - skipMessagingToolDelivery, + sourceDeliveryOutcome, resolvedDelivery, }) => ({ result: undefined, delivered: Boolean( - skipMessagingToolDelivery || + sourceDeliveryOutcome?.verifiedMessageToolDelivery || (deliveryRequested && !skipHeartbeatDelivery && - !skipMessagingToolDelivery && + !sourceDeliveryOutcome?.satisfiesSourceDelivery && resolvedDelivery.ok), ), deliveryAttempted: Boolean( - skipMessagingToolDelivery || + sourceDeliveryOutcome?.verifiedMessageToolDelivery || (deliveryRequested && !skipHeartbeatDelivery && - !skipMessagingToolDelivery && + !sourceDeliveryOutcome?.satisfiesSourceDelivery && resolvedDelivery.ok), ), summary, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 99c6de99677..81dee4358d1 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -2,14 +2,18 @@ import { hasAnyAuthProfileStoreSource } from "../../agents/auth-profiles/source- import { resolveAgentHarnessPolicy } from "../../agents/harness/selection.js"; import { listOpenAIAuthProfileProvidersForAgentRuntime } from "../../agents/openai-codex-routing.js"; import { retireSessionMcpRuntime } from "../../agents/pi-bundle-mcp-tools.js"; -import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js"; import type { SkillSnapshot } from "../../agents/skills.js"; -import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { CliDeps } from "../../cli/outbound-send-deps.js"; import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; -import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js"; +import { + createSourceDeliveryPlan, + resolveSourceDeliveryOutcome, + type SourceDeliveryOutcome, + type SourceDeliveryPlan, + type SourceDeliveryVisibleDelivery, +} from "../../infra/outbound/source-delivery-plan.js"; import { isCommandLaneTaskTimeoutError } from "../../process/command-queue.js"; import { CommandLane } from "../../process/lanes.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; @@ -194,28 +198,17 @@ function normalizeCronTraceTarget( }; } -type MessagingToolTargetMatcher = ( - target: { provider?: string; to?: string; accountId?: string }, - delivery: { channel?: string; to?: string; accountId?: string }, -) => boolean; - function normalizeMessagingToolTarget( - target: MessagingToolSend, + delivery: SourceDeliveryVisibleDelivery, resolvedDelivery: ResolvedCronDeliveryTarget, - matchesMessagingToolDeliveryTarget: MessagingToolTargetMatcher, ): CronDeliveryTraceMessageTarget | undefined { + const { target } = delivery; const channel = target.provider?.trim(); if (!channel) { return undefined; } const traceChannel = - channel === "message" && - resolvedDelivery.ok && - matchesMessagingToolDeliveryTarget(target, { - channel: resolvedDelivery.channel, - to: resolvedDelivery.to, - accountId: resolvedDelivery.accountId, - }) + channel === "message" && resolvedDelivery.ok && delivery.verifiedTarget ? resolvedDelivery.channel : channel; return { @@ -257,8 +250,7 @@ function buildResolvedCronTraceTarget( function buildCronDeliveryTrace(params: { deliveryPlan: CronDeliveryPlan; resolvedDelivery: ResolvedCronDeliveryTarget; - messagingToolSentTargets: MessagingToolSend[]; - matchesMessagingToolDeliveryTarget: MessagingToolTargetMatcher; + sourceDeliveryOutcome: SourceDeliveryOutcome; fallbackUsed: boolean; delivered: boolean; }): CronDeliveryTrace { @@ -275,14 +267,8 @@ function buildCronDeliveryTrace(params: { const resolved = includeResolved ? buildResolvedCronTraceTarget(params.resolvedDelivery) : undefined; - const messageToolSentTo = params.messagingToolSentTargets - .map((target) => - normalizeMessagingToolTarget( - target, - params.resolvedDelivery, - params.matchesMessagingToolDeliveryTarget, - ), - ) + const messageToolSentTo = params.sourceDeliveryOutcome.visibleDeliveries + .map((delivery) => normalizeMessagingToolTarget(delivery, params.resolvedDelivery)) .filter((target): target is CronDeliveryTraceMessageTarget => Boolean(target)); return { ...(intended ? { intended } : {}), @@ -293,60 +279,50 @@ function buildCronDeliveryTrace(params: { }; } -function resolveMessagingToolSentTargets(params: { - resolvedDelivery: ResolvedCronDeliveryTarget; - runResult: CronExecutionResult["runResult"]; -}): MessagingToolSend[] { - const explicitTargets = params.runResult.messagingToolSentTargets ?? []; - if (explicitTargets.length > 0 || params.runResult.didSendViaMessagingTool !== true) { - return explicitTargets; - } - if (!params.resolvedDelivery.ok) { - return []; - } - const threadId = stringifyRouteThreadId(params.resolvedDelivery.threadId); - return [ - { - tool: "message", - provider: params.resolvedDelivery.channel, - ...(params.resolvedDelivery.accountId - ? { accountId: params.resolvedDelivery.accountId } - : {}), - ...(params.resolvedDelivery.to ? { to: params.resolvedDelivery.to } : {}), - ...(threadId ? { threadId } : {}), - }, - ]; -} - -function resolveCronToolPolicy(params: { deliveryMode: "announce" | "webhook" | "none" }) { - const enableMessageTool = params.deliveryMode !== "webhook"; - return { - requireExplicitMessageTarget: false, - disableMessageTool: !enableMessageTool, - forceMessageTool: enableMessageTool, - }; -} - -function resolveCronSourceReplyDeliveryMode(params: { +function resolveCronSourceDeliveryPlan(params: { deliveryPlan: CronDeliveryPlan; resolvedDelivery: ResolvedCronDeliveryTarget; - toolPolicy: ReturnType; -}): SourceReplyDeliveryMode | undefined { - if ( - params.deliveryPlan.mode !== "announce" || - params.toolPolicy.disableMessageTool || - !params.resolvedDelivery.ok - ) { - return undefined; +}): SourceDeliveryPlan { + const target = { + channel: params.resolvedDelivery.channel, + to: params.resolvedDelivery.to, + accountId: params.resolvedDelivery.accountId, + threadId: params.resolvedDelivery.threadId, + }; + if (params.deliveryPlan.mode === "webhook") { + return createSourceDeliveryPlan({ + owner: "none", + reason: "cron_webhook", + messageToolEnabled: false, + directFallback: false, + }); } - return "message_tool_only"; + if (params.deliveryPlan.mode === "none") { + return createSourceDeliveryPlan({ + owner: "none", + reason: "cron_none", + target, + messageToolEnabled: true, + messageToolForced: true, + directFallback: false, + }); + } + return createSourceDeliveryPlan({ + owner: params.resolvedDelivery.ok ? "message_tool_then_direct_fallback" : "direct_fallback", + reason: "cron_announce", + target, + messageToolEnabled: true, + messageToolForced: true, + directFallback: true, + skipFallbackWhenMessageToolSentToTarget: params.resolvedDelivery.ok, + }); } function canPromptForMessageTool(params: { - disableMessageTool: boolean; + sourceDelivery: SourceDeliveryPlan; toolsAllow?: string[]; }): boolean { - if (params.disableMessageTool) { + if (!params.sourceDelivery.messageTool.enabled) { return false; } return !params.toolsAllow?.length || params.toolsAllow.includes("message"); @@ -378,27 +354,24 @@ async function resolveCronDeliveryContext(params: { deliveryPlan, deliveryRequested: deliveryPlan.requested, resolvedDelivery, - toolPolicy: resolveCronToolPolicy({ - deliveryMode: deliveryPlan.mode, - }), + sourceDelivery: resolveCronSourceDeliveryPlan({ deliveryPlan, resolvedDelivery }), }; } if (deliveryPlan.mode === "none" && !hasExplicitCronDeliveryTarget(deliveryPlan)) { + const resolvedDelivery = { + ok: false as const, + channel: undefined, + to: undefined, + accountId: undefined, + threadId: undefined, + mode: "implicit" as const, + error: new Error("delivery is disabled"), + }; return { deliveryPlan, deliveryRequested: false, - resolvedDelivery: { - ok: false as const, - channel: undefined, - to: undefined, - accountId: undefined, - threadId: undefined, - mode: "implicit" as const, - error: new Error("delivery is disabled"), - }, - toolPolicy: resolveCronToolPolicy({ - deliveryMode: deliveryPlan.mode, - }), + resolvedDelivery, + sourceDelivery: resolveCronSourceDeliveryPlan({ deliveryPlan, resolvedDelivery }), }; } const { resolveDeliveryTarget } = await loadCronDeliveryRuntime(); @@ -413,9 +386,7 @@ async function resolveCronDeliveryContext(params: { deliveryPlan, deliveryRequested: deliveryPlan.requested, resolvedDelivery, - toolPolicy: resolveCronToolPolicy({ - deliveryMode: deliveryPlan.mode, - }), + sourceDelivery: resolveCronSourceDeliveryPlan({ deliveryPlan, resolvedDelivery }), }; } @@ -486,10 +457,9 @@ type PreparedCronRunContext = { deliveryPlan: CronDeliveryPlan; resolvedDelivery: ResolvedCronDeliveryTarget; deliveryRequested: boolean; - sourceReplyDeliveryMode?: SourceReplyDeliveryMode; + sourceDelivery: SourceDeliveryPlan; suppressExecNotifyOnExit: boolean; senderIsOwner: boolean; - toolPolicy: ReturnType; skillsSnapshot: SkillSnapshot; liveSelection: CronLiveSelection; useSubagentFallbacks: boolean; @@ -712,17 +682,12 @@ async function prepareCronRunContext(params: { ? explicitTimeoutSeconds * 1000 : undefined; const agentPayload = input.job.payload.kind === "agentTurn" ? input.job.payload : null; - const { deliveryPlan, deliveryRequested, resolvedDelivery, toolPolicy } = + const { deliveryPlan, deliveryRequested, resolvedDelivery, sourceDelivery } = await resolveCronDeliveryContext({ cfg: cfgWithAgentDefaults, job: input.job, agentId, }); - const sourceReplyDeliveryMode = resolveCronSourceReplyDeliveryMode({ - deliveryPlan, - resolvedDelivery, - toolPolicy, - }); const { formattedTime, timeLine } = resolveCronStyleNow(input.cfg, now); const base = `[cron:${input.job.id} ${input.job.name}] ${input.message}`.trim(); @@ -763,7 +728,7 @@ async function prepareCronRunContext(params: { commandBody, deliveryRequested, messageToolEnabled: canPromptForMessageTool({ - disableMessageTool: toolPolicy.disableMessageTool, + sourceDelivery, toolsAllow: agentPayload?.toolsAllow, }), resolvedDeliveryOk: resolvedDelivery.ok, @@ -855,10 +820,9 @@ async function prepareCronRunContext(params: { deliveryPlan, resolvedDelivery, deliveryRequested, - sourceReplyDeliveryMode, + sourceDelivery, suppressExecNotifyOnExit: deliveryPlan.mode === "none", senderIsOwner: !isExternalHook, - toolPolicy, skillsSnapshot, liveSelection, useSubagentFallbacks, @@ -1038,27 +1002,11 @@ async function finalizeCronRun(params: { prepared.deliveryRequested && !hasFatalErrorPayload && isHeartbeatOnlyResponse(deliveryPayloads, resolveHeartbeatAckMaxChars(prepared.agentCfg)); - const { - dispatchCronDelivery, - matchesMessagingToolDeliveryTarget, - resolveCronDeliveryBestEffort, - } = await loadCronDeliveryRuntime(); - const messagingToolSentTargets = resolveMessagingToolSentTargets({ - resolvedDelivery: prepared.resolvedDelivery, - runResult: finalRunResult, + const { dispatchCronDelivery, resolveCronDeliveryBestEffort } = await loadCronDeliveryRuntime(); + const sourceDeliveryOutcome = resolveSourceDeliveryOutcome(prepared.sourceDelivery, { + didSendViaMessageTool: finalRunResult.didSendViaMessagingTool, + messageToolSentTargets: finalRunResult.messagingToolSentTargets, }); - const didSendViaMessagingTool = - finalRunResult.didSendViaMessagingTool === true && messagingToolSentTargets.length > 0; - const skipMessagingToolDelivery = - didSendViaMessagingTool && - prepared.resolvedDelivery.ok && - messagingToolSentTargets.some((target) => - matchesMessagingToolDeliveryTarget(target, { - channel: prepared.resolvedDelivery.channel, - to: prepared.resolvedDelivery.to, - accountId: prepared.resolvedDelivery.accountId, - }), - ); const deliveryResult = await dispatchCronDelivery({ cfg: prepared.input.cfg, cfgWithAgentDefaults: prepared.cfgWithAgentDefaults, @@ -1074,8 +1022,7 @@ async function finalizeCronRun(params: { resolvedDelivery: prepared.resolvedDelivery, deliveryRequested: prepared.deliveryRequested, skipHeartbeatDelivery, - skipMessagingToolDelivery, - unverifiedMessagingToolDelivery: didSendViaMessagingTool && !prepared.resolvedDelivery.ok, + sourceDeliveryOutcome, deliveryBestEffort: resolveCronDeliveryBestEffort(prepared.input.job), deliveryPayloadHasStructuredContent, deliveryPayloads, @@ -1092,9 +1039,11 @@ async function finalizeCronRun(params: { const deliveryTrace = buildCronDeliveryTrace({ deliveryPlan: prepared.deliveryPlan, resolvedDelivery: prepared.resolvedDelivery, - messagingToolSentTargets, - matchesMessagingToolDeliveryTarget, - fallbackUsed: deliveryResult.deliveryAttempted && !skipMessagingToolDelivery, + sourceDeliveryOutcome, + fallbackUsed: + prepared.deliveryRequested && + deliveryResult.deliveryAttempted && + !sourceDeliveryOutcome.satisfiesSourceDelivery, delivered: deliveryResult.delivered, }); if (deliveryResult.result) { @@ -1202,8 +1151,7 @@ export async function runCronIsolatedAgentTurn(params: { accountId: prepared.context.resolvedDelivery.accountId, threadId: prepared.context.resolvedDelivery.threadId, }, - sourceReplyDeliveryMode: prepared.context.sourceReplyDeliveryMode, - toolPolicy: prepared.context.toolPolicy, + sourceDelivery: prepared.context.sourceDelivery, skillsSnapshot: prepared.context.skillsSnapshot, agentPayload: prepared.context.agentPayload, useSubagentFallbacks: prepared.context.useSubagentFallbacks, diff --git a/src/infra/outbound/source-delivery-plan.test.ts b/src/infra/outbound/source-delivery-plan.test.ts new file mode 100644 index 00000000000..24e2cabf71a --- /dev/null +++ b/src/infra/outbound/source-delivery-plan.test.ts @@ -0,0 +1,210 @@ +import { describe, expect, it } from "vitest"; +import { + createSourceDeliveryPlan, + resolveSourceDeliveryOutcome, + sourceDeliveryTargetsMatch, +} from "./source-delivery-plan.js"; + +describe("source delivery plan", () => { + it("projects message-tool-owned delivery to existing source reply and message tool fields", () => { + const contract = createSourceDeliveryPlan({ + owner: "message_tool_then_direct_fallback", + reason: "cron_announce", + target: { channel: "discord", to: "channel:123", accountId: "bot-a" }, + }); + + expect(contract.sourceReplyDeliveryMode).toBe("message_tool_only"); + expect(contract.normalFinal).toBe("private"); + expect(contract.fallback.skipWhenMessageToolSentToTarget).toBe(true); + expect(contract.messageTool).toMatchObject({ + requireExplicitTarget: false, + enabled: true, + force: true, + }); + }); + + it("keeps direct fallback delivery compatible with automatic final payload handling", () => { + const contract = createSourceDeliveryPlan({ + owner: "direct_fallback", + reason: "cron_announce", + target: { channel: "discord", to: "channel:123" }, + messageToolEnabled: true, + messageToolForced: true, + directFallback: true, + skipFallbackWhenMessageToolSentToTarget: false, + }); + + expect(contract.sourceReplyDeliveryMode).toBeUndefined(); + expect(contract.normalFinal).toBe("visible"); + expect(contract.fallback.directDelivery).toBe(true); + expect(contract.fallback.skipWhenMessageToolSentToTarget).toBe(false); + expect(contract.messageTool).toMatchObject({ + requireExplicitTarget: false, + enabled: true, + force: true, + }); + }); + + it("normalizes message-tool delivery outcomes against the planned source target", () => { + const contract = createSourceDeliveryPlan({ + owner: "message_tool_then_direct_fallback", + reason: "cron_announce", + target: { channel: "feishu", to: "oc_123", accountId: "bot-a", threadId: 456 }, + }); + + const outcome = resolveSourceDeliveryOutcome(contract, { + didSendViaMessageTool: true, + messageToolSentTargets: [ + { + tool: "message", + provider: "message", + accountId: "bot-a", + to: "oc_123:topic:456", + text: "done", + }, + ], + }); + + expect(outcome.satisfiesSourceDelivery).toBe(true); + expect(outcome.verifiedMessageToolDelivery).toBe(true); + expect(outcome.unverifiedMessageToolDelivery).toBe(false); + expect(outcome.visibleDeliveries).toEqual([ + { + via: "message_tool", + verifiedTarget: true, + target: { + tool: "message", + provider: "message", + accountId: "bot-a", + to: "oc_123:topic:456", + text: "done", + }, + }, + ]); + }); + + it("keeps unverified message-tool sends visible to fallback/error handling", () => { + const contract = createSourceDeliveryPlan({ + owner: "message_tool_then_direct_fallback", + reason: "cron_announce", + target: { channel: "slack", to: "channel:C1" }, + }); + + const outcome = resolveSourceDeliveryOutcome(contract, { + didSendViaMessageTool: true, + messageToolSentTargets: [{ tool: "message", provider: "slack", to: "channel:C2" }], + }); + + expect(outcome.satisfiesSourceDelivery).toBe(false); + expect(outcome.verifiedMessageToolDelivery).toBe(false); + expect(outcome.unverifiedMessageToolDelivery).toBe(true); + expect(outcome.visibleDeliveries[0]?.verifiedTarget).toBe(false); + }); + + it("keeps verified message-tool delivery separate from source fallback satisfaction", () => { + const contract = createSourceDeliveryPlan({ + owner: "none", + reason: "cron_none", + target: { channel: "slack", to: "channel:C1" }, + messageToolEnabled: true, + messageToolForced: true, + directFallback: false, + }); + + const outcome = resolveSourceDeliveryOutcome(contract, { + didSendViaMessageTool: true, + messageToolSentTargets: [{ tool: "message", provider: "slack", to: "channel:C1" }], + }); + + expect(outcome.verifiedMessageToolDelivery).toBe(true); + expect(outcome.satisfiesSourceDelivery).toBe(false); + expect(outcome.unverifiedMessageToolDelivery).toBe(false); + }); + + it("does not satisfy delivery from target metadata without a committed message-tool send", () => { + const contract = createSourceDeliveryPlan({ + owner: "message_tool_then_direct_fallback", + reason: "cron_announce", + target: { channel: "slack", to: "channel:C1" }, + }); + + const outcome = resolveSourceDeliveryOutcome(contract, { + didSendViaMessageTool: false, + messageToolSentTargets: [{ tool: "message", provider: "slack", to: "channel:C1" }], + }); + + expect(outcome.visibleDeliveries[0]?.verifiedTarget).toBe(true); + expect(outcome.verifiedMessageToolDelivery).toBe(false); + expect(outcome.satisfiesSourceDelivery).toBe(false); + expect(outcome.unverifiedMessageToolDelivery).toBe(false); + }); + + it("does not synthesize an implicit target without a concrete recipient", () => { + const contract = createSourceDeliveryPlan({ + owner: "direct_fallback", + reason: "cron_announce", + target: { channel: "slack" }, + messageToolEnabled: true, + messageToolForced: true, + directFallback: true, + skipFallbackWhenMessageToolSentToTarget: false, + }); + + const outcome = resolveSourceDeliveryOutcome(contract, { + didSendViaMessageTool: true, + }); + + expect(outcome.visibleDeliveries).toEqual([]); + expect(outcome.verifiedMessageToolDelivery).toBe(false); + expect(outcome.satisfiesSourceDelivery).toBe(false); + expect(outcome.unverifiedMessageToolDelivery).toBe(false); + }); + + it("matches source targets through the same provider normalization used by delivery", () => { + expect( + sourceDeliveryTargetsMatch( + { provider: "message", to: "channel:C1" }, + { channel: "slack", to: "channel:C1" }, + ), + ).toBe(true); + expect( + sourceDeliveryTargetsMatch( + { provider: "discord", to: "channel:C1" }, + { channel: "slack", to: "channel:C1" }, + ), + ).toBe(false); + }); + + it("matches threaded delivery only with explicit or supported implicit thread evidence", () => { + expect( + sourceDeliveryTargetsMatch( + { provider: "telegram", to: "-100:topic:462" }, + { channel: "telegram", to: "-100", threadId: 462 }, + ), + ).toBe(true); + expect( + sourceDeliveryTargetsMatch( + { provider: "telegram", to: "-100" }, + { channel: "telegram", to: "-100", threadId: 462 }, + ), + ).toBe(false); + expect( + sourceDeliveryTargetsMatch( + { provider: "telegram", to: "-100", threadImplicit: true }, + { channel: "telegram", to: "-100", threadId: 462 }, + ), + ).toBe(true); + expect( + sourceDeliveryTargetsMatch( + { provider: "telegram", to: "-100", threadImplicit: true, threadSuppressed: true }, + { channel: "telegram", to: "-100", threadId: 462 }, + ), + ).toBe(false); + expect( + sourceDeliveryTargetsMatch( + { provider: "telegram", to: "-100", threadId: "111" }, + { channel: "telegram", to: "-100", threadId: 462 }, + ), + ).toBe(false); + }); +}); diff --git a/src/infra/outbound/source-delivery-plan.ts b/src/infra/outbound/source-delivery-plan.ts new file mode 100644 index 00000000000..929a1b0df5a --- /dev/null +++ b/src/infra/outbound/source-delivery-plan.ts @@ -0,0 +1,221 @@ +import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js"; +import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js"; +import { normalizeTargetForProvider } from "./target-normalization.js"; + +export type SourceVisibleDeliveryOwner = + | "automatic_source" + | "message_tool" + | "message_tool_then_direct_fallback" + | "direct_fallback" + | "none"; + +export type SourceDeliveryPlanReason = + | "config" + | "room_event" + | "cron_announce" + | "cron_webhook" + | "cron_none" + | "media_completion" + | "subagent_completion"; + +export type SourceDeliveryTarget = { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; +}; + +export type SourceDeliveryMessageToolTarget = { + tool?: string; + provider?: string; + accountId?: string; + to?: string; + threadId?: string; + threadImplicit?: boolean; + threadSuppressed?: boolean; + text?: string; + mediaUrls?: string[]; +}; + +export type SourceDeliveryVisibleDelivery = { + via: "message_tool"; + target: SourceDeliveryMessageToolTarget; + verifiedTarget: boolean; +}; + +export type SourceDeliveryOutcome = { + visibleDeliveries: SourceDeliveryVisibleDelivery[]; + verifiedMessageToolDelivery: boolean; + satisfiesSourceDelivery: boolean; + unverifiedMessageToolDelivery: boolean; +}; + +export type SourceDeliveryPlan = { + owner: SourceVisibleDeliveryOwner; + reason: SourceDeliveryPlanReason; + target: SourceDeliveryTarget; + normalFinal: "visible" | "private"; + sourceReplyDeliveryMode?: SourceReplyDeliveryMode; + messageTool: { + enabled: boolean; + force: boolean; + requireExplicitTarget: boolean; + defaultTarget: boolean; + }; + fallback: { + directDelivery: boolean; + skipWhenMessageToolSentToTarget: boolean; + bestEffort: boolean; + }; + progress: { + allowCallbacksWhenSourceDeliverySuppressed: boolean; + }; +}; + +function isMessageToolOwnedDelivery(owner: SourceVisibleDeliveryOwner): boolean { + return owner === "message_tool" || owner === "message_tool_then_direct_fallback"; +} + +function normalizeDeliveryTarget(channel: string, to: string): string { + const toTrimmed = to.trim(); + return normalizeTargetForProvider(channel, toTrimmed) ?? toTrimmed; +} + +function normalizeDeliveryThreadId(threadId: string | number | undefined): string | undefined { + return stringifyRouteThreadId(threadId)?.trim() || undefined; +} + +function extractTopicThreadId(targetTo: string): string | undefined { + return targetTo.match(/:topic:(\d+)$/i)?.[1]; +} + +export function sourceDeliveryTargetsMatch( + target: SourceDeliveryMessageToolTarget, + delivery: SourceDeliveryTarget, +): boolean { + if (!delivery.channel || !delivery.to || !target.to) { + return false; + } + const channel = delivery.channel.trim().toLowerCase(); + const provider = target.provider?.trim().toLowerCase(); + if (provider && provider !== "message" && provider !== channel) { + return false; + } + if (delivery.accountId && target.accountId && target.accountId !== delivery.accountId) { + return false; + } + // Strip :topic:NNN from message targets and normalize Feishu/Lark prefixes on + // both sides so source-delivery suppression compares canonical IDs. + const normalizedTargetTo = normalizeDeliveryTarget(channel, target.to.replace(/:topic:\d+$/, "")); + const normalizedDeliveryTo = normalizeDeliveryTarget(channel, delivery.to); + if (normalizedTargetTo !== normalizedDeliveryTo) { + return false; + } + const deliveryThreadId = normalizeDeliveryThreadId(delivery.threadId); + const targetThreadId = + normalizeDeliveryThreadId(target.threadId) ?? extractTopicThreadId(target.to); + if (!deliveryThreadId && !targetThreadId) { + return true; + } + if (deliveryThreadId && !targetThreadId) { + return target.threadImplicit === true && target.threadSuppressed !== true; + } + return deliveryThreadId === targetThreadId; +} + +export function createSourceDeliveryPlan(params: { + owner: SourceVisibleDeliveryOwner; + reason: SourceDeliveryPlanReason; + target?: SourceDeliveryTarget; + messageToolEnabled?: boolean; + messageToolForced?: boolean; + requireExplicitMessageTarget?: boolean; + directFallback?: boolean; + skipFallbackWhenMessageToolSentToTarget?: boolean; + fallbackBestEffort?: boolean; + allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean; +}): SourceDeliveryPlan { + const messageToolOwnsDelivery = isMessageToolOwnedDelivery(params.owner); + const sourceReplyDeliveryMode = messageToolOwnsDelivery ? "message_tool_only" : undefined; + const directDelivery = + params.directFallback ?? + (params.owner === "direct_fallback" || params.owner === "message_tool_then_direct_fallback"); + return { + owner: params.owner, + reason: params.reason, + target: params.target ?? {}, + normalFinal: + sourceReplyDeliveryMode === "message_tool_only" || params.owner === "none" + ? "private" + : "visible", + sourceReplyDeliveryMode, + messageTool: { + enabled: params.messageToolEnabled ?? messageToolOwnsDelivery, + force: params.messageToolForced ?? messageToolOwnsDelivery, + requireExplicitTarget: params.requireExplicitMessageTarget ?? false, + defaultTarget: Boolean(params.target?.channel || params.target?.to), + }, + fallback: { + directDelivery, + skipWhenMessageToolSentToTarget: + params.skipFallbackWhenMessageToolSentToTarget ?? + params.owner === "message_tool_then_direct_fallback", + bestEffort: params.fallbackBestEffort ?? false, + }, + progress: { + allowCallbacksWhenSourceDeliverySuppressed: + params.allowProgressCallbacksWhenSourceDeliverySuppressed ?? false, + }, + }; +} + +function resolveImplicitMessageToolDeliveryTarget( + plan: SourceDeliveryPlan, +): SourceDeliveryMessageToolTarget | undefined { + if (!plan.target.channel || !plan.target.to) { + return undefined; + } + const threadId = stringifyRouteThreadId(plan.target.threadId); + return { + tool: "message", + provider: plan.target.channel, + ...(plan.target.accountId ? { accountId: plan.target.accountId } : {}), + ...(plan.target.to ? { to: plan.target.to } : {}), + ...(threadId ? { threadId } : {}), + }; +} + +export function resolveSourceDeliveryOutcome( + plan: SourceDeliveryPlan, + params: { + didSendViaMessageTool?: boolean; + messageToolSentTargets?: SourceDeliveryMessageToolTarget[]; + }, +): SourceDeliveryOutcome { + const didSendViaMessageTool = params.didSendViaMessageTool === true; + const explicitTargets = params.messageToolSentTargets ?? []; + const sentTargets = + explicitTargets.length > 0 + ? explicitTargets + : didSendViaMessageTool + ? [resolveImplicitMessageToolDeliveryTarget(plan)].filter( + (target): target is SourceDeliveryMessageToolTarget => Boolean(target), + ) + : []; + const visibleDeliveries = sentTargets.map((target) => ({ + via: "message_tool" as const, + target, + verifiedTarget: sourceDeliveryTargetsMatch(target, plan.target), + })); + const hasVerifiedMessageToolDelivery = visibleDeliveries.some( + (delivery) => didSendViaMessageTool && delivery.verifiedTarget, + ); + return { + visibleDeliveries, + verifiedMessageToolDelivery: hasVerifiedMessageToolDelivery, + satisfiesSourceDelivery: + plan.fallback.skipWhenMessageToolSentToTarget && hasVerifiedMessageToolDelivery, + unverifiedMessageToolDelivery: + didSendViaMessageTool && sentTargets.length > 0 && !hasVerifiedMessageToolDelivery, + }; +}