From ae63f76bbd516d9c1c0cc9a49dc8819bd17fcc7b Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 28 Apr 2026 02:47:20 -0700 Subject: [PATCH] fix(cron): infer session agentId when omitted (#72326) * fix(cron): infer session agentId when omitted * fix(clownfish): address review for ghcrawl-165998-agentic-merge (1) --- CHANGELOG.md | 2 + src/agents/tools/cron-tool.test.ts | 39 ++++++ src/agents/tools/cron-tool.ts | 2 +- .../isolated-agent/delivery-target.test.ts | 121 +++++++++++++++++- src/cron/isolated-agent/delivery-target.ts | 63 ++++++++- 5 files changed, 219 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36be8a1c929..3be8648ee9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,9 @@ Docs: https://docs.openclaw.ai - Plugins/startup: precompute bundled runtime mirror fingerprints before taking the mirror lock and keep Docker bundled plugin runtime deps/mirrors in a Docker-managed volume instead of the Windows/WSL config bind mount, so cold starts avoid slow host-volume mirror writes. Fixes #73339. Thanks @1yihui. - Channels/LINE: persist inbound image, video, audio, and file downloads in `~/.openclaw/media/inbound/` instead of temporary files so agents can still read LINE media after `/tmp` cleanup. Fixes #73370. Thanks @hijirii and @wenxu007. - CLI/plugins: keep bundled plugin installs out of `plugins.load.paths` while preserving install records, so install/inspect/doctor loops no longer warn about the current bundled plugin directory. Thanks @vincentkoc. +- Cron tool: infer the creating session's agentId for `cron.add` jobs when `agentId` is omitted or passed as undefined, keeping scheduled agentTurn jobs routed to the session agent; #40571 identified the guard bug and supplied the focused regression coverage. Thanks @ChanningYul. - Cron/Telegram: add `--thread-id` to `openclaw cron add` and `openclaw cron edit`, preserving Telegram forum topic delivery targets across scheduled announcements. Carries forward #51581, #60373, and #60890. Thanks @ChunHao-dev. +- Cron/Telegram: preserve session-derived Telegram topic thread IDs when isolated cron delivery explicitly targets the parent chat, keeping bare chat targets in the active forum topic without leaking stale topics to other chats. Carries forward #64708. Thanks @addelh. - Memory/compaction: keep pre-compaction memory-flush prompts runtime-only so session transcripts and `chat.history` no longer expose them as normal user turns. Fixes #54408 and #58956; refs #43567. Thanks @markgong and @guoyuhang9. - Control UI/WebChat: keep large attachment payloads out of Lit state and optimistic chat messages, using object URL previews plus send-time payload serialization so PDF/image uploads no longer trigger `RangeError: Maximum call stack size exceeded`. Fixes #73360; refs #54378 and #63432. Thanks @hejunhui-73, @Ansub, and @christianhernandez3-afk. - Agents/Anthropic: cancel stalled Anthropic Messages SSE body reads when abort signals fire, so active-memory timeouts release transport resources instead of leaving hidden recall runs parked on `reader.read()`. Refs #72965 and #73120. Thanks @wdeveloper16. diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index 0ec02fa1e03..cbe3a3279f3 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -131,6 +131,25 @@ describe("cron tool", () => { return payload?.sessionKey; } + async function executeAddAndReadAgentId(params: { + callId: string; + agentSessionKey: string; + agentId?: unknown; + includeAgentId?: boolean; + }): Promise { + const tool = createTestCronTool({ agentSessionKey: params.agentSessionKey }); + await tool.execute(params.callId, { + action: "add", + job: { + name: "reminder", + schedule: { at: new Date(123).toISOString() }, + payload: { kind: "agentTurn", message: "hello" }, + ...(params.includeAgentId ? { agentId: params.agentId } : {}), + }, + }); + return readGatewayCall().params?.agentId; + } + async function executeAddWithContextMessages(callId: string, contextMessages: number) { const tool = createTestCronTool({ agentSessionKey: "main" }); await tool.execute(callId, { @@ -265,6 +284,26 @@ describe("cron tool", () => { expect(call?.params?.agentId).toBeNull(); }); + it("infers session agentId when job.agentId is omitted", async () => { + await expect( + executeAddAndReadAgentId({ + callId: "call-omitted-agent-id", + agentSessionKey: "agent:agent-123:telegram:direct:channing", + }), + ).resolves.toBe("agent-123"); + }); + + it("infers session agentId when job.agentId is undefined", async () => { + await expect( + executeAddAndReadAgentId({ + callId: "call-undefined-agent-id", + agentSessionKey: "agent:agent-123:telegram:direct:channing", + includeAgentId: true, + agentId: undefined, + }), + ).resolves.toBe("agent-123"); + }); + it("passes through failureAlert=false for add", async () => { const tool = createTestCronTool(); await tool.execute("call-disable-alerts-add", { diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index 6d0cb2d689b..700f545d819 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -662,7 +662,7 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con const resolvedSessionKey = opts?.agentSessionKey ? resolveInternalSessionKey({ key: opts.agentSessionKey, alias, mainKey }) : undefined; - if (!("agentId" in job)) { + if (!("agentId" in job) || (job as { agentId?: unknown }).agentId === undefined) { const agentId = opts?.agentSessionKey ? resolveSessionAgentId({ sessionKey: opts.agentSessionKey, config: cfg }) : undefined; diff --git a/src/cron/isolated-agent/delivery-target.test.ts b/src/cron/isolated-agent/delivery-target.test.ts index 37251455315..bcd52567059 100644 --- a/src/cron/isolated-agent/delivery-target.test.ts +++ b/src/cron/isolated-agent/delivery-target.test.ts @@ -1,7 +1,10 @@ import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { ChannelOutboundAdapter } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { forumMessagingForTest } from "../../infra/outbound/targets.test-helpers.js"; +import { + forumMessagingForTest, + telegramMessagingForTest, +} from "../../infra/outbound/targets.test-helpers.js"; import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js"; import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js"; @@ -86,8 +89,20 @@ function createAllowlistAwareStubOutbound(label: string): ChannelOutboundAdapter }; } +const normalizeTelegramTargetForDeliveryTest = vi.fn((raw: string): string | undefined => { + const target = telegramMessagingForTest.parseExplicitTarget?.({ raw }); + if (!target?.to) { + return undefined; + } + const normalizedTo = target.to.toLowerCase(); + return target.threadId == null + ? `telegram:${normalizedTo}` + : `telegram:${normalizedTo}:topic:${target.threadId}`; +}); + beforeEach(() => { resetPluginRuntimeStateForTest(); + normalizeTelegramTargetForDeliveryTest.mockClear(); vi.mocked(resolveOutboundTarget).mockReset(); setActivePluginRegistry( createTestRegistry([ @@ -100,6 +115,18 @@ beforeEach(() => { }), source: "test", }, + { + pluginId: "telegram", + plugin: createOutboundTestPlugin({ + id: "telegram", + outbound: createStubOutbound("Telegram"), + messaging: { + ...telegramMessagingForTest, + normalizeTarget: normalizeTelegramTargetForDeliveryTest, + }, + }), + source: "test", + }, { pluginId: "alpha", plugin: { @@ -421,6 +448,38 @@ describe("resolveDeliveryTarget", () => { ); }); + it("returns an unresolved target when loaded target resolution throws", async () => { + setMainSessionEntry(undefined); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "alpha", + plugin: createOutboundTestPlugin({ + id: "alpha", + outbound: { + deliveryMode: "gateway", + resolveTarget: () => { + throw new Error("target normalizer exploded"); + }, + }, + }), + source: "test", + }, + ]), + ); + + const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, { + channel: "alpha", + to: "room:default", + }); + + expect(result.ok).toBe(false); + if (result.ok) { + throw new Error("expected invalid delivery target"); + } + expect(result.error.message).toContain("Invalid delivery target: target normalizer exploded"); + }); + it("selects correct binding when multiple agents have bindings", async () => { setMainSessionEntry(undefined); @@ -483,6 +542,66 @@ describe("resolveDeliveryTarget", () => { expect(result.threadId).toBe("thread-2"); }); + it("keeps a session Telegram topic threadId when a bare explicit target matches the topic route", async () => { + setLastSessionEntry({ + sessionId: "sess-telegram-topic", + lastChannel: "telegram", + lastTo: "-100200300:topic:77", + lastThreadId: "77", + }); + normalizeTelegramTargetForDeliveryTest.mockClear(); + + const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, { + channel: "telegram", + to: "-100200300", + }); + + expect(result.ok).toBe(true); + expect(result.to).toBe("-100200300"); + expect(result.threadId).toBe(77); + expect(normalizeTelegramTargetForDeliveryTest).toHaveBeenCalledWith("-100200300"); + expect(normalizeTelegramTargetForDeliveryTest).toHaveBeenCalledWith("-100200300:topic:77"); + }); + + it("drops carried threadId instead of throwing when target normalization fails", async () => { + setLastSessionEntry({ + sessionId: "sess-telegram-topic-invalid", + lastChannel: "telegram", + lastTo: "-100200300:topic:77", + lastThreadId: "77", + }); + normalizeTelegramTargetForDeliveryTest.mockImplementationOnce(() => { + throw new Error("target normalizer exploded"); + }); + + const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, { + channel: "telegram", + to: "-100200300", + }); + + expect(result.ok).toBe(true); + expect(result.to).toBe("-100200300"); + expect(result.threadId).toBeUndefined(); + }); + + it("drops a session Telegram topic threadId when a bare explicit target names a different chat", async () => { + setLastSessionEntry({ + sessionId: "sess-telegram-topic-stale", + lastChannel: "telegram", + lastTo: "-100200300:topic:77", + lastThreadId: "77", + }); + + const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, { + channel: "telegram", + to: "-100999999", + }); + + expect(result.ok).toBe(true); + expect(result.to).toBe("-100999999"); + expect(result.threadId).toBeUndefined(); + }); + it("uses single configured channel when neither explicit nor session channel exists", async () => { setMainSessionEntry(undefined); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index fcd71d78585..fcafa2f8787 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -1,3 +1,4 @@ +import { parseExplicitTargetForLoadedChannel } from "../../channels/plugins/target-parsing-loaded.js"; import type { ChannelId } from "../../channels/plugins/types.public.js"; import { resolveAgentMainSessionKey } from "../../config/sessions/main-session.js"; import { resolveStorePath } from "../../config/sessions/paths.js"; @@ -5,6 +6,7 @@ import { loadSessionStore } from "../../config/sessions/store-load.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-id-resolution.js"; +import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js"; import { tryResolveLoadedOutboundTarget } from "../../infra/outbound/targets-loaded.js"; import { resolveSessionDeliveryTarget } from "../../infra/outbound/targets-session.js"; import type { OutboundChannel } from "../../infra/outbound/targets.js"; @@ -41,12 +43,56 @@ async function loadTargetsRuntime() { async function resolveOutboundTargetWithRuntime( params: Parameters[0], ) { - const loaded = tryResolveLoadedOutboundTarget(params); - if (loaded) { - return loaded; + try { + const loaded = tryResolveLoadedOutboundTarget(params); + if (loaded) { + return loaded; + } + const { resolveOutboundTarget } = await loadTargetsRuntime(); + return resolveOutboundTarget(params); + } catch (err) { + return { + ok: false as const, + error: new Error(`Invalid delivery target: ${formatErrorMessage(err)}`), + }; } - const { resolveOutboundTarget } = await loadTargetsRuntime(); - return resolveOutboundTarget(params); +} + +function normalizeTargetForThreadCarry( + channel: Exclude | undefined, + to: string | undefined, +): string | undefined { + if (!channel || !to) { + return undefined; + } + try { + const normalized = normalizeTargetForProvider(channel, to); + const comparable = normalized ?? to.trim(); + if (!comparable) { + return undefined; + } + const parsed = parseExplicitTargetForLoadedChannel(channel, comparable); + const base = parsed?.to ?? comparable; + return normalizeTargetForProvider(channel, base) ?? base; + } catch { + return undefined; + } +} + +function deliveryTargetsShareThreadRoute(params: { + channel: Exclude | undefined; + to: string | undefined; + lastTo: string | undefined; +}): boolean { + if (!params.to || !params.lastTo) { + return false; + } + if (params.to === params.lastTo) { + return true; + } + const normalizedTo = normalizeTargetForThreadCarry(params.channel, params.to); + const normalizedLastTo = normalizeTargetForThreadCarry(params.channel, params.lastTo); + return Boolean(normalizedTo && normalizedLastTo && normalizedTo === normalizedLastTo); } let channelSelectionRuntimePromise: @@ -160,7 +206,12 @@ export async function resolveDeliveryTarget( // stale thread IDs from leaking to a different chat. let threadId = resolved.threadId && - (resolved.threadIdExplicit || (resolved.to && resolved.to === resolved.lastTo)) + (resolved.threadIdExplicit || + deliveryTargetsShareThreadRoute({ + channel, + to: resolved.to, + lastTo: resolved.lastTo, + })) ? resolved.threadId : undefined;