From fc9798a788ff74648161f18189dc3d49b59461d2 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 18:42:52 +0100 Subject: [PATCH] refactor: resolve cron delivery from context (#82241) --- src/agents/tools/cron-tool.test.ts | 142 +++++++-------- src/agents/tools/cron-tool.ts | 165 ++---------------- src/cron/delivery-context.test.ts | 117 +++++++++++++ src/cron/delivery-context.ts | 59 +++++++ .../isolated-agent/delivery-target.test.ts | 82 +++++++++ src/cron/isolated-agent/delivery-target.ts | 26 ++- 6 files changed, 355 insertions(+), 236 deletions(-) create mode 100644 src/cron/delivery-context.test.ts create mode 100644 src/cron/delivery-context.ts diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index 133ab7f78bf..36670c3fa61 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -1,7 +1,8 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -const { callGatewayMock } = vi.hoisted(() => ({ +const { callGatewayMock, extractDeliveryInfoMock } = vi.hoisted(() => ({ callGatewayMock: vi.fn(), + extractDeliveryInfoMock: vi.fn(), })); vi.mock("../agent-scope.js", async () => { @@ -12,6 +13,10 @@ vi.mock("../agent-scope.js", async () => { }; }); +vi.mock("../../config/sessions/delivery-info.js", () => ({ + extractDeliveryInfo: extractDeliveryInfoMock, +})); + import { buildAgentPeerSessionKey } from "../../routing/session-key.js"; import { createCronTool } from "./cron-tool.js"; @@ -164,6 +169,8 @@ describe("cron tool", () => { beforeEach(() => { callGatewayMock.mockClear(); callGatewayMock.mockResolvedValue({ ok: true }); + extractDeliveryInfoMock.mockReset(); + extractDeliveryInfoMock.mockReturnValue({ deliveryContext: undefined, threadId: undefined }); }); it("marks cron as owner-only", () => { @@ -721,89 +728,56 @@ describe("cron tool", () => { expect(call.params?.agentId).toBeNull(); }); - it("infers delivery from threaded session keys", async () => { - expect( - await executeAddAndReadDelivery({ - callId: "call-thread", - agentSessionKey: "agent:main:slack:channel:general:thread:1699999999.0001", - }), - ).toEqual({ - mode: "announce", - channel: "slack", - to: "general", + it("does not infer delivery from raw session-key fragments without delivery context", async () => { + const slackDelivery = await executeAddAndReadDelivery({ + callId: "call-thread", + agentSessionKey: "agent:main:slack:channel:general:thread:1699999999.0001", }); + const telegramDelivery = await executeAddAndReadDelivery({ + callId: "call-telegram-topic", + agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99", + }); + + expect(slackDelivery?.channel).toBeUndefined(); + expect(slackDelivery?.to).toBeUndefined(); + expect(telegramDelivery?.channel).toBeUndefined(); + expect(telegramDelivery?.to).toBeUndefined(); }); - it("preserves telegram forum topics when inferring delivery", async () => { - expect( - await executeAddAndReadDelivery({ - callId: "call-telegram-topic", - agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99", - }), - ).toEqual({ - mode: "announce", - channel: "telegram", - to: "-1001234567890:topic:99", + it("uses stored delivery context when current context is unavailable", async () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { + channel: "matrix", + to: "room:!AbCdEf1234567890:example.org", + accountId: "bot-a", + threadId: "$RootEvent:Example.Org", + }, + threadId: undefined, }); - }); - it("preserves telegram direct-chat thread ids when inferring delivery", async () => { expect( await executeAddAndReadDelivery({ - callId: "call-telegram-direct-thread", - agentSessionKey: "agent:main:telegram:direct:123456789:thread:123456789:99", + callId: "call-stored-context", + agentSessionKey: "agent:main:matrix:channel:!abcdef1234567890:example.org", }), ).toEqual({ mode: "announce", - channel: "telegram", - to: "123456789", - threadId: "99", - }); - }); - - it("preserves telegram account ids with direct-chat thread inference", async () => { - expect( - await executeAddAndReadDelivery({ - callId: "call-telegram-account-direct-thread", - agentSessionKey: "agent:main:telegram:bot-a:direct:123456789:thread:123456789:99", - }), - ).toEqual({ - mode: "announce", - channel: "telegram", - to: "123456789", + channel: "matrix", + to: "room:!AbCdEf1234567890:example.org", accountId: "bot-a", - threadId: "99", + threadId: "$RootEvent:Example.Org", }); }); - it("preserves legacy telegram dm thread ids when inferring delivery", async () => { - expect( - await executeAddAndReadDelivery({ - callId: "call-telegram-dm-thread", - agentSessionKey: "agent:main:telegram:dm:123456789:thread:123456789:99", - }), - ).toEqual({ - mode: "announce", - channel: "telegram", - to: "123456789", - threadId: "99", + it("prefers current delivery context over stored session context", async () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { + channel: "matrix", + to: "!stored:example.org", + }, + threadId: undefined, }); - }); - it("drops mismatched telegram direct-chat thread ids when inferring delivery", async () => { - expect( - await executeAddAndReadDelivery({ - callId: "call-telegram-mismatched-direct-thread", - agentSessionKey: "agent:main:telegram:direct:123456789:thread:987654321:99", - }), - ).toEqual({ - mode: "announce", - channel: "telegram", - to: "123456789", - }); - }); - - it("prefers current delivery context over lowercased session-key targets", async () => { expect( await executeAddAndReadDelivery({ callId: "call-current-context", @@ -825,12 +799,8 @@ describe("cron tool", () => { }); it("does not surface lowercased LINE recipients when current delivery context is unavailable (#81628)", async () => { - // Reproduces openclaw/openclaw#81628. LINE chat IDs are case-sensitive — push - // requires capital C/U/R; lowercased recipients return HTTP 400. The runtime - // already lowercases LINE peer IDs when canonicalizing the session key, and - // when the delivery-recovery / post-reply-token-expiry push path is missing - // currentDeliveryContext, inferDeliveryFromSessionKey lifts the lowercased - // fragment straight into delivery.to. + // LINE chat IDs are case-sensitive; without current/persisted deliveryContext, + // cron must not rebuild delivery.to from the lowercased session-key fragment. const sessionKey = buildAgentPeerSessionKey({ agentId: "main", channel: "line", @@ -842,9 +812,7 @@ describe("cron tool", () => { const delivery = await executeAddAndReadDelivery({ callId: "call-line-group-no-context-81628", agentSessionKey: sessionKey, - // Intentionally no currentDeliveryContext — emulates the delivery-recovery - // boundary that reloads queued entries from disk after the reply token has - // expired. + // Intentionally no currentDeliveryContext. }); expect(delivery?.to).toBeUndefined(); @@ -990,7 +958,15 @@ describe("cron tool", () => { }); }); - it("falls back to session-key inference when current context has no target", async () => { + it("falls back to stored delivery context when current context has no target", async () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { + channel: "telegram", + to: "-1001234567890", + }, + threadId: "99", + }); + expect( await executeAddAndReadDelivery({ callId: "call-empty-current-context", @@ -1003,7 +979,8 @@ describe("cron tool", () => { ).toEqual({ mode: "announce", channel: "telegram", - to: "-1001234567890:topic:99", + to: "-1001234567890", + threadId: "99", }); }); @@ -1022,6 +999,13 @@ describe("cron tool", () => { }); it("infers delivery when delivery is null", async () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { + to: "alice", + }, + threadId: undefined, + }); + expect( await executeAddAndReadDelivery({ callId: "call-null-delivery", diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index aee971eb27a..5b0f36c0a6e 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -1,23 +1,13 @@ import { Type, type TSchema } from "typebox"; import { getRuntimeConfig } from "../../config/config.js"; +import { resolveCronCreationDelivery } from "../../cron/delivery-context.js"; import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js"; -import type { CronDelivery, CronMessageChannel } from "../../cron/types.js"; +import type { CronDelivery } from "../../cron/types.js"; import { normalizeHttpWebhookUrl } from "../../cron/webhook-url.js"; -import { - parseAgentSessionKey, - parseThreadSessionSuffix, -} from "../../sessions/session-key-utils.js"; import { extractTextFromChatContent } from "../../shared/chat-content.js"; -import { - normalizeLowercaseStringOrEmpty, - normalizeOptionalLowercaseString, - normalizeOptionalString, -} from "../../shared/string-coerce.js"; +import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js"; import { isRecord, truncateUtf16Safe } from "../../utils.js"; -import { - normalizeDeliveryContext, - type DeliveryContext, -} from "../../utils/delivery-context.shared.js"; +import type { DeliveryContext } from "../../utils/delivery-context.shared.js"; import { resolveSessionAgentId } from "../agent-scope.js"; import { optionalStringEnum, stringEnum } from "../schema/typebox.js"; import { CRON_TOOL_DISPLAY_SUMMARY } from "../tool-description-presets.js"; @@ -500,143 +490,6 @@ async function buildReminderContextLines(params: { } } -function stripThreadSuffixFromSessionKey(sessionKey: string): string { - const normalized = normalizeLowercaseStringOrEmpty(sessionKey); - const idx = normalized.lastIndexOf(":thread:"); - if (idx <= 0) { - return sessionKey; - } - const parent = sessionKey.slice(0, idx).trim(); - return parent ? parent : sessionKey; -} - -function resolveTelegramDirectThreadId(params: { - peerId: string; - threadId?: string; -}): string | undefined { - const threadId = normalizeOptionalString(params.threadId); - if (!threadId) { - return undefined; - } - const peerId = normalizeOptionalString(params.peerId); - if (!peerId) { - return undefined; - } - const [threadChatId, ...threadIdParts] = threadId.split(":"); - if (threadIdParts.length === 0) { - return threadId; - } - if (normalizeOptionalLowercaseString(threadChatId) !== peerId) { - return undefined; - } - return normalizeOptionalString(threadIdParts.join(":")); -} - -function inferDeliveryFromSessionKey(agentSessionKey?: string): CronDelivery | null { - const rawSessionKey = agentSessionKey?.trim(); - if (!rawSessionKey) { - return null; - } - const threadSuffix = parseThreadSessionSuffix(rawSessionKey); - const parsed = parseAgentSessionKey( - threadSuffix.baseSessionKey ?? stripThreadSuffixFromSessionKey(rawSessionKey), - ); - if (!parsed || !parsed.rest) { - return null; - } - const parts = parsed.rest.split(":").filter(Boolean); - if (parts.length === 0) { - return null; - } - const head = normalizeOptionalLowercaseString(parts[0]); - if (!head || head === "main" || head === "subagent" || head === "acp") { - return null; - } - - // buildAgentPeerSessionKey encodes peers as: - // - direct: - // - :direct: - // - ::direct: - // - :group: - // - :channel: - // Note: legacy keys may use "dm" instead of "direct". - // Threaded sessions append :thread:, which we strip so delivery targets the parent peer. - // NOTE: Telegram forum topics encode as :topic: and should be preserved. - const markerIndex = parts.findIndex( - (part) => part === "direct" || part === "dm" || part === "group" || part === "channel", - ); - if (markerIndex === -1) { - return null; - } - const peerId = parts - .slice(markerIndex + 1) - .join(":") - .trim(); - if (!peerId) { - return null; - } - const marker = parts[markerIndex]; - - let channel: CronMessageChannel | undefined; - if (markerIndex >= 1) { - channel = normalizeOptionalLowercaseString(parts[0]) as CronMessageChannel | undefined; - } - - // LINE chat ids are case-sensitive (push requires capital C/U/R) but the - // session key holds the peer id lowercased for canonical routing. Rebuilding - // `to` from the session-key fragment would yield a value LINE rejects with - // HTTP 400, so refuse the fallback for LINE and let the caller surface the - // missing target instead of silently scheduling an undeliverable job. - // openclaw/openclaw#81628 - const isChannellessLineDirectId = - !channel && (marker === "direct" || marker === "dm") && /^[ucr][a-f0-9]{32}$/.test(peerId); - if (channel === "line" || isChannellessLineDirectId) { - return null; - } - - const delivery: CronDelivery = { mode: "announce", to: peerId }; - if (channel) { - delivery.channel = channel; - } - if (channel === "telegram" && markerIndex === 2) { - const accountId = normalizeOptionalString(parts[1]); - if (accountId) { - delivery.accountId = accountId; - } - } - if (channel === "telegram" && (marker === "direct" || marker === "dm")) { - const threadId = resolveTelegramDirectThreadId({ - peerId, - threadId: threadSuffix.threadId, - }); - if (threadId) { - delivery.threadId = threadId; - } - } - return delivery; -} - -function inferDeliveryFromContext(context?: DeliveryContext): CronDelivery | null { - const normalized = normalizeDeliveryContext(context); - if (!normalized?.to) { - return null; - } - const delivery: CronDelivery = { - mode: "announce", - to: normalized.to, - }; - if (normalized.channel) { - delivery.channel = normalized.channel as CronMessageChannel; - } - if (normalized.accountId) { - delivery.accountId = normalized.accountId; - } - if (normalized.threadId != null) { - delivery.threadId = normalized.threadId; - } - return delivery; -} - export function createCronTool(opts?: CronToolOptions, deps?: CronToolDeps): AnyAgentTool { const callGateway = deps?.callGatewayTool ?? callGatewayTool; return { @@ -808,8 +661,8 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con normalizeCronJobCreate(params.job, { sessionContext: { sessionKey: opts?.agentSessionKey }, }) ?? params.job; + const cfg = getRuntimeConfig(); if (job && typeof job === "object") { - const cfg = getRuntimeConfig(); const { mainKey, alias } = resolveMainSessionAlias(cfg); const resolvedSessionKey = opts?.agentSessionKey ? resolveInternalSessionKey({ key: opts.agentSessionKey, alias, mainKey }) @@ -858,9 +711,11 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con (mode === "" || mode === "announce") && !hasTarget; if (shouldInfer) { - const inferred = - inferDeliveryFromContext(opts.currentDeliveryContext) ?? - inferDeliveryFromSessionKey(opts.agentSessionKey); + const inferred = resolveCronCreationDelivery({ + cfg, + currentDeliveryContext: opts.currentDeliveryContext, + agentSessionKey: opts.agentSessionKey, + }); if (inferred) { (job as { delivery?: unknown }).delivery = { ...inferred, diff --git a/src/cron/delivery-context.test.ts b/src/cron/delivery-context.test.ts new file mode 100644 index 00000000000..75971f717c7 --- /dev/null +++ b/src/cron/delivery-context.test.ts @@ -0,0 +1,117 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; + +const { extractDeliveryInfoMock } = vi.hoisted(() => ({ + extractDeliveryInfoMock: vi.fn(), +})); + +vi.mock("../config/sessions/delivery-info.js", () => ({ + extractDeliveryInfo: extractDeliveryInfoMock, +})); + +import { cronDeliveryFromContext, resolveCronCreationDelivery } from "./delivery-context.js"; + +describe("cron delivery context", () => { + const cfg = {} as OpenClawConfig; + + beforeEach(() => { + extractDeliveryInfoMock.mockReset(); + extractDeliveryInfoMock.mockReturnValue({ deliveryContext: undefined, threadId: undefined }); + }); + + it("builds announce delivery from deliveryContext without changing target casing", () => { + expect( + cronDeliveryFromContext({ + channel: " Matrix ", + to: " !AbCdEf1234567890:Example.Org ", + accountId: " Bot-A ", + threadId: " $RootEvent:Example.Org ", + }), + ).toEqual({ + mode: "announce", + channel: "matrix", + to: "!AbCdEf1234567890:Example.Org", + accountId: "bot-a", + threadId: "$RootEvent:Example.Org", + }); + }); + + it("prefers current deliveryContext over stored session deliveryContext", () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { channel: "matrix", to: "!stored:example.org" }, + threadId: undefined, + }); + + expect( + resolveCronCreationDelivery({ + cfg, + agentSessionKey: "agent:main:matrix:channel:!stored:example.org", + currentDeliveryContext: { + channel: "matrix", + to: "!Current:Example.Org", + }, + }), + ).toEqual({ + mode: "announce", + channel: "matrix", + to: "!Current:Example.Org", + }); + expect(extractDeliveryInfoMock).not.toHaveBeenCalled(); + }); + + it("uses stored session deliveryContext when current context is absent", () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { + channel: "line", + to: "Cabcdef0123456789abcdef0123456789", + accountId: "primary", + }, + threadId: undefined, + }); + + expect( + resolveCronCreationDelivery({ + cfg, + agentSessionKey: "agent:main:line:group:cabcdef0123456789abcdef0123456789", + }), + ).toEqual({ + mode: "announce", + channel: "line", + to: "Cabcdef0123456789abcdef0123456789", + accountId: "primary", + }); + }); + + it("preserves parsed thread ids from stored session lookups", () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { + channel: "telegram", + to: "-1001234567890", + threadId: "stale-topic", + }, + threadId: "99", + }); + + expect( + resolveCronCreationDelivery({ + cfg, + agentSessionKey: "agent:main:telegram:group:-1001234567890:topic:99", + }), + ).toEqual({ + mode: "announce", + channel: "telegram", + to: "-1001234567890", + threadId: "99", + }); + }); + + it("does not create delivery without a concrete target", () => { + expect(cronDeliveryFromContext({ channel: "matrix", to: " " })).toBeNull(); + expect( + resolveCronCreationDelivery({ + cfg, + agentSessionKey: "agent:main:matrix:channel:!abcdef:example.org", + }), + ).toBeNull(); + }); +}); diff --git a/src/cron/delivery-context.ts b/src/cron/delivery-context.ts new file mode 100644 index 00000000000..5474fa8b0be --- /dev/null +++ b/src/cron/delivery-context.ts @@ -0,0 +1,59 @@ +import { extractDeliveryInfo } from "../config/sessions/delivery-info.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { + normalizeDeliveryContext, + type DeliveryContext, +} from "../utils/delivery-context.shared.js"; +import type { CronDelivery, CronMessageChannel } from "./types.js"; + +export function cronDeliveryFromContext(context?: DeliveryContext): CronDelivery | null { + const normalized = normalizeDeliveryContext(context); + if (!normalized?.to) { + return null; + } + const delivery: CronDelivery = { + mode: "announce", + to: normalized.to, + }; + if (normalized.channel) { + delivery.channel = normalized.channel as CronMessageChannel; + } + if (normalized.accountId) { + delivery.accountId = normalized.accountId; + } + if (normalized.threadId != null) { + delivery.threadId = normalized.threadId; + } + return delivery; +} + +export function resolveCronStoredDeliveryContext(params: { + cfg: OpenClawConfig; + sessionKey?: string; +}): DeliveryContext | undefined { + const sessionKey = params.sessionKey?.trim(); + if (!sessionKey) { + return undefined; + } + const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey, { cfg: params.cfg }); + if (deliveryContext && threadId) { + return { ...deliveryContext, threadId }; + } + return deliveryContext; +} + +export function resolveCronCreationDelivery(params: { + cfg: OpenClawConfig; + currentDeliveryContext?: DeliveryContext; + agentSessionKey?: string; +}): CronDelivery | null { + return ( + cronDeliveryFromContext(params.currentDeliveryContext) ?? + cronDeliveryFromContext( + resolveCronStoredDeliveryContext({ + cfg: params.cfg, + sessionKey: params.agentSessionKey, + }), + ) + ); +} diff --git a/src/cron/isolated-agent/delivery-target.test.ts b/src/cron/isolated-agent/delivery-target.test.ts index 0cb4d59aa09..b51544c5b27 100644 --- a/src/cron/isolated-agent/delivery-target.test.ts +++ b/src/cron/isolated-agent/delivery-target.test.ts @@ -8,10 +8,19 @@ import { import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js"; import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js"; +const { extractDeliveryInfoMock } = vi.hoisted(() => ({ + extractDeliveryInfoMock: vi.fn(), +})); + vi.mock("../../config/sessions/main-session.js", () => ({ + canonicalizeMainSessionAlias: vi.fn(({ sessionKey }) => sessionKey), resolveAgentMainSessionKey: vi.fn().mockReturnValue("agent:test:main"), })); +vi.mock("../../config/sessions/delivery-info.js", () => ({ + extractDeliveryInfo: extractDeliveryInfoMock, +})); + vi.mock("../../config/sessions/paths.js", () => ({ resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"), })); @@ -39,6 +48,7 @@ vi.mock("../../infra/outbound/targets.runtime.js", () => ({ })); const mockedModuleIds = [ "../../config/sessions/main-session.js", + "../../config/sessions/delivery-info.js", "../../config/sessions/paths.js", "../../config/sessions/store-load.js", "../../infra/outbound/channel-selection.runtime.js", @@ -102,6 +112,8 @@ const normalizeTelegramTargetForDeliveryTest = vi.fn((raw: string): string | und beforeEach(() => { resetPluginRuntimeStateForTest(); + extractDeliveryInfoMock.mockReset(); + extractDeliveryInfoMock.mockReturnValue({ deliveryContext: undefined, threadId: undefined }); normalizeTelegramTargetForDeliveryTest.mockClear(); vi.mocked(readChannelAllowFromStoreEntriesSync).mockReset(); vi.mocked(readChannelAllowFromStoreEntriesSync).mockReturnValue([]); @@ -723,6 +735,76 @@ describe("resolveDeliveryTarget", () => { expect(result.threadId).toBe(42); }); + it("prefers stored deliveryContext lookup over exact session-store entries", async () => { + extractDeliveryInfoMock.mockReturnValueOnce({ + deliveryContext: { + channel: "alpha", + to: "RoomMixedCase", + accountId: "primary", + threadId: "thread-old-stored", + }, + threadId: "thread-stored", + }); + setSessionStore({ + "agent:test:thread:42": { + sessionId: "thread-session", + updatedAt: 2000, + lastChannel: "alpha", + lastTo: "room-lowercase", + lastThreadId: "thread-old", + }, + } as SessionStore); + + const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, { + channel: "last", + sessionKey: "agent:test:thread:42", + to: undefined, + }); + + expect(result).toMatchObject({ + ok: true, + channel: "alpha", + to: "RoomMixedCase", + accountId: "primary", + threadId: "thread-stored", + }); + }); + + it("scopes unqualified stored delivery lookups to the job agent", async () => { + extractDeliveryInfoMock.mockImplementation((sessionKey: string) => + sessionKey === "agent:agent-b:main" + ? { + deliveryContext: { + channel: "alpha", + to: "ops-room", + }, + threadId: undefined, + } + : { + deliveryContext: { + channel: "alpha", + to: "default-room", + }, + threadId: undefined, + }, + ); + + const result = await resolveDeliveryTarget(makeCfg({ bindings: [] }), AGENT_ID, { + channel: "last", + sessionKey: "main", + to: undefined, + }); + + expect(extractDeliveryInfoMock).toHaveBeenCalledWith("agent:agent-b:main", { + cfg: expect.any(Object), + }); + expect(result).toMatchObject({ + ok: true, + channel: "alpha", + to: "ops-room", + }); + }); + it("falls back to the main session entry when the requested sessionKey is missing", async () => { setSessionStore({ "agent:test:main": { diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index a302121ce8a..aa80436c7fc 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -3,6 +3,7 @@ import type { ChannelId } from "../../channels/plugins/types.public.js"; import { resolveAgentMainSessionKey } from "../../config/sessions/main-session.js"; import { resolveStorePath } from "../../config/sessions/paths.js"; import { loadSessionStore } from "../../config/sessions/store-load.js"; +import type { SessionEntry } from "../../config/sessions/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-id-resolution.js"; @@ -12,6 +13,8 @@ import { resolveSessionDeliveryTarget } from "../../infra/outbound/targets-sessi import type { OutboundChannel } from "../../infra/outbound/targets.js"; import { normalizeAccountId } from "../../routing/session-key.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; +import { resolveCronStoredDeliveryContext } from "../delivery-context.js"; +import { resolveCronAgentSessionKey } from "./session-key.js"; export type DeliveryTargetResolution = | { @@ -133,9 +136,28 @@ export async function resolveDeliveryTarget( // Look up thread-specific session first (e.g. agent:main:main:thread:1234), // then fall back to the main session entry. - const threadSessionKey = jobPayload.sessionKey?.trim(); + const rawSessionKey = jobPayload.sessionKey?.trim(); + const threadSessionKey = rawSessionKey + ? resolveCronAgentSessionKey({ + sessionKey: rawSessionKey, + agentId, + mainKey: cfg.session?.mainKey, + cfg, + }) + : undefined; + const storedDeliveryContext = resolveCronStoredDeliveryContext({ + cfg, + sessionKey: threadSessionKey, + }); + const storedDeliveryEntry = storedDeliveryContext + ? ({ + sessionId: threadSessionKey ?? mainSessionKey, + updatedAt: 0, + deliveryContext: storedDeliveryContext, + } satisfies SessionEntry) + : undefined; const threadEntry = threadSessionKey ? store[threadSessionKey] : undefined; - const main = threadEntry ?? store[mainSessionKey]; + const main = storedDeliveryEntry ?? threadEntry ?? store[mainSessionKey]; const preliminary = resolveSessionDeliveryTarget({ entry: main,