refactor(cron): centralize source delivery plan

This commit is contained in:
Peter Steinberger
2026-05-18 02:39:42 +01:00
parent 81b9058cc3
commit 4c613fbfe0
13 changed files with 770 additions and 229 deletions

View File

@@ -6,6 +6,8 @@ export type MessagingToolSend = {
accountId?: string;
to?: string;
threadId?: string;
threadImplicit?: boolean;
threadSuppressed?: boolean;
text?: string;
mediaUrls?: string[];
};

View File

@@ -17,6 +17,7 @@ describe("extractMessagingToolSend", () => {
plugin: {
...createChannelTestPluginBase({ id: "telegram" }),
messaging: { normalizeTarget: normalizeTelegramMessagingTargetForTest },
threading: { resolveAutoThreadId: () => "456" },
},
source: "test",
},
@@ -25,6 +26,18 @@ describe("extractMessagingToolSend", () => {
plugin: {
...createChannelTestPluginBase({ id: "slack" }),
messaging: { normalizeTarget: (raw: string) => raw.trim().toLowerCase() },
actions: {
extractToolSend: (params: { args: Record<string, unknown> }) => {
const { args } = params;
return args.action === "sendMessage" && typeof args.to === "string"
? {
to: args.to,
accountId: typeof args.accountId === "string" ? args.accountId : undefined,
threadId: typeof args.threadId === "string" ? args.threadId : undefined,
}
: null;
},
},
},
source: "test",
},
@@ -119,4 +132,55 @@ describe("extractMessagingToolSend", () => {
expect(result?.to).toBe("channel:123");
expect(result?.threadId).toBe("456");
});
it("records when message sends can inherit the current thread", () => {
const result = extractMessagingToolSend("message", {
action: "send",
provider: "telegram",
to: "123",
content: "done",
});
expect(result?.threadImplicit).toBe(true);
});
it("keeps provider-tool extracted thread id evidence", () => {
const result = extractMessagingToolSend("slack", {
action: "sendMessage",
to: " Channel:C1 ",
threadId: "171.222",
accountId: "bot-a",
content: "done",
});
expect(result).toMatchObject({
tool: "slack",
provider: "slack",
accountId: "bot-a",
to: "channel:c1",
threadId: "171.222",
});
});
it("records when message sends explicitly suppress implicit thread delivery", () => {
const topLevel = extractMessagingToolSend("message", {
action: "send",
provider: "telegram",
to: "123",
topLevel: true,
content: "done",
});
const nullThread = extractMessagingToolSend("message", {
action: "send",
provider: "telegram",
to: "123",
threadId: null,
content: "done",
});
expect(topLevel?.threadSuppressed).toBe(true);
expect(topLevel?.threadImplicit).toBeUndefined();
expect(nullThread?.threadSuppressed).toBe(true);
expect(nullThread?.threadImplicit).toBeUndefined();
});
});

View File

@@ -579,8 +579,21 @@ export function extractMessagingToolSend(
const provider = providerId ?? normalizeOptionalLowercaseString(providerHint) ?? "message";
const to = normalizeTargetForProvider(provider, toRaw);
const threadId = normalizeOptionalString(args.threadId);
const threadSuppressed = args.topLevel === true || args.threadId === null;
const threadImplicit =
!threadId &&
!threadSuppressed &&
Boolean(providerId && getChannelPlugin(providerId)?.threading?.resolveAutoThreadId);
return to
? { tool: toolName, provider, accountId, to, ...(threadId ? { threadId } : {}) }
? {
tool: toolName,
provider,
accountId,
to,
...(threadId ? { threadId } : {}),
...(threadImplicit ? { threadImplicit: true } : {}),
...(threadSuppressed ? { threadSuppressed: true } : {}),
}
: undefined;
}
const providerId = normalizeChannelId(toolName);
@@ -593,12 +606,14 @@ export function extractMessagingToolSend(
return undefined;
}
const to = normalizeTargetForProvider(providerId, extracted.to);
const threadId = normalizeOptionalString(extracted.threadId);
return to
? {
tool: toolName,
provider: providerId,
accountId: extracted.accountId ?? accountId,
to,
...(threadId ? { threadId } : {}),
}
: undefined;
}

View File

@@ -231,6 +231,12 @@ function makeBaseParams(overrides: {
resolvedDelivery,
deliveryRequested: overrides.deliveryRequested ?? true,
skipHeartbeatDelivery: false,
sourceDeliveryOutcome: {
visibleDeliveries: [],
verifiedMessageToolDelivery: false,
satisfiesSourceDelivery: false,
unverifiedMessageToolDelivery: false,
},
deliveryBestEffort: overrides.deliveryBestEffort ?? false,
deliveryPayloadHasStructuredContent: false,
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
@@ -1392,7 +1398,18 @@ describe("dispatchCronDelivery — double-announce guard", () => {
mode: "implicit",
error: new Error("sessionKey is required to resolve delivery.channel=last"),
};
params.unverifiedMessagingToolDelivery = true;
params.sourceDeliveryOutcome = {
visibleDeliveries: [
{
via: "message_tool",
target: { tool: "message", provider: "messagechat", to: "123" },
verifiedTarget: false,
},
],
verifiedMessageToolDelivery: false,
satisfiesSourceDelivery: false,
unverifiedMessageToolDelivery: true,
};
const state = await dispatchCronDelivery(params);

View File

@@ -1,5 +1,5 @@
import { describe, expect, it, vi } from "vitest";
import { matchesMessagingToolDeliveryTarget } from "./delivery-dispatch.js";
import { sourceDeliveryTargetsMatch } from "../../infra/outbound/source-delivery-plan.js";
// Mock the announce flow dependencies to test the fallback behavior.
vi.mock("../../agents/subagent-announce.js", () => ({
@@ -9,10 +9,10 @@ vi.mock("../../agents/subagent-registry-read.js", () => ({
countActiveDescendantRuns: vi.fn().mockReturnValue(0),
}));
describe("matchesMessagingToolDeliveryTarget", () => {
describe("sourceDeliveryTargetsMatch", () => {
it("matches when channel and to agree", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "123456" },
{ channel: "telegram", to: "123456" },
),
@@ -21,7 +21,7 @@ describe("matchesMessagingToolDeliveryTarget", () => {
it("rejects when channel differs", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "whatsapp", to: "123456" },
{ channel: "telegram", to: "123456" },
),
@@ -30,7 +30,7 @@ describe("matchesMessagingToolDeliveryTarget", () => {
it("rejects when to is missing from delivery", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "123456" },
{ channel: "telegram", to: undefined },
),
@@ -39,25 +39,34 @@ describe("matchesMessagingToolDeliveryTarget", () => {
it("rejects when channel is missing from delivery", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "123456" },
{ channel: undefined, to: "123456" },
),
).toBe(false);
});
it("strips :topic:NNN suffix from target.to before comparing", () => {
it("matches topic suffixes against the resolved delivery thread", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "-1003597428309:topic:462" },
{ channel: "telegram", to: "-1003597428309" },
{ channel: "telegram", to: "-1003597428309", threadId: 462 },
),
).toBe(true);
});
it("rejects matching room targets when thread ids differ", () => {
expect(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "-1003597428309", threadId: "111" },
{ channel: "telegram", to: "-1003597428309", threadId: 462 },
),
).toBe(false);
});
it("matches when provider is 'message' (generic)", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "message", to: "123456" },
{ channel: "telegram", to: "123456" },
),
@@ -66,7 +75,7 @@ describe("matchesMessagingToolDeliveryTarget", () => {
it("rejects when accountIds differ", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "123456", accountId: "bot-a" },
{ channel: "telegram", to: "123456", accountId: "bot-b" },
),
@@ -75,7 +84,7 @@ describe("matchesMessagingToolDeliveryTarget", () => {
it("matches when delivery has accountId and target omits it (tool fills accountId at exec)", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "message", to: "123456" },
{ channel: "telegram", to: "123456", accountId: "bot-a" },
),
@@ -84,7 +93,7 @@ describe("matchesMessagingToolDeliveryTarget", () => {
it("matches when delivery and target carry the same accountId", () => {
expect(
matchesMessagingToolDeliveryTarget(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "123456", accountId: "bot-a" },
{ channel: "telegram", to: "123456", accountId: "bot-a" },
),

View File

@@ -27,6 +27,7 @@ import {
createOutboundPayloadPlan,
projectOutboundPayloadPlanForMirror,
} from "../../infra/outbound/payloads.js";
import type { SourceDeliveryOutcome } from "../../infra/outbound/source-delivery-plan.js";
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
import { hasReplyPayloadContent } from "../../interactive/payload.js";
import { isAudioFileName } from "../../media/mime.js";
@@ -37,11 +38,7 @@ import {
resolveAgentIdFromSessionKey,
} from "../../routing/session-key.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalLowercaseString,
normalizeOptionalString,
} from "../../shared/string-coerce.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { shouldAttemptTtsPayload } from "../../tts/tts-config.js";
import { createCronExecutionId } from "../run-id.js";
import { hasScheduledNextRunAtMs } from "../service/jobs.js";
@@ -89,28 +86,6 @@ function normalizeSilentReplyText(text: string | undefined): NormalizedSilentRep
return { text: next, strippedTrailingSilentToken };
}
export function matchesMessagingToolDeliveryTarget(
target: { provider?: string; to?: string; accountId?: string },
delivery: { channel?: string; to?: string; accountId?: string },
): boolean {
if (!delivery.channel || !delivery.to || !target.to) {
return false;
}
const channel = normalizeLowercaseStringOrEmpty(delivery.channel);
const provider = normalizeOptionalLowercaseString(target.provider);
if (provider && provider !== "message" && provider !== channel) {
return false;
}
if (delivery.accountId && target.accountId && target.accountId !== delivery.accountId) {
return false;
}
// Strip :topic:NNN from message targets and normalize Feishu/Lark prefixes on
// both sides so cron duplicate suppression compares canonical IDs.
const normalizedTargetTo = normalizeDeliveryTarget(channel, target.to.replace(/:topic:\d+$/, ""));
const normalizedDeliveryTo = normalizeDeliveryTarget(channel, delivery.to);
return normalizedTargetTo === normalizedDeliveryTo;
}
export function resolveCronDeliveryBestEffort(job: CronJob): boolean {
return job.delivery?.bestEffort === true;
}
@@ -132,8 +107,7 @@ type DispatchCronDeliveryParams = {
resolvedDelivery: DeliveryTargetResolution;
deliveryRequested: boolean;
skipHeartbeatDelivery: boolean;
skipMessagingToolDelivery?: boolean;
unverifiedMessagingToolDelivery?: boolean;
sourceDeliveryOutcome: SourceDeliveryOutcome;
deliveryBestEffort: boolean;
deliveryPayloadHasStructuredContent: boolean;
deliveryPayloads: ReplyPayload[];
@@ -748,20 +722,18 @@ async function retryTransientDirectCronDelivery<T>(params: {
export async function dispatchCronDelivery(
params: DispatchCronDeliveryParams,
): Promise<DispatchCronDeliveryState> {
const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true;
const sourceDeliverySatisfied = params.sourceDeliveryOutcome.satisfiesSourceDelivery;
const verifiedMessageToolDelivery = params.sourceDeliveryOutcome.verifiedMessageToolDelivery;
let summary = params.summary;
let outputText = params.outputText;
let synthesizedText = params.synthesizedText;
let deliveryPayloads = params.deliveryPayloads;
// Shared callers can treat a matching message-tool send as the completed
// delivery path. Cron-owned callers keep this false so direct cron delivery
// remains the only source of delivered state.
let delivered = skipMessagingToolDelivery;
let deliveryAttempted = skipMessagingToolDelivery;
let delivered = verifiedMessageToolDelivery;
let deliveryAttempted = verifiedMessageToolDelivery;
let directCronSessionDeleted = false;
const formatDeliveryTargetError = (error: string) =>
params.unverifiedMessagingToolDelivery === true
params.sourceDeliveryOutcome.unverifiedMessageToolDelivery
? `${error}; the agent used the message tool, but OpenClaw could not verify that message matched the cron delivery target`
: error;
const failDeliveryTarget = (error: string) =>
@@ -1212,7 +1184,7 @@ export async function dispatchCronDelivery(
return await deliverViaDirectAndCleanup(delivery, { retryTransient: true });
};
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) {
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !sourceDeliverySatisfied) {
if (!params.resolvedDelivery.ok) {
if (!params.deliveryBestEffort) {
return {

View File

@@ -1,6 +1,2 @@
export { resolveDeliveryTarget } from "./delivery-target.js";
export {
dispatchCronDelivery,
matchesMessagingToolDeliveryTarget,
resolveCronDeliveryBestEffort,
} from "./delivery-dispatch.js";
export { dispatchCronDelivery, resolveCronDeliveryBestEffort } from "./delivery-dispatch.js";

View File

@@ -2,10 +2,10 @@ import type { BootstrapContextMode } from "../../agents/bootstrap-files.js";
import { resolveCliRuntimeExecutionProvider } from "../../agents/model-runtime-aliases.js";
import type { SkillSnapshot } from "../../agents/skills.js";
import { normalizeToolList } from "../../agents/tool-policy.js";
import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js";
import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js";
import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { SourceDeliveryPlan } from "../../infra/outbound/source-delivery-plan.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import type { CronAgentExecutionPhaseUpdate, CronJob } from "../types.js";
import {
@@ -111,19 +111,14 @@ export function createCronPromptExecutor(params: {
/** Set when the cron payload's `timeoutSeconds` was explicitly configured. */
runTimeoutOverrideMs?: number;
senderIsOwner: boolean;
messageChannel: string | undefined;
suppressExecNotifyOnExit: boolean;
resolvedDelivery: {
channel?: string;
accountId?: string;
to?: string;
threadId?: string | number;
};
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
toolPolicy: {
requireExplicitMessageTarget: boolean;
disableMessageTool: boolean;
forceMessageTool: boolean;
};
sourceDelivery: SourceDeliveryPlan;
skillsSnapshot: SkillSnapshot;
agentPayload: AgentTurnPayload;
useSubagentFallbacks: boolean;
@@ -158,6 +153,8 @@ export function createCronPromptExecutor(params: {
params.cronSession.sessionEntry.systemPromptReport,
);
const bootstrapContextMode = resolveCronBootstrapContextMode(params.agentPayload);
const sourceReplyDeliveryMode = params.sourceDelivery.sourceReplyDeliveryMode;
const messageChannel = params.sourceDelivery.target.channel ?? params.resolvedDelivery.channel;
const runPrompt = async (promptText: string) => {
const fallbackResult = await runWithModelFallback({
@@ -217,8 +214,8 @@ export function createCronPromptExecutor(params: {
lane: resolveCronAgentLane(params.lane),
cliSessionId,
skillsSnapshot: params.skillsSnapshot,
messageChannel: params.messageChannel,
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
messageChannel,
sourceReplyDeliveryMode,
abortSignal: params.abortSignal,
onExecutionStarted: params.onExecutionStarted,
onExecutionPhase: params.onExecutionPhase,
@@ -235,7 +232,7 @@ export function createCronPromptExecutor(params: {
}
const { resolveFastModeState, runEmbeddedPiAgent } = await loadCronEmbeddedRuntime();
const currentChannelId = await resolveCurrentChannelTarget({
channel: params.messageChannel,
channel: messageChannel,
to: params.resolvedDelivery.to,
threadId: params.resolvedDelivery.threadId,
});
@@ -251,7 +248,7 @@ export function createCronPromptExecutor(params: {
ownerOnlyToolAllowlist: resolveCronOwnerOnlyToolAllowlist(
params.agentPayload?.toolsAllow,
),
messageChannel: params.messageChannel,
messageChannel,
agentAccountId: params.resolvedDelivery.accountId,
messageTo: params.resolvedDelivery.to,
messageThreadId: params.resolvedDelivery.threadId,
@@ -290,11 +287,11 @@ export function createCronPromptExecutor(params: {
notifyOnExitEmptySuccess: false,
}
: undefined,
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
sourceReplyDeliveryMode,
runId: params.cronSession.sessionEntry.sessionId,
requireExplicitMessageTarget: params.toolPolicy.requireExplicitMessageTarget,
disableMessageTool: params.toolPolicy.disableMessageTool,
forceMessageTool: params.toolPolicy.forceMessageTool,
requireExplicitMessageTarget: params.sourceDelivery.messageTool.requireExplicitTarget,
disableMessageTool: !params.sourceDelivery.messageTool.enabled,
forceMessageTool: params.sourceDelivery.messageTool.force,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
abortSignal: params.abortSignal,
onExecutionStarted: params.onExecutionStarted,
@@ -344,12 +341,7 @@ export async function executeCronRun(params: {
to?: string;
threadId?: string | number;
};
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
toolPolicy: {
requireExplicitMessageTarget: boolean;
disableMessageTool: boolean;
forceMessageTool: boolean;
};
sourceDelivery: SourceDeliveryPlan;
skillsSnapshot: SkillSnapshot;
agentPayload: AgentTurnPayload;
useSubagentFallbacks: boolean;
@@ -396,11 +388,9 @@ export async function executeCronRun(params: {
thinkLevel: params.thinkLevel,
timeoutMs: params.timeoutMs,
runTimeoutOverrideMs: params.runTimeoutOverrideMs,
messageChannel: params.resolvedDelivery.channel,
suppressExecNotifyOnExit: params.suppressExecNotifyOnExit,
resolvedDelivery: params.resolvedDelivery,
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
toolPolicy: params.toolPolicy,
sourceDelivery: params.sourceDelivery,
skillsSnapshot: params.skillsSnapshot,
agentPayload: params.agentPayload,
useSubagentFallbacks: params.useSubagentFallbacks,

View File

@@ -1,5 +1,6 @@
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import type { SkillSnapshot } from "../../agents/skills.js";
import { createSourceDeliveryPlan } from "../../infra/outbound/source-delivery-plan.js";
import type { CronDeliveryMode } from "../types.js";
import type { MutableCronSession } from "./run-session-state.js";
import {
@@ -249,7 +250,18 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1);
expectDispatchFields({
deliveryRequested: true,
skipMessagingToolDelivery: true,
sourceDeliveryOutcome: {
visibleDeliveries: [
{
via: "message_tool",
verifiedTarget: true,
target: { tool: "message", provider: "messagechat", to: "123" },
},
],
verifiedMessageToolDelivery: true,
satisfiesSourceDelivery: true,
unverifiedMessageToolDelivery: false,
},
});
expectDeliveryFields(result.delivery, {
intended: { channel: "messagechat", to: "123", source: "explicit" },
@@ -320,13 +332,20 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
thinkLevel: undefined,
timeoutMs: 60_000,
senderIsOwner: true,
messageChannel: "messagechat",
suppressExecNotifyOnExit: true,
toolPolicy: {
requireExplicitMessageTarget: false,
disableMessageTool: false,
forceMessageTool: true,
},
sourceDelivery: createSourceDeliveryPlan({
owner: "message_tool_then_direct_fallback",
reason: "cron_announce",
target: {
channel: resolvedDelivery.channel ?? "messagechat",
to: resolvedDelivery.to,
accountId: resolvedDelivery.accountId,
threadId: resolvedDelivery.threadId,
},
messageToolEnabled: true,
messageToolForced: true,
directFallback: true,
}),
skillsSnapshot: emptySkillsSnapshot,
agentPayload: null,
useSubagentFallbacks: false,
@@ -466,6 +485,64 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
});
});
it("marks delivery.mode none delivered when the message tool sends to the explicit target", async () => {
mockRunCronFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
mode: "none",
channel: "topicchat",
to: "room#42",
threadId: 42,
});
resolveDeliveryTargetMock.mockResolvedValue({
ok: true,
channel: "topicchat",
to: "room#42",
threadId: 42,
accountId: undefined,
error: undefined,
});
runEmbeddedPiAgentMock.mockResolvedValue(
makeMessageToolRunResult([
{ tool: "message", provider: "topicchat", to: "room#42", threadId: "42" },
]),
);
const result = await runCronIsolatedAgentTurn({
...makeParams(),
job: makeMessageToolPolicyJob({
mode: "none",
channel: "topicchat",
to: "room#42",
threadId: 42,
}),
});
expectDispatchFields({
deliveryRequested: false,
sourceDeliveryOutcome: {
visibleDeliveries: [
{
via: "message_tool",
verifiedTarget: true,
target: { tool: "message", provider: "topicchat", to: "room#42", threadId: "42" },
},
],
verifiedMessageToolDelivery: true,
satisfiesSourceDelivery: false,
unverifiedMessageToolDelivery: false,
},
});
expect(result.delivered).toBe(true);
expect(result.deliveryAttempted).toBe(true);
expectDeliveryFields(result.delivery, {
intended: { channel: "topicchat", to: "room#42", threadId: 42, source: "explicit" },
messageToolSentTo: [{ channel: "topicchat", to: "room#42", threadId: "42" }],
fallbackUsed: false,
delivered: true,
});
});
it('does not resolve implicit "last" context for bare delivery.mode none', async () => {
mockRunCronFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
@@ -513,8 +590,8 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
it("forwards explicit message targets into the embedded run", async () => {
mockRunCronFallbackPassthrough();
const executor = createMessageToolExecutor({
messageChannel: "topicchat",
resolvedDelivery: {
channel: "topicchat",
accountId: "ops",
to: "room#42",
threadId: 42,
@@ -536,8 +613,8 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
it("lets channels build currentChannelId from split delivery fields", async () => {
mockRunCronFallbackPassthrough();
const executor = createMessageToolExecutor({
messageChannel: "topicchat",
resolvedDelivery: {
channel: "topicchat",
accountId: "ops",
to: "room",
threadId: 42,
@@ -787,8 +864,18 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1);
expectDispatchFields({
deliveryRequested: true,
skipMessagingToolDelivery: false,
unverifiedMessagingToolDelivery: true,
sourceDeliveryOutcome: {
visibleDeliveries: [
{
via: "message_tool",
verifiedTarget: false,
target: { tool: "message", provider: "messagechat", to: "123" },
},
],
verifiedMessageToolDelivery: false,
satisfiesSourceDelivery: false,
unverifiedMessageToolDelivery: true,
},
});
const delivery = expectDeliveryFields(result.delivery, {
intended: { channel: "last", to: null, source: "last" },
@@ -823,8 +910,18 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1);
expectDispatchFields({
deliveryRequested: false,
skipMessagingToolDelivery: false,
unverifiedMessagingToolDelivery: true,
sourceDeliveryOutcome: {
visibleDeliveries: [
{
via: "message_tool",
verifiedTarget: false,
target: { tool: "message", provider: "messagechat", to: "123" },
},
],
verifiedMessageToolDelivery: false,
satisfiesSourceDelivery: false,
unverifiedMessageToolDelivery: true,
},
});
expect(result.delivered).toBe(false);
expect(result.deliveryAttempted).toBe(false);

View File

@@ -519,22 +519,22 @@ function resetRunOutcomeMocks(): void {
synthesizedText,
deliveryRequested,
skipHeartbeatDelivery,
skipMessagingToolDelivery,
sourceDeliveryOutcome,
resolvedDelivery,
}) => ({
result: undefined,
delivered: Boolean(
skipMessagingToolDelivery ||
sourceDeliveryOutcome?.verifiedMessageToolDelivery ||
(deliveryRequested &&
!skipHeartbeatDelivery &&
!skipMessagingToolDelivery &&
!sourceDeliveryOutcome?.satisfiesSourceDelivery &&
resolvedDelivery.ok),
),
deliveryAttempted: Boolean(
skipMessagingToolDelivery ||
sourceDeliveryOutcome?.verifiedMessageToolDelivery ||
(deliveryRequested &&
!skipHeartbeatDelivery &&
!skipMessagingToolDelivery &&
!sourceDeliveryOutcome?.satisfiesSourceDelivery &&
resolvedDelivery.ok),
),
summary,

View File

@@ -2,14 +2,18 @@ import { hasAnyAuthProfileStoreSource } from "../../agents/auth-profiles/source-
import { resolveAgentHarnessPolicy } from "../../agents/harness/selection.js";
import { listOpenAIAuthProfileProvidersForAgentRuntime } from "../../agents/openai-codex-routing.js";
import { retireSessionMcpRuntime } from "../../agents/pi-bundle-mcp-tools.js";
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js";
import type { SkillSnapshot } from "../../agents/skills.js";
import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import type { CliDeps } from "../../cli/outbound-send-deps.js";
import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js";
import {
createSourceDeliveryPlan,
resolveSourceDeliveryOutcome,
type SourceDeliveryOutcome,
type SourceDeliveryPlan,
type SourceDeliveryVisibleDelivery,
} from "../../infra/outbound/source-delivery-plan.js";
import { isCommandLaneTaskTimeoutError } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
@@ -194,28 +198,17 @@ function normalizeCronTraceTarget(
};
}
type MessagingToolTargetMatcher = (
target: { provider?: string; to?: string; accountId?: string },
delivery: { channel?: string; to?: string; accountId?: string },
) => boolean;
function normalizeMessagingToolTarget(
target: MessagingToolSend,
delivery: SourceDeliveryVisibleDelivery,
resolvedDelivery: ResolvedCronDeliveryTarget,
matchesMessagingToolDeliveryTarget: MessagingToolTargetMatcher,
): CronDeliveryTraceMessageTarget | undefined {
const { target } = delivery;
const channel = target.provider?.trim();
if (!channel) {
return undefined;
}
const traceChannel =
channel === "message" &&
resolvedDelivery.ok &&
matchesMessagingToolDeliveryTarget(target, {
channel: resolvedDelivery.channel,
to: resolvedDelivery.to,
accountId: resolvedDelivery.accountId,
})
channel === "message" && resolvedDelivery.ok && delivery.verifiedTarget
? resolvedDelivery.channel
: channel;
return {
@@ -257,8 +250,7 @@ function buildResolvedCronTraceTarget(
function buildCronDeliveryTrace(params: {
deliveryPlan: CronDeliveryPlan;
resolvedDelivery: ResolvedCronDeliveryTarget;
messagingToolSentTargets: MessagingToolSend[];
matchesMessagingToolDeliveryTarget: MessagingToolTargetMatcher;
sourceDeliveryOutcome: SourceDeliveryOutcome;
fallbackUsed: boolean;
delivered: boolean;
}): CronDeliveryTrace {
@@ -275,14 +267,8 @@ function buildCronDeliveryTrace(params: {
const resolved = includeResolved
? buildResolvedCronTraceTarget(params.resolvedDelivery)
: undefined;
const messageToolSentTo = params.messagingToolSentTargets
.map((target) =>
normalizeMessagingToolTarget(
target,
params.resolvedDelivery,
params.matchesMessagingToolDeliveryTarget,
),
)
const messageToolSentTo = params.sourceDeliveryOutcome.visibleDeliveries
.map((delivery) => normalizeMessagingToolTarget(delivery, params.resolvedDelivery))
.filter((target): target is CronDeliveryTraceMessageTarget => Boolean(target));
return {
...(intended ? { intended } : {}),
@@ -293,60 +279,50 @@ function buildCronDeliveryTrace(params: {
};
}
function resolveMessagingToolSentTargets(params: {
resolvedDelivery: ResolvedCronDeliveryTarget;
runResult: CronExecutionResult["runResult"];
}): MessagingToolSend[] {
const explicitTargets = params.runResult.messagingToolSentTargets ?? [];
if (explicitTargets.length > 0 || params.runResult.didSendViaMessagingTool !== true) {
return explicitTargets;
}
if (!params.resolvedDelivery.ok) {
return [];
}
const threadId = stringifyRouteThreadId(params.resolvedDelivery.threadId);
return [
{
tool: "message",
provider: params.resolvedDelivery.channel,
...(params.resolvedDelivery.accountId
? { accountId: params.resolvedDelivery.accountId }
: {}),
...(params.resolvedDelivery.to ? { to: params.resolvedDelivery.to } : {}),
...(threadId ? { threadId } : {}),
},
];
}
function resolveCronToolPolicy(params: { deliveryMode: "announce" | "webhook" | "none" }) {
const enableMessageTool = params.deliveryMode !== "webhook";
return {
requireExplicitMessageTarget: false,
disableMessageTool: !enableMessageTool,
forceMessageTool: enableMessageTool,
};
}
function resolveCronSourceReplyDeliveryMode(params: {
function resolveCronSourceDeliveryPlan(params: {
deliveryPlan: CronDeliveryPlan;
resolvedDelivery: ResolvedCronDeliveryTarget;
toolPolicy: ReturnType<typeof resolveCronToolPolicy>;
}): SourceReplyDeliveryMode | undefined {
if (
params.deliveryPlan.mode !== "announce" ||
params.toolPolicy.disableMessageTool ||
!params.resolvedDelivery.ok
) {
return undefined;
}): SourceDeliveryPlan {
const target = {
channel: params.resolvedDelivery.channel,
to: params.resolvedDelivery.to,
accountId: params.resolvedDelivery.accountId,
threadId: params.resolvedDelivery.threadId,
};
if (params.deliveryPlan.mode === "webhook") {
return createSourceDeliveryPlan({
owner: "none",
reason: "cron_webhook",
messageToolEnabled: false,
directFallback: false,
});
}
return "message_tool_only";
if (params.deliveryPlan.mode === "none") {
return createSourceDeliveryPlan({
owner: "none",
reason: "cron_none",
target,
messageToolEnabled: true,
messageToolForced: true,
directFallback: false,
});
}
return createSourceDeliveryPlan({
owner: params.resolvedDelivery.ok ? "message_tool_then_direct_fallback" : "direct_fallback",
reason: "cron_announce",
target,
messageToolEnabled: true,
messageToolForced: true,
directFallback: true,
skipFallbackWhenMessageToolSentToTarget: params.resolvedDelivery.ok,
});
}
function canPromptForMessageTool(params: {
disableMessageTool: boolean;
sourceDelivery: SourceDeliveryPlan;
toolsAllow?: string[];
}): boolean {
if (params.disableMessageTool) {
if (!params.sourceDelivery.messageTool.enabled) {
return false;
}
return !params.toolsAllow?.length || params.toolsAllow.includes("message");
@@ -378,27 +354,24 @@ async function resolveCronDeliveryContext(params: {
deliveryPlan,
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
toolPolicy: resolveCronToolPolicy({
deliveryMode: deliveryPlan.mode,
}),
sourceDelivery: resolveCronSourceDeliveryPlan({ deliveryPlan, resolvedDelivery }),
};
}
if (deliveryPlan.mode === "none" && !hasExplicitCronDeliveryTarget(deliveryPlan)) {
const resolvedDelivery = {
ok: false as const,
channel: undefined,
to: undefined,
accountId: undefined,
threadId: undefined,
mode: "implicit" as const,
error: new Error("delivery is disabled"),
};
return {
deliveryPlan,
deliveryRequested: false,
resolvedDelivery: {
ok: false as const,
channel: undefined,
to: undefined,
accountId: undefined,
threadId: undefined,
mode: "implicit" as const,
error: new Error("delivery is disabled"),
},
toolPolicy: resolveCronToolPolicy({
deliveryMode: deliveryPlan.mode,
}),
resolvedDelivery,
sourceDelivery: resolveCronSourceDeliveryPlan({ deliveryPlan, resolvedDelivery }),
};
}
const { resolveDeliveryTarget } = await loadCronDeliveryRuntime();
@@ -413,9 +386,7 @@ async function resolveCronDeliveryContext(params: {
deliveryPlan,
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
toolPolicy: resolveCronToolPolicy({
deliveryMode: deliveryPlan.mode,
}),
sourceDelivery: resolveCronSourceDeliveryPlan({ deliveryPlan, resolvedDelivery }),
};
}
@@ -486,10 +457,9 @@ type PreparedCronRunContext = {
deliveryPlan: CronDeliveryPlan;
resolvedDelivery: ResolvedCronDeliveryTarget;
deliveryRequested: boolean;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
sourceDelivery: SourceDeliveryPlan;
suppressExecNotifyOnExit: boolean;
senderIsOwner: boolean;
toolPolicy: ReturnType<typeof resolveCronToolPolicy>;
skillsSnapshot: SkillSnapshot;
liveSelection: CronLiveSelection;
useSubagentFallbacks: boolean;
@@ -712,17 +682,12 @@ async function prepareCronRunContext(params: {
? explicitTimeoutSeconds * 1000
: undefined;
const agentPayload = input.job.payload.kind === "agentTurn" ? input.job.payload : null;
const { deliveryPlan, deliveryRequested, resolvedDelivery, toolPolicy } =
const { deliveryPlan, deliveryRequested, resolvedDelivery, sourceDelivery } =
await resolveCronDeliveryContext({
cfg: cfgWithAgentDefaults,
job: input.job,
agentId,
});
const sourceReplyDeliveryMode = resolveCronSourceReplyDeliveryMode({
deliveryPlan,
resolvedDelivery,
toolPolicy,
});
const { formattedTime, timeLine } = resolveCronStyleNow(input.cfg, now);
const base = `[cron:${input.job.id} ${input.job.name}] ${input.message}`.trim();
@@ -763,7 +728,7 @@ async function prepareCronRunContext(params: {
commandBody,
deliveryRequested,
messageToolEnabled: canPromptForMessageTool({
disableMessageTool: toolPolicy.disableMessageTool,
sourceDelivery,
toolsAllow: agentPayload?.toolsAllow,
}),
resolvedDeliveryOk: resolvedDelivery.ok,
@@ -855,10 +820,9 @@ async function prepareCronRunContext(params: {
deliveryPlan,
resolvedDelivery,
deliveryRequested,
sourceReplyDeliveryMode,
sourceDelivery,
suppressExecNotifyOnExit: deliveryPlan.mode === "none",
senderIsOwner: !isExternalHook,
toolPolicy,
skillsSnapshot,
liveSelection,
useSubagentFallbacks,
@@ -1038,27 +1002,11 @@ async function finalizeCronRun(params: {
prepared.deliveryRequested &&
!hasFatalErrorPayload &&
isHeartbeatOnlyResponse(deliveryPayloads, resolveHeartbeatAckMaxChars(prepared.agentCfg));
const {
dispatchCronDelivery,
matchesMessagingToolDeliveryTarget,
resolveCronDeliveryBestEffort,
} = await loadCronDeliveryRuntime();
const messagingToolSentTargets = resolveMessagingToolSentTargets({
resolvedDelivery: prepared.resolvedDelivery,
runResult: finalRunResult,
const { dispatchCronDelivery, resolveCronDeliveryBestEffort } = await loadCronDeliveryRuntime();
const sourceDeliveryOutcome = resolveSourceDeliveryOutcome(prepared.sourceDelivery, {
didSendViaMessageTool: finalRunResult.didSendViaMessagingTool,
messageToolSentTargets: finalRunResult.messagingToolSentTargets,
});
const didSendViaMessagingTool =
finalRunResult.didSendViaMessagingTool === true && messagingToolSentTargets.length > 0;
const skipMessagingToolDelivery =
didSendViaMessagingTool &&
prepared.resolvedDelivery.ok &&
messagingToolSentTargets.some((target) =>
matchesMessagingToolDeliveryTarget(target, {
channel: prepared.resolvedDelivery.channel,
to: prepared.resolvedDelivery.to,
accountId: prepared.resolvedDelivery.accountId,
}),
);
const deliveryResult = await dispatchCronDelivery({
cfg: prepared.input.cfg,
cfgWithAgentDefaults: prepared.cfgWithAgentDefaults,
@@ -1074,8 +1022,7 @@ async function finalizeCronRun(params: {
resolvedDelivery: prepared.resolvedDelivery,
deliveryRequested: prepared.deliveryRequested,
skipHeartbeatDelivery,
skipMessagingToolDelivery,
unverifiedMessagingToolDelivery: didSendViaMessagingTool && !prepared.resolvedDelivery.ok,
sourceDeliveryOutcome,
deliveryBestEffort: resolveCronDeliveryBestEffort(prepared.input.job),
deliveryPayloadHasStructuredContent,
deliveryPayloads,
@@ -1092,9 +1039,11 @@ async function finalizeCronRun(params: {
const deliveryTrace = buildCronDeliveryTrace({
deliveryPlan: prepared.deliveryPlan,
resolvedDelivery: prepared.resolvedDelivery,
messagingToolSentTargets,
matchesMessagingToolDeliveryTarget,
fallbackUsed: deliveryResult.deliveryAttempted && !skipMessagingToolDelivery,
sourceDeliveryOutcome,
fallbackUsed:
prepared.deliveryRequested &&
deliveryResult.deliveryAttempted &&
!sourceDeliveryOutcome.satisfiesSourceDelivery,
delivered: deliveryResult.delivered,
});
if (deliveryResult.result) {
@@ -1202,8 +1151,7 @@ export async function runCronIsolatedAgentTurn(params: {
accountId: prepared.context.resolvedDelivery.accountId,
threadId: prepared.context.resolvedDelivery.threadId,
},
sourceReplyDeliveryMode: prepared.context.sourceReplyDeliveryMode,
toolPolicy: prepared.context.toolPolicy,
sourceDelivery: prepared.context.sourceDelivery,
skillsSnapshot: prepared.context.skillsSnapshot,
agentPayload: prepared.context.agentPayload,
useSubagentFallbacks: prepared.context.useSubagentFallbacks,

View File

@@ -0,0 +1,210 @@
import { describe, expect, it } from "vitest";
import {
createSourceDeliveryPlan,
resolveSourceDeliveryOutcome,
sourceDeliveryTargetsMatch,
} from "./source-delivery-plan.js";
describe("source delivery plan", () => {
it("projects message-tool-owned delivery to existing source reply and message tool fields", () => {
const contract = createSourceDeliveryPlan({
owner: "message_tool_then_direct_fallback",
reason: "cron_announce",
target: { channel: "discord", to: "channel:123", accountId: "bot-a" },
});
expect(contract.sourceReplyDeliveryMode).toBe("message_tool_only");
expect(contract.normalFinal).toBe("private");
expect(contract.fallback.skipWhenMessageToolSentToTarget).toBe(true);
expect(contract.messageTool).toMatchObject({
requireExplicitTarget: false,
enabled: true,
force: true,
});
});
it("keeps direct fallback delivery compatible with automatic final payload handling", () => {
const contract = createSourceDeliveryPlan({
owner: "direct_fallback",
reason: "cron_announce",
target: { channel: "discord", to: "channel:123" },
messageToolEnabled: true,
messageToolForced: true,
directFallback: true,
skipFallbackWhenMessageToolSentToTarget: false,
});
expect(contract.sourceReplyDeliveryMode).toBeUndefined();
expect(contract.normalFinal).toBe("visible");
expect(contract.fallback.directDelivery).toBe(true);
expect(contract.fallback.skipWhenMessageToolSentToTarget).toBe(false);
expect(contract.messageTool).toMatchObject({
requireExplicitTarget: false,
enabled: true,
force: true,
});
});
it("normalizes message-tool delivery outcomes against the planned source target", () => {
const contract = createSourceDeliveryPlan({
owner: "message_tool_then_direct_fallback",
reason: "cron_announce",
target: { channel: "feishu", to: "oc_123", accountId: "bot-a", threadId: 456 },
});
const outcome = resolveSourceDeliveryOutcome(contract, {
didSendViaMessageTool: true,
messageToolSentTargets: [
{
tool: "message",
provider: "message",
accountId: "bot-a",
to: "oc_123:topic:456",
text: "done",
},
],
});
expect(outcome.satisfiesSourceDelivery).toBe(true);
expect(outcome.verifiedMessageToolDelivery).toBe(true);
expect(outcome.unverifiedMessageToolDelivery).toBe(false);
expect(outcome.visibleDeliveries).toEqual([
{
via: "message_tool",
verifiedTarget: true,
target: {
tool: "message",
provider: "message",
accountId: "bot-a",
to: "oc_123:topic:456",
text: "done",
},
},
]);
});
it("keeps unverified message-tool sends visible to fallback/error handling", () => {
const contract = createSourceDeliveryPlan({
owner: "message_tool_then_direct_fallback",
reason: "cron_announce",
target: { channel: "slack", to: "channel:C1" },
});
const outcome = resolveSourceDeliveryOutcome(contract, {
didSendViaMessageTool: true,
messageToolSentTargets: [{ tool: "message", provider: "slack", to: "channel:C2" }],
});
expect(outcome.satisfiesSourceDelivery).toBe(false);
expect(outcome.verifiedMessageToolDelivery).toBe(false);
expect(outcome.unverifiedMessageToolDelivery).toBe(true);
expect(outcome.visibleDeliveries[0]?.verifiedTarget).toBe(false);
});
it("keeps verified message-tool delivery separate from source fallback satisfaction", () => {
const contract = createSourceDeliveryPlan({
owner: "none",
reason: "cron_none",
target: { channel: "slack", to: "channel:C1" },
messageToolEnabled: true,
messageToolForced: true,
directFallback: false,
});
const outcome = resolveSourceDeliveryOutcome(contract, {
didSendViaMessageTool: true,
messageToolSentTargets: [{ tool: "message", provider: "slack", to: "channel:C1" }],
});
expect(outcome.verifiedMessageToolDelivery).toBe(true);
expect(outcome.satisfiesSourceDelivery).toBe(false);
expect(outcome.unverifiedMessageToolDelivery).toBe(false);
});
it("does not satisfy delivery from target metadata without a committed message-tool send", () => {
const contract = createSourceDeliveryPlan({
owner: "message_tool_then_direct_fallback",
reason: "cron_announce",
target: { channel: "slack", to: "channel:C1" },
});
const outcome = resolveSourceDeliveryOutcome(contract, {
didSendViaMessageTool: false,
messageToolSentTargets: [{ tool: "message", provider: "slack", to: "channel:C1" }],
});
expect(outcome.visibleDeliveries[0]?.verifiedTarget).toBe(true);
expect(outcome.verifiedMessageToolDelivery).toBe(false);
expect(outcome.satisfiesSourceDelivery).toBe(false);
expect(outcome.unverifiedMessageToolDelivery).toBe(false);
});
it("does not synthesize an implicit target without a concrete recipient", () => {
const contract = createSourceDeliveryPlan({
owner: "direct_fallback",
reason: "cron_announce",
target: { channel: "slack" },
messageToolEnabled: true,
messageToolForced: true,
directFallback: true,
skipFallbackWhenMessageToolSentToTarget: false,
});
const outcome = resolveSourceDeliveryOutcome(contract, {
didSendViaMessageTool: true,
});
expect(outcome.visibleDeliveries).toEqual([]);
expect(outcome.verifiedMessageToolDelivery).toBe(false);
expect(outcome.satisfiesSourceDelivery).toBe(false);
expect(outcome.unverifiedMessageToolDelivery).toBe(false);
});
it("matches source targets through the same provider normalization used by delivery", () => {
expect(
sourceDeliveryTargetsMatch(
{ provider: "message", to: "channel:C1" },
{ channel: "slack", to: "channel:C1" },
),
).toBe(true);
expect(
sourceDeliveryTargetsMatch(
{ provider: "discord", to: "channel:C1" },
{ channel: "slack", to: "channel:C1" },
),
).toBe(false);
});
it("matches threaded delivery only with explicit or supported implicit thread evidence", () => {
expect(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "-100:topic:462" },
{ channel: "telegram", to: "-100", threadId: 462 },
),
).toBe(true);
expect(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "-100" },
{ channel: "telegram", to: "-100", threadId: 462 },
),
).toBe(false);
expect(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "-100", threadImplicit: true },
{ channel: "telegram", to: "-100", threadId: 462 },
),
).toBe(true);
expect(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "-100", threadImplicit: true, threadSuppressed: true },
{ channel: "telegram", to: "-100", threadId: 462 },
),
).toBe(false);
expect(
sourceDeliveryTargetsMatch(
{ provider: "telegram", to: "-100", threadId: "111" },
{ channel: "telegram", to: "-100", threadId: 462 },
),
).toBe(false);
});
});

View File

@@ -0,0 +1,221 @@
import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js";
import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js";
import { normalizeTargetForProvider } from "./target-normalization.js";
export type SourceVisibleDeliveryOwner =
| "automatic_source"
| "message_tool"
| "message_tool_then_direct_fallback"
| "direct_fallback"
| "none";
export type SourceDeliveryPlanReason =
| "config"
| "room_event"
| "cron_announce"
| "cron_webhook"
| "cron_none"
| "media_completion"
| "subagent_completion";
export type SourceDeliveryTarget = {
channel?: string;
to?: string;
accountId?: string;
threadId?: string | number;
};
export type SourceDeliveryMessageToolTarget = {
tool?: string;
provider?: string;
accountId?: string;
to?: string;
threadId?: string;
threadImplicit?: boolean;
threadSuppressed?: boolean;
text?: string;
mediaUrls?: string[];
};
export type SourceDeliveryVisibleDelivery = {
via: "message_tool";
target: SourceDeliveryMessageToolTarget;
verifiedTarget: boolean;
};
export type SourceDeliveryOutcome = {
visibleDeliveries: SourceDeliveryVisibleDelivery[];
verifiedMessageToolDelivery: boolean;
satisfiesSourceDelivery: boolean;
unverifiedMessageToolDelivery: boolean;
};
export type SourceDeliveryPlan = {
owner: SourceVisibleDeliveryOwner;
reason: SourceDeliveryPlanReason;
target: SourceDeliveryTarget;
normalFinal: "visible" | "private";
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
messageTool: {
enabled: boolean;
force: boolean;
requireExplicitTarget: boolean;
defaultTarget: boolean;
};
fallback: {
directDelivery: boolean;
skipWhenMessageToolSentToTarget: boolean;
bestEffort: boolean;
};
progress: {
allowCallbacksWhenSourceDeliverySuppressed: boolean;
};
};
function isMessageToolOwnedDelivery(owner: SourceVisibleDeliveryOwner): boolean {
return owner === "message_tool" || owner === "message_tool_then_direct_fallback";
}
function normalizeDeliveryTarget(channel: string, to: string): string {
const toTrimmed = to.trim();
return normalizeTargetForProvider(channel, toTrimmed) ?? toTrimmed;
}
function normalizeDeliveryThreadId(threadId: string | number | undefined): string | undefined {
return stringifyRouteThreadId(threadId)?.trim() || undefined;
}
function extractTopicThreadId(targetTo: string): string | undefined {
return targetTo.match(/:topic:(\d+)$/i)?.[1];
}
export function sourceDeliveryTargetsMatch(
target: SourceDeliveryMessageToolTarget,
delivery: SourceDeliveryTarget,
): boolean {
if (!delivery.channel || !delivery.to || !target.to) {
return false;
}
const channel = delivery.channel.trim().toLowerCase();
const provider = target.provider?.trim().toLowerCase();
if (provider && provider !== "message" && provider !== channel) {
return false;
}
if (delivery.accountId && target.accountId && target.accountId !== delivery.accountId) {
return false;
}
// Strip :topic:NNN from message targets and normalize Feishu/Lark prefixes on
// both sides so source-delivery suppression compares canonical IDs.
const normalizedTargetTo = normalizeDeliveryTarget(channel, target.to.replace(/:topic:\d+$/, ""));
const normalizedDeliveryTo = normalizeDeliveryTarget(channel, delivery.to);
if (normalizedTargetTo !== normalizedDeliveryTo) {
return false;
}
const deliveryThreadId = normalizeDeliveryThreadId(delivery.threadId);
const targetThreadId =
normalizeDeliveryThreadId(target.threadId) ?? extractTopicThreadId(target.to);
if (!deliveryThreadId && !targetThreadId) {
return true;
}
if (deliveryThreadId && !targetThreadId) {
return target.threadImplicit === true && target.threadSuppressed !== true;
}
return deliveryThreadId === targetThreadId;
}
export function createSourceDeliveryPlan(params: {
owner: SourceVisibleDeliveryOwner;
reason: SourceDeliveryPlanReason;
target?: SourceDeliveryTarget;
messageToolEnabled?: boolean;
messageToolForced?: boolean;
requireExplicitMessageTarget?: boolean;
directFallback?: boolean;
skipFallbackWhenMessageToolSentToTarget?: boolean;
fallbackBestEffort?: boolean;
allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean;
}): SourceDeliveryPlan {
const messageToolOwnsDelivery = isMessageToolOwnedDelivery(params.owner);
const sourceReplyDeliveryMode = messageToolOwnsDelivery ? "message_tool_only" : undefined;
const directDelivery =
params.directFallback ??
(params.owner === "direct_fallback" || params.owner === "message_tool_then_direct_fallback");
return {
owner: params.owner,
reason: params.reason,
target: params.target ?? {},
normalFinal:
sourceReplyDeliveryMode === "message_tool_only" || params.owner === "none"
? "private"
: "visible",
sourceReplyDeliveryMode,
messageTool: {
enabled: params.messageToolEnabled ?? messageToolOwnsDelivery,
force: params.messageToolForced ?? messageToolOwnsDelivery,
requireExplicitTarget: params.requireExplicitMessageTarget ?? false,
defaultTarget: Boolean(params.target?.channel || params.target?.to),
},
fallback: {
directDelivery,
skipWhenMessageToolSentToTarget:
params.skipFallbackWhenMessageToolSentToTarget ??
params.owner === "message_tool_then_direct_fallback",
bestEffort: params.fallbackBestEffort ?? false,
},
progress: {
allowCallbacksWhenSourceDeliverySuppressed:
params.allowProgressCallbacksWhenSourceDeliverySuppressed ?? false,
},
};
}
function resolveImplicitMessageToolDeliveryTarget(
plan: SourceDeliveryPlan,
): SourceDeliveryMessageToolTarget | undefined {
if (!plan.target.channel || !plan.target.to) {
return undefined;
}
const threadId = stringifyRouteThreadId(plan.target.threadId);
return {
tool: "message",
provider: plan.target.channel,
...(plan.target.accountId ? { accountId: plan.target.accountId } : {}),
...(plan.target.to ? { to: plan.target.to } : {}),
...(threadId ? { threadId } : {}),
};
}
export function resolveSourceDeliveryOutcome(
plan: SourceDeliveryPlan,
params: {
didSendViaMessageTool?: boolean;
messageToolSentTargets?: SourceDeliveryMessageToolTarget[];
},
): SourceDeliveryOutcome {
const didSendViaMessageTool = params.didSendViaMessageTool === true;
const explicitTargets = params.messageToolSentTargets ?? [];
const sentTargets =
explicitTargets.length > 0
? explicitTargets
: didSendViaMessageTool
? [resolveImplicitMessageToolDeliveryTarget(plan)].filter(
(target): target is SourceDeliveryMessageToolTarget => Boolean(target),
)
: [];
const visibleDeliveries = sentTargets.map((target) => ({
via: "message_tool" as const,
target,
verifiedTarget: sourceDeliveryTargetsMatch(target, plan.target),
}));
const hasVerifiedMessageToolDelivery = visibleDeliveries.some(
(delivery) => didSendViaMessageTool && delivery.verifiedTarget,
);
return {
visibleDeliveries,
verifiedMessageToolDelivery: hasVerifiedMessageToolDelivery,
satisfiesSourceDelivery:
plan.fallback.skipWhenMessageToolSentToTarget && hasVerifiedMessageToolDelivery,
unverifiedMessageToolDelivery:
didSendViaMessageTool && sentTargets.length > 0 && !hasVerifiedMessageToolDelivery,
};
}