From 7d9a9d83ffcaa1cb06e0e962ff14caad439fc9ba Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Mon, 20 Apr 2026 08:05:32 +0530 Subject: [PATCH] fix: preserve isolated message targets (#69153) * test(cron): cover delivery target context for mode none * fix(cron): preserve target context for delivery mode none * test(cron): cover isolated message target forwarding * fix(cron): forward isolated message targets into embedded runs * fix(cron): ignore implicit last-target context for mode none * fix(cron): keep mode none channel explicit only * test(cron): fix isolated target test typing * fix: preserve isolated message targets (#69153) * fix: preserve isolated message targets (#69153) --- CHANGELOG.md | 1 + src/cron/delivery-plan.test.ts | 48 +++++ src/cron/delivery-plan.ts | 6 +- src/cron/isolated-agent/run-executor.ts | 29 ++- .../run.message-tool-policy.test.ts | 172 ++++++++++++++++++ src/cron/isolated-agent/run.ts | 9 +- 6 files changed, 260 insertions(+), 5 deletions(-) create mode 100644 src/cron/delivery-plan.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 43149bf1465..a45b469659f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai - Matrix/commands: recognize slash commands that are prefixed with the bot's Matrix mention, so room messages like `@bot:server /new` trigger the command path without requiring custom mention regexes. (#68570) Thanks @nightq and @johnlanni. - Agents/subagents: include requested role and runtime timing on subagent failure payloads so parent agents can correlate failed or timed-out child work. (#68726) Thanks @BKF-Gitty. - Gateway/sessions: reject stale agent-scoped sessions after an agent is removed from config while preserving legacy default-agent main-session aliases. (#65986) Thanks @bittoby. +- Cron/isolated-agent: preserve explicit `delivery.mode: "none"` message targets for isolated runs without inheriting implicit `last` routing, so agent-initiated Telegram sends keep their authored destination while bare `mode:none` jobs stay targetless. (#69153) Thanks @obviyus. ## 2026.4.19-beta.2 diff --git a/src/cron/delivery-plan.test.ts b/src/cron/delivery-plan.test.ts new file mode 100644 index 00000000000..7d85512b9fe --- /dev/null +++ b/src/cron/delivery-plan.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it } from "vitest"; +import { resolveCronDeliveryPlan } from "./delivery-plan.js"; +import type { CronJob } from "./types.js"; + +function makeJob(overrides: Partial): CronJob { + const now = Date.now(); + return { + id: "job-1", + name: "test", + enabled: true, + createdAtMs: now, + updatedAtMs: now, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello" }, + state: {}, + ...overrides, + }; +} + +describe("resolveCronDeliveryPlan", () => { + it("preserves explicit message target context for delivery.mode=none", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + name: "Cron Target Context", + payload: { kind: "agentTurn", message: "send a message" }, + delivery: { + mode: "none", + channel: "telegram", + to: "123:topic:42", + threadId: 42, + accountId: "ops", + }, + }), + ); + + expect(plan).toEqual({ + mode: "none", + channel: "telegram", + to: "123:topic:42", + threadId: 42, + accountId: "ops", + source: "delivery", + requested: false, + }); + }); +}); diff --git a/src/cron/delivery-plan.ts b/src/cron/delivery-plan.ts index 83685a8f185..2d6f24ec264 100644 --- a/src/cron/delivery-plan.ts +++ b/src/cron/delivery-plan.ts @@ -50,18 +50,18 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const deliveryThreadId = normalizeOptionalThreadValue( (delivery as { threadId?: unknown } | undefined)?.threadId, ); - const channel = deliveryChannel ?? "last"; const to = deliveryTo; const deliveryAccountId = normalizeOptionalString( (delivery as { accountId?: unknown } | undefined)?.accountId, ); if (hasDelivery) { const resolvedMode = mode ?? "announce"; + const channel = resolvedMode === "announce" ? (deliveryChannel ?? "last") : deliveryChannel; return { mode: resolvedMode, - channel: resolvedMode === "announce" ? channel : undefined, + channel: resolvedMode === "webhook" ? undefined : channel, to, - threadId: resolvedMode === "announce" ? deliveryThreadId : undefined, + threadId: resolvedMode === "webhook" ? undefined : deliveryThreadId, accountId: deliveryAccountId, source: "delivery", requested: resolvedMode === "announce", diff --git a/src/cron/isolated-agent/run-executor.ts b/src/cron/isolated-agent/run-executor.ts index e2d075a5b20..140938d0652 100644 --- a/src/cron/isolated-agent/run-executor.ts +++ b/src/cron/isolated-agent/run-executor.ts @@ -33,6 +33,20 @@ type CronSubagentRegistryRuntime = typeof import("./run-subagent-registry.runtim let cronEmbeddedRuntimePromise: Promise | undefined; let cronSubagentRegistryRuntimePromise: Promise | undefined; +function resolveCurrentChannelTarget(params: { + channel?: string; + to?: string; + threadId?: string | number; +}): string | undefined { + if (!params.to) { + return undefined; + } + if (params.channel !== "telegram" || params.threadId == null) { + return params.to; + } + return params.to.includes(":topic:") ? params.to : `${params.to}:topic:${params.threadId}`; +} + async function loadCronEmbeddedRuntime() { cronEmbeddedRuntimePromise ??= import("./run-embedded.runtime.js"); return await cronEmbeddedRuntimePromise; @@ -65,7 +79,11 @@ export function createCronPromptExecutor(params: { thinkLevel: ThinkLevel | undefined; timeoutMs: number; messageChannel: string | undefined; - resolvedDelivery: { accountId?: string }; + resolvedDelivery: { + accountId?: string; + to?: string; + threadId?: string | number; + }; toolPolicy: { requireExplicitMessageTarget: boolean; disableMessageTool: boolean; @@ -150,6 +168,13 @@ export function createCronPromptExecutor(params: { senderIsOwner: false, messageChannel: params.messageChannel, agentAccountId: params.resolvedDelivery.accountId, + messageTo: params.resolvedDelivery.to, + messageThreadId: params.resolvedDelivery.threadId, + currentChannelId: resolveCurrentChannelTarget({ + channel: params.messageChannel, + to: params.resolvedDelivery.to, + threadId: params.resolvedDelivery.threadId, + }), sessionFile, agentDir: params.agentDir, workspaceDir: params.workspaceDir, @@ -222,6 +247,8 @@ export async function executeCronRun(params: { resolvedDelivery: { channel?: string; accountId?: string; + to?: string; + threadId?: string | number; }; toolPolicy: { requireExplicitMessageTarget: boolean; diff --git a/src/cron/isolated-agent/run.message-tool-policy.test.ts b/src/cron/isolated-agent/run.message-tool-policy.test.ts index 6c1071ab6c9..5cdbc5d4844 100644 --- a/src/cron/isolated-agent/run.message-tool-policy.test.ts +++ b/src/cron/isolated-agent/run.message-tool-policy.test.ts @@ -1,10 +1,13 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import type { SkillSnapshot } from "../../agents/skills.js"; import type { CronDeliveryMode } from "../types.js"; +import type { MutableCronSession } from "./run-session-state.js"; import { clearFastTestEnv, dispatchCronDeliveryMock, isHeartbeatOnlyResponseMock, loadRunCronIsolatedAgentTurn, + makeCronSession, mockRunCronFallbackPassthrough, resetRunCronIsolatedAgentTurnHarness, resolveCronDeliveryPlanMock, @@ -14,6 +17,7 @@ import { } from "./run.test-harness.js"; const runCronIsolatedAgentTurn = await loadRunCronIsolatedAgentTurn(); +const { createCronPromptExecutor } = await import("./run-executor.js"); function makeParams() { return { @@ -73,6 +77,13 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { }); }); + const emptySkillsSnapshot: SkillSnapshot = { + prompt: "", + skills: [], + resolvedSkills: [], + version: 1, + }; + afterEach(() => { restoreFastTestEnv(previousFastTestEnv); }); @@ -84,6 +95,167 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { }); }); + it("preserves explicit delivery targets for agent-initiated messaging when delivery.mode is none", async () => { + mockRunCronFallbackPassthrough(); + resolveCronDeliveryPlanMock.mockReturnValue({ + requested: false, + mode: "none", + channel: "telegram", + to: "123:topic:42", + threadId: 42, + }); + resolveDeliveryTargetMock.mockResolvedValue({ + ok: true, + channel: "telegram", + to: "123:topic:42", + threadId: 42, + accountId: undefined, + error: undefined, + }); + + await runCronIsolatedAgentTurn({ + ...makeParams(), + job: { + id: "message-tool-policy", + name: "Message Tool Policy", + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: "send a message" }, + delivery: { mode: "none", channel: "telegram", to: "123:topic:42", threadId: 42 }, + } as never, + }); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({ + disableMessageTool: false, + messageChannel: "telegram", + messageTo: "123:topic:42", + messageThreadId: 42, + currentChannelId: "123:topic:42", + }); + }); + + it("does not resolve implicit last-target context for bare delivery.mode none", async () => { + mockRunCronFallbackPassthrough(); + resolveCronDeliveryPlanMock.mockReturnValue({ + requested: false, + mode: "none", + channel: "last", + }); + + await runCronIsolatedAgentTurn({ + ...makeParams(), + job: { + id: "message-tool-policy", + name: "Message Tool Policy", + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: "send a message" }, + delivery: { mode: "none" }, + } as never, + }); + + expect(resolveDeliveryTargetMock).not.toHaveBeenCalled(); + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({ + disableMessageTool: false, + messageChannel: undefined, + messageTo: undefined, + messageThreadId: undefined, + currentChannelId: undefined, + }); + }); + + it("forwards explicit message targets into the embedded run", async () => { + mockRunCronFallbackPassthrough(); + const executor = createCronPromptExecutor({ + cfg: {}, + cfgWithAgentDefaults: {}, + job: makeParams().job, + agentId: "default", + agentDir: "/tmp/agent-dir", + agentSessionKey: "cron:message-tool-policy", + workspaceDir: "/tmp/workspace", + resolvedVerboseLevel: "off", + thinkLevel: undefined, + timeoutMs: 60_000, + messageChannel: "telegram", + resolvedDelivery: { + accountId: "ops", + to: "123:topic:42", + threadId: 42, + }, + toolPolicy: { + requireExplicitMessageTarget: false, + disableMessageTool: false, + }, + skillsSnapshot: emptySkillsSnapshot, + agentPayload: null, + liveSelection: { + provider: "openai", + model: "gpt-5.4", + }, + cronSession: makeCronSession() as MutableCronSession, + abortReason: () => "aborted", + }); + + await executor.runPrompt("send a message"); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({ + messageChannel: "telegram", + agentAccountId: "ops", + messageTo: "123:topic:42", + messageThreadId: 42, + currentChannelId: "123:topic:42", + }); + }); + + it("preserves topic routing when inferred currentChannelId is built from split delivery fields", async () => { + mockRunCronFallbackPassthrough(); + const executor = createCronPromptExecutor({ + cfg: {}, + cfgWithAgentDefaults: {}, + job: makeParams().job, + agentId: "default", + agentDir: "/tmp/agent-dir", + agentSessionKey: "cron:message-tool-policy", + workspaceDir: "/tmp/workspace", + resolvedVerboseLevel: "off", + thinkLevel: undefined, + timeoutMs: 60_000, + messageChannel: "telegram", + resolvedDelivery: { + accountId: "ops", + to: "123", + threadId: 42, + }, + toolPolicy: { + requireExplicitMessageTarget: false, + disableMessageTool: false, + }, + skillsSnapshot: emptySkillsSnapshot, + agentPayload: null, + liveSelection: { + provider: "openai", + model: "gpt-5.4", + }, + cronSession: makeCronSession() as MutableCronSession, + abortReason: () => "aborted", + }); + + await executor.runPrompt("send a message"); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({ + messageChannel: "telegram", + agentAccountId: "ops", + messageTo: "123", + messageThreadId: 42, + currentChannelId: "123:topic:42", + }); + }); + it("disables the message tool when cron delivery is active", async () => { await expectMessageToolDisabledForPlan({ requested: true, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index fc1580b9da1..aaf83c213ac 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -142,7 +142,12 @@ async function resolveCronDeliveryContext(params: { deliveryContract: IsolatedDeliveryContract; }) { const deliveryPlan = resolveCronDeliveryPlan(params.job); - if (!deliveryPlan.requested) { + const hasMessageTargetContext = + deliveryPlan.mode !== "webhook" && + (deliveryPlan.to !== undefined || + deliveryPlan.threadId !== undefined || + deliveryPlan.accountId !== undefined); + if (!deliveryPlan.requested && !hasMessageTargetContext) { const resolvedDelivery = { ok: false as const, channel: undefined, @@ -746,7 +751,9 @@ export async function runCronIsolatedAgentTurn(params: { lane: params.lane, resolvedDelivery: { channel: prepared.context.resolvedDelivery.channel, + to: prepared.context.resolvedDelivery.to, accountId: prepared.context.resolvedDelivery.accountId, + threadId: prepared.context.resolvedDelivery.threadId, }, toolPolicy: prepared.context.toolPolicy, skillsSnapshot: prepared.context.skillsSnapshot,