diff --git a/CHANGELOG.md b/CHANGELOG.md index 1afd7f318a7..8b9daf4e4b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -243,6 +243,7 @@ Docs: https://docs.openclaw.ai - Auth/Codex CLI reuse: sync reused Codex CLI credentials into the supported `openai-codex:default` OAuth profile instead of reviving the deprecated `openai-codex:codex-cli` slot, so doctor cleanup no longer loops. (#45353) thanks @Gugu-sugar. - Deps/audit: bump the pinned `fast-xml-parser` override to the first patched release so `pnpm audit --prod --audit-level=high` no longer fails on the AWS Bedrock XML builder path. Thanks @vincentkoc. - Hooks/after_compaction: forward `sessionFile` for direct/manual compaction events and add `sessionFile` plus `sessionKey` to wired auto-compaction hook context so plugins receive the session metadata already declared in the hook types. (#40781) Thanks @jarimustonen. +- Sessions/BlueBubbles/cron: persist outbound session routing and transcript mirroring for new targets, auto-create BlueBubbles chats before attachment sends, and only suppress isolated cron deliveries when the run started hours late instead of merely finishing late. (#50092) ### Breaking diff --git a/extensions/bluebubbles/src/attachments.test.ts b/extensions/bluebubbles/src/attachments.test.ts index 704b907eb8b..cb40ca810e3 100644 --- a/extensions/bluebubbles/src/attachments.test.ts +++ b/extensions/bluebubbles/src/attachments.test.ts @@ -484,4 +484,94 @@ describe("sendBlueBubblesAttachment", () => { expect(bodyText).not.toContain('name="selectedMessageGuid"'); expect(bodyText).not.toContain('name="partIndex"'); }); + + it("auto-creates a new chat when sending to a phone number with no existing chat", async () => { + // First call: resolveChatGuidForTarget queries chats, returns empty (no match) + mockFetch.mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ data: [] }), + }); + // Second call: createChatForHandle creates new chat + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => + Promise.resolve( + JSON.stringify({ + data: { chatGuid: "iMessage;-;+15559876543", guid: "iMessage;-;+15559876543" }, + }), + ), + }); + // Third call: actual attachment send + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve(JSON.stringify({ data: { guid: "attach-msg-1" } })), + }); + + const result = await sendBlueBubblesAttachment({ + to: "+15559876543", + buffer: new Uint8Array([1, 2, 3]), + filename: "photo.jpg", + contentType: "image/jpeg", + opts: { serverUrl: "http://localhost:1234", password: "test" }, + }); + + expect(result.messageId).toBe("attach-msg-1"); + // Verify chat creation was called + const createCallBody = JSON.parse(mockFetch.mock.calls[1][1].body); + expect(createCallBody.addresses).toEqual(["+15559876543"]); + // Verify attachment was sent to the newly created chat + const attachBody = mockFetch.mock.calls[2][1]?.body as Uint8Array; + const attachText = decodeBody(attachBody); + expect(attachText).toContain("iMessage;-;+15559876543"); + }); + + it("retries chatGuid resolution after creating a chat with no returned guid", async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ data: [] }), + }); + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve(JSON.stringify({ data: {} })), + }); + mockFetch.mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ data: [{ guid: "iMessage;-;+15557654321" }] }), + }); + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve(JSON.stringify({ data: { guid: "attach-msg-2" } })), + }); + + const result = await sendBlueBubblesAttachment({ + to: "+15557654321", + buffer: new Uint8Array([4, 5, 6]), + filename: "photo.jpg", + contentType: "image/jpeg", + opts: { serverUrl: "http://localhost:1234", password: "test" }, + }); + + expect(result.messageId).toBe("attach-msg-2"); + const createCallBody = JSON.parse(mockFetch.mock.calls[1][1].body); + expect(createCallBody.addresses).toEqual(["+15557654321"]); + const attachBody = mockFetch.mock.calls[3][1]?.body as Uint8Array; + const attachText = decodeBody(attachBody); + expect(attachText).toContain("iMessage;-;+15557654321"); + }); + + it("still throws for non-handle targets when chatGuid is not found", async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ data: [] }), + }); + + await expect( + sendBlueBubblesAttachment({ + to: "chat_id:999", + buffer: new Uint8Array([1, 2, 3]), + filename: "photo.jpg", + opts: { serverUrl: "http://localhost:1234", password: "test" }, + }), + ).rejects.toThrow("chatGuid not found"); + }); }); diff --git a/extensions/bluebubbles/src/attachments.ts b/extensions/bluebubbles/src/attachments.ts index 5aab9fd3b68..4c6fd09d6d5 100644 --- a/extensions/bluebubbles/src/attachments.ts +++ b/extensions/bluebubbles/src/attachments.ts @@ -10,7 +10,7 @@ import { resolveRequestUrl } from "./request-url.js"; import type { OpenClawConfig } from "./runtime-api.js"; import { getBlueBubblesRuntime, warnBlueBubbles } from "./runtime.js"; import { extractBlueBubblesMessageId, resolveBlueBubblesSendTarget } from "./send-helpers.js"; -import { resolveChatGuidForTarget } from "./send.js"; +import { resolveChatGuidForTarget, createChatForHandle } from "./send.js"; import { blueBubblesFetchWithTimeout, buildBlueBubblesApiUrl, @@ -180,16 +180,37 @@ export async function sendBlueBubblesAttachment(params: { } const target = resolveBlueBubblesSendTarget(to); - const chatGuid = await resolveChatGuidForTarget({ + let chatGuid = await resolveChatGuidForTarget({ baseUrl, password, timeoutMs: opts.timeoutMs, target, }); if (!chatGuid) { - throw new Error( - "BlueBubbles attachment send failed: chatGuid not found for target. Use a chat_guid target or ensure the chat exists.", - ); + // For handle targets (phone numbers/emails), auto-create a new DM chat + if (target.kind === "handle") { + const created = await createChatForHandle({ + baseUrl, + password, + address: target.address, + timeoutMs: opts.timeoutMs, + }); + chatGuid = created.chatGuid; + // If we still don't have a chatGuid, try resolving again (chat was created server-side) + if (!chatGuid) { + chatGuid = await resolveChatGuidForTarget({ + baseUrl, + password, + timeoutMs: opts.timeoutMs, + target, + }); + } + } + if (!chatGuid) { + throw new Error( + "BlueBubbles attachment send failed: chatGuid not found for target. Use a chat_guid target or ensure the chat exists.", + ); + } } const url = buildBlueBubblesApiUrl({ diff --git a/extensions/bluebubbles/src/send.test.ts b/extensions/bluebubbles/src/send.test.ts index f820ebd9b8b..ecb8b1f68e0 100644 --- a/extensions/bluebubbles/src/send.test.ts +++ b/extensions/bluebubbles/src/send.test.ts @@ -3,7 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import "./test-mocks.js"; import { getCachedBlueBubblesPrivateApiStatus } from "./probe.js"; import { clearBlueBubblesRuntime, setBlueBubblesRuntime } from "./runtime.js"; -import { sendMessageBlueBubbles, resolveChatGuidForTarget } from "./send.js"; +import { sendMessageBlueBubbles, resolveChatGuidForTarget, createChatForHandle } from "./send.js"; import { BLUE_BUBBLES_PRIVATE_API_STATUS, installBlueBubblesFetchTestHooks, @@ -781,4 +781,109 @@ describe("send", () => { expect(body.tempGuid.length).toBeGreaterThan(0); }); }); + + describe("createChatForHandle", () => { + it("creates a new chat and returns chatGuid from response", async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => + Promise.resolve( + JSON.stringify({ + data: { guid: "iMessage;-;+15559876543", chatGuid: "iMessage;-;+15559876543" }, + }), + ), + }); + + const result = await createChatForHandle({ + baseUrl: "http://localhost:1234", + password: "test", + address: "+15559876543", + message: "Hello!", + }); + + expect(result.chatGuid).toBe("iMessage;-;+15559876543"); + expect(result.messageId).toBeDefined(); + const body = JSON.parse(mockFetch.mock.calls[0][1].body); + expect(body.addresses).toEqual(["+15559876543"]); + expect(body.message).toBe("Hello!"); + }); + + it("creates a new chat without a message when message is omitted", async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => + Promise.resolve( + JSON.stringify({ + data: { guid: "iMessage;-;+15559876543" }, + }), + ), + }); + + const result = await createChatForHandle({ + baseUrl: "http://localhost:1234", + password: "test", + address: "+15559876543", + }); + + expect(result.chatGuid).toBe("iMessage;-;+15559876543"); + const body = JSON.parse(mockFetch.mock.calls[0][1].body); + expect(body.message).toBe(""); + }); + + it.each([ + ["data.chatGuid", { data: { chatGuid: "shape-chat-guid" } }, "shape-chat-guid"], + ["data.guid", { data: { guid: "shape-guid" } }, "shape-guid"], + [ + "data.chats[0].guid", + { data: { chats: [{ guid: "shape-array-guid" }] } }, + "shape-array-guid", + ], + ["data.chat.guid", { data: { chat: { guid: "shape-object-guid" } } }, "shape-object-guid"], + ])("extracts chatGuid from %s", async (_label, responseBody, expectedChatGuid) => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve(JSON.stringify(responseBody)), + }); + + const result = await createChatForHandle({ + baseUrl: "http://localhost:1234", + password: "test", + address: "+15559876543", + }); + + expect(result.chatGuid).toBe(expectedChatGuid); + }); + + it("throws when Private API is not enabled", async () => { + mockFetch.mockResolvedValueOnce({ + ok: false, + status: 403, + text: () => Promise.resolve("Private API not enabled"), + }); + + await expect( + createChatForHandle({ + baseUrl: "http://localhost:1234", + password: "test", + address: "+15559876543", + }), + ).rejects.toThrow("Private API must be enabled"); + }); + + it("returns null chatGuid when response has no chat data", async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve(JSON.stringify({ data: {} })), + }); + + const result = await createChatForHandle({ + baseUrl: "http://localhost:1234", + password: "test", + address: "+15559876543", + message: "Hello", + }); + + expect(result.chatGuid).toBeNull(); + }); + }); }); diff --git a/extensions/bluebubbles/src/send.ts b/extensions/bluebubbles/src/send.ts index 8fe622d13ff..a59bf993a55 100644 --- a/extensions/bluebubbles/src/send.ts +++ b/extensions/bluebubbles/src/send.ts @@ -312,16 +312,20 @@ export async function resolveChatGuidForTarget(params: { } /** - * Creates a new chat (DM) and optionally sends an initial message. + * Creates a new DM chat for the given address and returns the chat GUID. * Requires Private API to be enabled in BlueBubbles. + * + * If a `message` is provided it is sent as the initial message in the new chat; + * otherwise an empty-string message body is used (BlueBubbles still creates the + * chat but will not deliver a visible bubble). */ -async function createNewChatWithMessage(params: { +export async function createChatForHandle(params: { baseUrl: string; password: string; address: string; - message: string; + message?: string; timeoutMs?: number; -}): Promise { +}): Promise<{ chatGuid: string | null; messageId: string }> { const url = buildBlueBubblesApiUrl({ baseUrl: params.baseUrl, path: "/api/v1/chat/new", @@ -329,7 +333,7 @@ async function createNewChatWithMessage(params: { }); const payload = { addresses: [params.address], - message: params.message, + message: params.message ?? "", tempGuid: `temp-${crypto.randomUUID()}`, }; const res = await blueBubblesFetchWithTimeout( @@ -343,7 +347,6 @@ async function createNewChatWithMessage(params: { ); if (!res.ok) { const errorText = await res.text(); - // Check for Private API not enabled error if ( res.status === 400 || res.status === 403 || @@ -355,7 +358,64 @@ async function createNewChatWithMessage(params: { } throw new Error(`BlueBubbles create chat failed (${res.status}): ${errorText || "unknown"}`); } - return parseBlueBubblesMessageResponse(res); + const body = await res.text(); + let messageId = "ok"; + let chatGuid: string | null = null; + if (body) { + try { + const parsed = JSON.parse(body) as Record; + messageId = extractBlueBubblesMessageId(parsed); + // Extract chatGuid from the response data + const data = parsed.data as Record | undefined; + if (data) { + chatGuid = + (typeof data.chatGuid === "string" && data.chatGuid) || + (typeof data.guid === "string" && data.guid) || + null; + // Also try nested chats array (some BB versions nest it) + if (!chatGuid) { + const chats = data.chats ?? data.chat; + if (Array.isArray(chats) && chats.length > 0) { + const first = chats[0] as Record | undefined; + chatGuid = + (typeof first?.guid === "string" && first.guid) || + (typeof first?.chatGuid === "string" && first.chatGuid) || + null; + } else if (chats && typeof chats === "object" && !Array.isArray(chats)) { + const chatObj = chats as Record; + chatGuid = + (typeof chatObj.guid === "string" && chatObj.guid) || + (typeof chatObj.chatGuid === "string" && chatObj.chatGuid) || + null; + } + } + } + } catch { + // ignore parse errors + } + } + return { chatGuid, messageId }; +} + +/** + * Creates a new chat (DM) and sends an initial message. + * Requires Private API to be enabled in BlueBubbles. + */ +async function createNewChatWithMessage(params: { + baseUrl: string; + password: string; + address: string; + message: string; + timeoutMs?: number; +}): Promise { + const result = await createChatForHandle({ + baseUrl: params.baseUrl, + password: params.password, + address: params.address, + message: params.message, + timeoutMs: params.timeoutMs, + }); + return { messageId: result.messageId }; } export async function sendMessageBlueBubbles( diff --git a/extensions/slack/src/channel.test.ts b/extensions/slack/src/channel.test.ts index 73acfe3aeb7..691b6126557 100644 --- a/extensions/slack/src/channel.test.ts +++ b/extensions/slack/src/channel.test.ts @@ -308,6 +308,84 @@ describe("slackPlugin agentPrompt", () => { }); }); +describe("slackPlugin outbound new targets", () => { + const cfg = { + channels: { + slack: { + botToken: "xoxb-test", + appToken: "xapp-test", + }, + }, + }; + + it("sends to a new user target via DM without erroring", async () => { + const sendSlack = vi.fn().mockResolvedValue({ messageId: "m-new-user", channelId: "D999" }); + const sendText = slackPlugin.outbound?.sendText; + expect(sendText).toBeDefined(); + + const result = await sendText!({ + cfg, + to: "user:U99NEW", + text: "hello new user", + accountId: "default", + deps: { sendSlack }, + }); + + expect(sendSlack).toHaveBeenCalledWith( + "user:U99NEW", + "hello new user", + expect.objectContaining({ cfg }), + ); + expect(result).toEqual({ channel: "slack", messageId: "m-new-user", channelId: "D999" }); + }); + + it("sends to a new channel target without erroring", async () => { + const sendSlack = vi.fn().mockResolvedValue({ messageId: "m-new-chan", channelId: "C555" }); + const sendText = slackPlugin.outbound?.sendText; + expect(sendText).toBeDefined(); + + const result = await sendText!({ + cfg, + to: "channel:C555NEW", + text: "hello channel", + accountId: "default", + deps: { sendSlack }, + }); + + expect(sendSlack).toHaveBeenCalledWith( + "channel:C555NEW", + "hello channel", + expect.objectContaining({ cfg }), + ); + expect(result).toEqual({ channel: "slack", messageId: "m-new-chan", channelId: "C555" }); + }); + + it("sends media to a new user target without erroring", async () => { + const sendSlack = vi.fn().mockResolvedValue({ messageId: "m-new-media", channelId: "D888" }); + const sendMedia = slackPlugin.outbound?.sendMedia; + expect(sendMedia).toBeDefined(); + + const result = await sendMedia!({ + cfg, + to: "user:U88NEW", + text: "here is a file", + mediaUrl: "https://example.com/file.png", + accountId: "default", + deps: { sendSlack }, + }); + + expect(sendSlack).toHaveBeenCalledWith( + "user:U88NEW", + "here is a file", + expect.objectContaining({ + cfg, + mediaUrl: "https://example.com/file.png", + }), + ); + expect(result).toEqual({ channel: "slack", messageId: "m-new-media", channelId: "D888" }); + }); +}); + describe("slackPlugin config", () => { it("treats HTTP mode accounts with bot token + signing secret as configured", async () => { const cfg: OpenClawConfig = { diff --git a/src/config/sessions/sessions.test.ts b/src/config/sessions/sessions.test.ts index eedf63913eb..c0afc4aad8e 100644 --- a/src/config/sessions/sessions.test.ts +++ b/src/config/sessions/sessions.test.ts @@ -425,6 +425,52 @@ describe("appendAssistantMessageToSessionTranscript", () => { expect(messageLine.message.content[0].text).toBe("Hello from delivery mirror!"); }); + it("finds session entry using normalized (lowercased) key", async () => { + const sessionId = "test-session-normalized"; + // Store key is lowercase (as written by updateSessionStore/normalizeStoreSessionKey) + const storeKey = "agent:main:bluebubbles:direct:+15551234567"; + const store = { + [storeKey]: { + sessionId, + chatType: "direct", + channel: "bluebubbles", + }, + }; + fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8"); + + // Pass a mixed-case key — append should still find the entry via normalization + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:BlueBubbles:direct:+15551234567", + text: "Hello normalized!", + storePath: fixture.storePath(), + }); + + expect(result.ok).toBe(true); + }); + + it("finds Slack session entry using normalized (lowercased) key", async () => { + const sessionId = "test-slack-session"; + // Slack session keys include channel type and target ID; store key is lowercase + const storeKey = "agent:main:slack:direct:u12345abc"; + const store = { + [storeKey]: { + sessionId, + chatType: "direct", + channel: "slack", + }, + }; + fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8"); + + // Pass a mixed-case key (as resolveSlackSession might produce) — normalization should match + const result = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:slack:direct:U12345ABC", + text: "Hello Slack user!", + storePath: fixture.storePath(), + }); + + expect(result.ok).toBe(true); + }); + it("ignores malformed transcript lines when checking mirror idempotency", async () => { writeTranscriptStore(); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index aa1890de953..78bf1eb0cb9 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -10,7 +10,7 @@ import { resolveSessionTranscriptPath, } from "./paths.js"; import { resolveAndPersistSessionFile } from "./session-file.js"; -import { loadSessionStore } from "./store.js"; +import { loadSessionStore, normalizeStoreSessionKey } from "./store.js"; import type { SessionEntry } from "./types.js"; function stripQuery(value: string): string { @@ -154,7 +154,8 @@ export async function appendAssistantMessageToSessionTranscript(params: { const storePath = params.storePath ?? resolveDefaultSessionStorePath(params.agentId); const store = loadSessionStore(storePath, { skipCache: true }); - const entry = store[sessionKey] as SessionEntry | undefined; + const normalizedKey = normalizeStoreSessionKey(sessionKey); + const entry = (store[normalizedKey] ?? store[sessionKey]) as SessionEntry | undefined; if (!entry?.sessionId) { return { ok: false, reason: `unknown sessionKey: ${sessionKey}` }; } diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index b245b4b9c94..4ed41f7de3a 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -143,6 +143,7 @@ describe("dispatchCronDelivery — double-announce guard", () => { }); afterEach(() => { + vi.useRealTimers(); vi.unstubAllEnvs(); }); @@ -255,6 +256,59 @@ describe("dispatchCronDelivery — double-announce guard", () => { ).toBe(false); }); + it("skips stale cron deliveries while still suppressing fallback main summary", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-18T17:00:00.000Z")); + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + + const params = makeBaseParams({ synthesizedText: "Yesterday's morning briefing." }); + (params.job as { state?: { nextRunAtMs?: number } }).state = { + nextRunAtMs: Date.now() - (3 * 60 * 60_000 + 1), + }; + + const state = await dispatchCronDelivery(params); + + expect(state.result).toEqual( + expect.objectContaining({ + status: "ok", + delivered: false, + deliveryAttempted: true, + }), + ); + expect(deliverOutboundPayloads).not.toHaveBeenCalled(); + expect( + shouldEnqueueCronMainSummary({ + summaryText: "Yesterday's morning briefing.", + deliveryRequested: true, + delivered: state.result?.delivered, + deliveryAttempted: state.result?.deliveryAttempted, + suppressMainSummary: false, + isCronSystemEvent: () => true, + }), + ).toBe(false); + }); + + it("still delivers when the run started on time but finished more than three hours later", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-18T17:00:00.000Z")); + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); + + const params = makeBaseParams({ synthesizedText: "Long running report finished." }); + params.runStartedAt = Date.now() - (3 * 60 * 60_000 + 1); + (params.job as { state?: { nextRunAtMs?: number } }).state = { + nextRunAtMs: params.runStartedAt, + }; + + const state = await dispatchCronDelivery(params); + + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(state.delivered).toBe(true); + expect(state.deliveryAttempted).toBe(true); + }); + it("text delivery fires exactly once (no double-deliver)", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index 6ddddf20669..eda32740e4a 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -134,6 +134,8 @@ const PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ /outbound not configured for channel/i, ]; +const STALE_CRON_DELIVERY_MAX_START_DELAY_MS = 3 * 60 * 60_000; + type CompletedDirectCronDelivery = { ts: number; results: OutboundDeliveryResult[]; @@ -174,6 +176,21 @@ function pruneCompletedDirectCronDeliveries(now: number) { } } +function resolveCronDeliveryScheduledAtMs(params: { job: CronJob; runStartedAt: number }): number { + const scheduledAt = params.job.state?.nextRunAtMs; + return typeof scheduledAt === "number" && Number.isFinite(scheduledAt) + ? scheduledAt + : params.runStartedAt; +} + +function resolveCronDeliveryStartDelayMs(params: { job: CronJob; runStartedAt: number }): number { + return params.runStartedAt - resolveCronDeliveryScheduledAtMs(params); +} + +function isStaleCronDelivery(params: { job: CronJob; runStartedAt: number }): boolean { + return resolveCronDeliveryStartDelayMs(params) > STALE_CRON_DELIVERY_MAX_START_DELAY_MS; +} + function rememberCompletedDirectCronDelivery( idempotencyKey: string, results: readonly OutboundDeliveryResult[], @@ -331,6 +348,35 @@ export async function dispatchCronDelivery( ...params.telemetry, }); } + if ( + params.deliveryRequested && + isStaleCronDelivery({ + job: params.job, + runStartedAt: params.runStartedAt, + }) + ) { + deliveryAttempted = true; + const nowMs = Date.now(); + const scheduledAtMs = resolveCronDeliveryScheduledAtMs({ + job: params.job, + runStartedAt: params.runStartedAt, + }); + const startDelayMs = resolveCronDeliveryStartDelayMs({ + job: params.job, + runStartedAt: params.runStartedAt, + }); + logWarn( + `[cron:${params.job.id}] skipping stale delivery scheduled at ${new Date(scheduledAtMs).toISOString()}, started ${Math.round(startDelayMs / 60_000)}m late, current age ${Math.round((nowMs - scheduledAtMs) / 60_000)}m`, + ); + return params.withRunSession({ + status: "ok", + summary, + outputText, + deliveryAttempted, + delivered: false, + ...params.telemetry, + }); + } deliveryAttempted = true; const cachedResults = getCompletedDirectCronDelivery(deliveryIdempotencyKey); if (cachedResults) { diff --git a/src/infra/outbound/outbound-session.ts b/src/infra/outbound/outbound-session.ts index 6d990c8b0e6..8eefc3e5504 100644 --- a/src/infra/outbound/outbound-session.ts +++ b/src/infra/outbound/outbound-session.ts @@ -1,11 +1,31 @@ +import { parseDiscordTarget } from "../../../extensions/discord/src/targets.js"; +import { + parseIMessageTarget, + normalizeIMessageHandle, +} from "../../../extensions/imessage/src/targets.js"; +import { + looksLikeUuid, + resolveSignalPeerId, + resolveSignalRecipient, + resolveSignalSender, +} from "../../../extensions/signal/src/identity.js"; +import { resolveSlackAccount } from "../../../extensions/slack/src/accounts.js"; +import { createSlackWebClient } from "../../../extensions/slack/src/client.js"; +import { normalizeAllowListLower } from "../../../extensions/slack/src/monitor/allow-list.js"; +import { parseSlackTarget } from "../../../extensions/slack/src/targets.js"; +import { buildTelegramGroupPeerId } from "../../../extensions/telegram/src/bot/helpers.js"; +import { resolveTelegramTargetChatType } from "../../../extensions/telegram/src/inline-buttons.js"; +import { parseTelegramThreadId } from "../../../extensions/telegram/src/outbound-params.js"; +import { parseTelegramTarget } from "../../../extensions/telegram/src/targets.js"; import type { MsgContext } from "../../auto-reply/templating.js"; import type { ChatType } from "../../channels/chat-type.js"; import { getChannelPlugin } from "../../channels/plugins/index.js"; import type { ChannelId } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; import { recordSessionMetaFromInbound, resolveStorePath } from "../../config/sessions.js"; -import type { RoutePeer } from "../../routing/resolve-route.js"; -import { buildOutboundBaseSessionKey } from "./base-session-key.js"; +import { buildAgentSessionKey, type RoutePeer } from "../../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../../routing/session-key.js"; +import { isWhatsAppGroupJid, normalizeWhatsAppTarget } from "../../whatsapp/normalize.js"; import type { ResolvedMessagingTarget } from "./target-resolver.js"; export type OutboundSessionRoute = { @@ -29,6 +49,23 @@ export type ResolveOutboundSessionRouteParams = { threadId?: string | number | null; }; +// Cache Slack channel type lookups to avoid repeated API calls. +const SLACK_CHANNEL_TYPE_CACHE = new Map(); + +function normalizeThreadId(value?: string | number | null): string | undefined { + if (value == null) { + return undefined; + } + if (typeof value === "number") { + if (!Number.isFinite(value)) { + return undefined; + } + return String(Math.trunc(value)); + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + function stripProviderPrefix(raw: string, channel: string): string { const trimmed = raw.trim(); const lower = trimmed.toLowerCase(); @@ -74,7 +111,779 @@ function buildBaseSessionKey(params: { accountId?: string | null; peer: RoutePeer; }): string { - return buildOutboundBaseSessionKey(params); + return buildAgentSessionKey({ + agentId: params.agentId, + channel: params.channel, + accountId: params.accountId, + peer: params.peer, + dmScope: params.cfg.session?.dmScope ?? "main", + identityLinks: params.cfg.session?.identityLinks, + }); +} + +// Best-effort mpim detection: allowlist/config, then Slack API (if token available). +async function resolveSlackChannelType(params: { + cfg: OpenClawConfig; + accountId?: string | null; + channelId: string; +}): Promise<"channel" | "group" | "dm" | "unknown"> { + const channelId = params.channelId.trim(); + if (!channelId) { + return "unknown"; + } + const cached = SLACK_CHANNEL_TYPE_CACHE.get(`${params.accountId ?? "default"}:${channelId}`); + if (cached) { + return cached; + } + + const account = resolveSlackAccount({ cfg: params.cfg, accountId: params.accountId }); + const groupChannels = normalizeAllowListLower(account.dm?.groupChannels); + const channelIdLower = channelId.toLowerCase(); + if ( + groupChannels.includes(channelIdLower) || + groupChannels.includes(`slack:${channelIdLower}`) || + groupChannels.includes(`channel:${channelIdLower}`) || + groupChannels.includes(`group:${channelIdLower}`) || + groupChannels.includes(`mpim:${channelIdLower}`) + ) { + SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "group"); + return "group"; + } + + const channelKeys = Object.keys(account.channels ?? {}); + if ( + channelKeys.some((key) => { + const normalized = key.trim().toLowerCase(); + return ( + normalized === channelIdLower || + normalized === `channel:${channelIdLower}` || + normalized.replace(/^#/, "") === channelIdLower + ); + }) + ) { + SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "channel"); + return "channel"; + } + + const token = account.botToken?.trim() || account.userToken || ""; + if (!token) { + SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "unknown"); + return "unknown"; + } + + try { + const client = createSlackWebClient(token); + const info = await client.conversations.info({ channel: channelId }); + const channel = info.channel as { is_im?: boolean; is_mpim?: boolean } | undefined; + const type = channel?.is_im ? "dm" : channel?.is_mpim ? "group" : "channel"; + SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, type); + return type; + } catch { + SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "unknown"); + return "unknown"; + } +} + +async function resolveSlackSession( + params: ResolveOutboundSessionRouteParams, +): Promise { + const parsed = parseSlackTarget(params.target, { defaultKind: "channel" }); + if (!parsed) { + return null; + } + const isDm = parsed.kind === "user"; + let peerKind: ChatType = isDm ? "direct" : "channel"; + if (!isDm && /^G/i.test(parsed.id)) { + // Slack mpim/group DMs share the G-prefix; detect to align session keys with inbound. + const channelType = await resolveSlackChannelType({ + cfg: params.cfg, + accountId: params.accountId, + channelId: parsed.id, + }); + if (channelType === "group") { + peerKind = "group"; + } + if (channelType === "dm") { + peerKind = "direct"; + } + } + const peer: RoutePeer = { + kind: peerKind, + id: parsed.id, + }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "slack", + accountId: params.accountId, + peer, + }); + const threadId = normalizeThreadId(params.threadId ?? params.replyToId); + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey, + threadId, + }); + return { + sessionKey: threadKeys.sessionKey, + baseSessionKey, + peer, + chatType: peerKind === "direct" ? "direct" : "channel", + from: + peerKind === "direct" + ? `slack:${parsed.id}` + : peerKind === "group" + ? `slack:group:${parsed.id}` + : `slack:channel:${parsed.id}`, + to: peerKind === "direct" ? `user:${parsed.id}` : `channel:${parsed.id}`, + threadId, + }; +} + +function resolveDiscordSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const parsed = parseDiscordTarget(params.target, { + defaultKind: resolveDiscordOutboundTargetKindHint(params), + }); + if (!parsed) { + return null; + } + const isDm = parsed.kind === "user"; + const peer: RoutePeer = { + kind: isDm ? "direct" : "channel", + id: parsed.id, + }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "discord", + accountId: params.accountId, + peer, + }); + const explicitThreadId = normalizeThreadId(params.threadId); + const threadCandidate = explicitThreadId ?? normalizeThreadId(params.replyToId); + // Discord threads use their own channel id; avoid adding a :thread suffix. + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey, + threadId: threadCandidate, + useSuffix: false, + }); + return { + sessionKey: threadKeys.sessionKey, + baseSessionKey, + peer, + chatType: isDm ? "direct" : "channel", + from: isDm ? `discord:${parsed.id}` : `discord:channel:${parsed.id}`, + to: isDm ? `user:${parsed.id}` : `channel:${parsed.id}`, + threadId: explicitThreadId ?? undefined, + }; +} + +function resolveDiscordOutboundTargetKindHint( + params: ResolveOutboundSessionRouteParams, +): "user" | "channel" | undefined { + const resolvedKind = params.resolvedTarget?.kind; + if (resolvedKind === "user") { + return "user"; + } + if (resolvedKind === "group" || resolvedKind === "channel") { + return "channel"; + } + + const target = params.target.trim(); + if (/^channel:/i.test(target)) { + return "channel"; + } + if (/^(user:|discord:|@|<@!?)/i.test(target)) { + return "user"; + } + return undefined; +} + +function resolveTelegramSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const parsed = parseTelegramTarget(params.target); + const chatId = parsed.chatId.trim(); + if (!chatId) { + return null; + } + const parsedThreadId = parsed.messageThreadId; + const fallbackThreadId = normalizeThreadId(params.threadId); + const resolvedThreadId = parsedThreadId ?? parseTelegramThreadId(fallbackThreadId); + // Telegram topics are encoded in the peer id (chatId:topic:). + const chatType = resolveTelegramTargetChatType(params.target); + // If the target is a username and we lack a resolvedTarget, default to DM to avoid group keys. + const isGroup = + chatType === "group" || + (chatType === "unknown" && + params.resolvedTarget?.kind && + params.resolvedTarget.kind !== "user"); + // For groups: include thread ID in peerId. For DMs: use simple chatId (thread handled via suffix). + const peerId = + isGroup && resolvedThreadId ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : chatId; + const peer: RoutePeer = { + kind: isGroup ? "group" : "direct", + id: peerId, + }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "telegram", + accountId: params.accountId, + peer, + }); + // Use thread suffix for DM topics to match inbound session key format + const threadKeys = + resolvedThreadId && !isGroup + ? { sessionKey: `${baseSessionKey}:thread:${resolvedThreadId}` } + : null; + return { + sessionKey: threadKeys?.sessionKey ?? baseSessionKey, + baseSessionKey, + peer, + chatType: isGroup ? "group" : "direct", + from: isGroup + ? `telegram:group:${peerId}` + : resolvedThreadId + ? `telegram:${chatId}:topic:${resolvedThreadId}` + : `telegram:${chatId}`, + to: `telegram:${chatId}`, + threadId: resolvedThreadId, + }; +} + +function resolveWhatsAppSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const normalized = normalizeWhatsAppTarget(params.target); + if (!normalized) { + return null; + } + const isGroup = isWhatsAppGroupJid(normalized); + const peer: RoutePeer = { + kind: isGroup ? "group" : "direct", + id: normalized, + }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "whatsapp", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isGroup ? "group" : "direct", + from: normalized, + to: normalized, + }; +} + +function resolveSignalSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const stripped = stripProviderPrefix(params.target, "signal"); + const lowered = stripped.toLowerCase(); + if (lowered.startsWith("group:")) { + const groupId = stripped.slice("group:".length).trim(); + if (!groupId) { + return null; + } + const peer: RoutePeer = { kind: "group", id: groupId }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "signal", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: "group", + from: `group:${groupId}`, + to: `group:${groupId}`, + }; + } + + let recipient = stripped.trim(); + if (lowered.startsWith("username:")) { + recipient = stripped.slice("username:".length).trim(); + } else if (lowered.startsWith("u:")) { + recipient = stripped.slice("u:".length).trim(); + } + if (!recipient) { + return null; + } + + const uuidCandidate = recipient.toLowerCase().startsWith("uuid:") + ? recipient.slice("uuid:".length) + : recipient; + const sender = resolveSignalSender({ + sourceUuid: looksLikeUuid(uuidCandidate) ? uuidCandidate : null, + sourceNumber: looksLikeUuid(uuidCandidate) ? null : recipient, + }); + const peerId = sender ? resolveSignalPeerId(sender) : recipient; + const displayRecipient = sender ? resolveSignalRecipient(sender) : recipient; + const peer: RoutePeer = { kind: "direct", id: peerId }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "signal", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: "direct", + from: `signal:${displayRecipient}`, + to: `signal:${displayRecipient}`, + }; +} + +function resolveIMessageSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const parsed = parseIMessageTarget(params.target); + if (parsed.kind === "handle") { + const handle = normalizeIMessageHandle(parsed.to); + if (!handle) { + return null; + } + const peer: RoutePeer = { kind: "direct", id: handle }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "imessage", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: "direct", + from: `imessage:${handle}`, + to: `imessage:${handle}`, + }; + } + + const peerId = + parsed.kind === "chat_id" + ? String(parsed.chatId) + : parsed.kind === "chat_guid" + ? parsed.chatGuid + : parsed.chatIdentifier; + if (!peerId) { + return null; + } + const peer: RoutePeer = { kind: "group", id: peerId }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "imessage", + accountId: params.accountId, + peer, + }); + const toPrefix = + parsed.kind === "chat_id" + ? "chat_id" + : parsed.kind === "chat_guid" + ? "chat_guid" + : "chat_identifier"; + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: "group", + from: `imessage:group:${peerId}`, + to: `${toPrefix}:${peerId}`, + }; +} + +function resolveMatrixSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const stripped = stripProviderPrefix(params.target, "matrix"); + const isUser = + params.resolvedTarget?.kind === "user" || stripped.startsWith("@") || /^user:/i.test(stripped); + const rawId = stripKindPrefix(stripped); + if (!rawId) { + return null; + } + const peer: RoutePeer = { kind: isUser ? "direct" : "channel", id: rawId }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "matrix", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isUser ? "direct" : "channel", + from: isUser ? `matrix:${rawId}` : `matrix:channel:${rawId}`, + to: `room:${rawId}`, + }; +} + +function resolveMSTeamsSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + let trimmed = params.target.trim(); + if (!trimmed) { + return null; + } + trimmed = trimmed.replace(/^(msteams|teams):/i, "").trim(); + + const lower = trimmed.toLowerCase(); + const isUser = lower.startsWith("user:"); + const rawId = stripKindPrefix(trimmed); + if (!rawId) { + return null; + } + const conversationId = rawId.split(";")[0] ?? rawId; + const isChannel = !isUser && /@thread\.tacv2/i.test(conversationId); + const peer: RoutePeer = { + kind: isUser ? "direct" : isChannel ? "channel" : "group", + id: conversationId, + }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "msteams", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isUser ? "direct" : isChannel ? "channel" : "group", + from: isUser + ? `msteams:${conversationId}` + : isChannel + ? `msteams:channel:${conversationId}` + : `msteams:group:${conversationId}`, + to: isUser ? `user:${conversationId}` : `conversation:${conversationId}`, + }; +} + +function resolveMattermostSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + let trimmed = params.target.trim(); + if (!trimmed) { + return null; + } + trimmed = trimmed.replace(/^mattermost:/i, "").trim(); + const lower = trimmed.toLowerCase(); + const resolvedKind = params.resolvedTarget?.kind; + const isUser = + resolvedKind === "user" || + (resolvedKind !== "channel" && + resolvedKind !== "group" && + (lower.startsWith("user:") || trimmed.startsWith("@"))); + if (trimmed.startsWith("@")) { + trimmed = trimmed.slice(1).trim(); + } + const rawId = stripKindPrefix(trimmed); + if (!rawId) { + return null; + } + const peer: RoutePeer = { kind: isUser ? "direct" : "channel", id: rawId }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "mattermost", + accountId: params.accountId, + peer, + }); + const threadId = normalizeThreadId(params.replyToId ?? params.threadId); + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey, + threadId, + }); + return { + sessionKey: threadKeys.sessionKey, + baseSessionKey, + peer, + chatType: isUser ? "direct" : "channel", + from: isUser ? `mattermost:${rawId}` : `mattermost:channel:${rawId}`, + to: isUser ? `user:${rawId}` : `channel:${rawId}`, + threadId, + }; +} + +function resolveBlueBubblesSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const stripped = stripProviderPrefix(params.target, "bluebubbles"); + const lower = stripped.toLowerCase(); + const isGroup = + lower.startsWith("chat_id:") || + lower.startsWith("chat_guid:") || + lower.startsWith("chat_identifier:") || + lower.startsWith("group:"); + const rawPeerId = isGroup + ? stripKindPrefix(stripped) + : stripped.replace(/^(imessage|sms|auto):/i, ""); + // BlueBubbles inbound group ids omit chat_* prefixes; strip them to align sessions. + const peerId = isGroup + ? rawPeerId.replace(/^(chat_id|chat_guid|chat_identifier):/i, "") + : rawPeerId; + if (!peerId) { + return null; + } + const peer: RoutePeer = { + kind: isGroup ? "group" : "direct", + id: peerId, + }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "bluebubbles", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isGroup ? "group" : "direct", + from: isGroup ? `group:${peerId}` : `bluebubbles:${peerId}`, + to: `bluebubbles:${stripped}`, + }; +} + +function resolveNextcloudTalkSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + let trimmed = params.target.trim(); + if (!trimmed) { + return null; + } + trimmed = trimmed.replace(/^(nextcloud-talk|nc-talk|nc):/i, "").trim(); + trimmed = trimmed.replace(/^room:/i, "").trim(); + if (!trimmed) { + return null; + } + const peer: RoutePeer = { kind: "group", id: trimmed }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "nextcloud-talk", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: "group", + from: `nextcloud-talk:room:${trimmed}`, + to: `nextcloud-talk:${trimmed}`, + }; +} + +function resolveZaloSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + return resolveZaloLikeSession(params, "zalo", /^(zl):/i); +} + +function resolveZaloLikeSession( + params: ResolveOutboundSessionRouteParams, + channel: "zalo" | "zalouser", + aliasPrefix: RegExp, +): OutboundSessionRoute | null { + const trimmed = stripProviderPrefix(params.target, channel).replace(aliasPrefix, "").trim(); + if (!trimmed) { + return null; + } + const isGroup = trimmed.toLowerCase().startsWith("group:"); + const peerId = stripKindPrefix(trimmed); + const peer: RoutePeer = { kind: isGroup ? "group" : "direct", id: peerId }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel, + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isGroup ? "group" : "direct", + from: isGroup ? `${channel}:group:${peerId}` : `${channel}:${peerId}`, + to: `${channel}:${peerId}`, + }; +} + +function resolveZalouserSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + // Keep DM vs group aligned with inbound sessions for Zalo Personal. + return resolveZaloLikeSession(params, "zalouser", /^(zlu):/i); +} + +function resolveNostrSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + const trimmed = stripProviderPrefix(params.target, "nostr").trim(); + if (!trimmed) { + return null; + } + const peer: RoutePeer = { kind: "direct", id: trimmed }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "nostr", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: "direct", + from: `nostr:${trimmed}`, + to: `nostr:${trimmed}`, + }; +} + +function normalizeTlonShip(raw: string): string { + const trimmed = raw.trim(); + if (!trimmed) { + return trimmed; + } + return trimmed.startsWith("~") ? trimmed : `~${trimmed}`; +} + +function resolveTlonSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + let trimmed = stripProviderPrefix(params.target, "tlon"); + trimmed = trimmed.trim(); + if (!trimmed) { + return null; + } + const lower = trimmed.toLowerCase(); + let isGroup = + lower.startsWith("group:") || lower.startsWith("room:") || lower.startsWith("chat/"); + let peerId = trimmed; + if (lower.startsWith("group:") || lower.startsWith("room:")) { + peerId = trimmed.replace(/^(group|room):/i, "").trim(); + if (!peerId.startsWith("chat/")) { + const parts = peerId.split("/").filter(Boolean); + if (parts.length === 2) { + peerId = `chat/${normalizeTlonShip(parts[0])}/${parts[1]}`; + } + } + isGroup = true; + } else if (lower.startsWith("dm:")) { + peerId = normalizeTlonShip(trimmed.slice("dm:".length)); + isGroup = false; + } else if (lower.startsWith("chat/")) { + peerId = trimmed; + isGroup = true; + } else if (trimmed.includes("/")) { + const parts = trimmed.split("/").filter(Boolean); + if (parts.length === 2) { + peerId = `chat/${normalizeTlonShip(parts[0])}/${parts[1]}`; + isGroup = true; + } + } else { + peerId = normalizeTlonShip(trimmed); + } + + const peer: RoutePeer = { kind: isGroup ? "group" : "direct", id: peerId }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "tlon", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isGroup ? "group" : "direct", + from: isGroup ? `tlon:group:${peerId}` : `tlon:${peerId}`, + to: `tlon:${peerId}`, + }; +} + +/** + * Feishu ID formats: + * - oc_xxx: chat_id (can be group or DM, use chat_mode to distinguish or explicit dm:/group: prefix) + * - ou_xxx: user open_id (DM) + * - on_xxx: user union_id (DM) + * - cli_xxx: app_id (not a valid send target) + */ +function resolveFeishuSession( + params: ResolveOutboundSessionRouteParams, +): OutboundSessionRoute | null { + let trimmed = stripProviderPrefix(params.target, "feishu"); + trimmed = stripProviderPrefix(trimmed, "lark").trim(); + if (!trimmed) { + return null; + } + + const lower = trimmed.toLowerCase(); + let isGroup = false; + let typeExplicit = false; + + if (lower.startsWith("group:") || lower.startsWith("chat:")) { + trimmed = trimmed.replace(/^(group|chat):/i, "").trim(); + isGroup = true; + typeExplicit = true; + } else if (lower.startsWith("user:") || lower.startsWith("dm:")) { + trimmed = trimmed.replace(/^(user|dm):/i, "").trim(); + isGroup = false; + typeExplicit = true; + } + + const idLower = trimmed.toLowerCase(); + // Only infer type from ID prefix if not explicitly specified + // Note: oc_ is a chat_id and can be either group or DM (must check chat_mode from API) + // Only ou_/on_ can be reliably identified as user IDs (always DM) + if (!typeExplicit) { + if (idLower.startsWith("ou_") || idLower.startsWith("on_")) { + isGroup = false; + } + // oc_ requires explicit prefix: dm:oc_xxx or group:oc_xxx + } + + const peer: RoutePeer = { + kind: isGroup ? "group" : "direct", + id: trimmed, + }; + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: "feishu", + accountId: params.accountId, + peer, + }); + return { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isGroup ? "group" : "direct", + from: isGroup ? `feishu:group:${trimmed}` : `feishu:${trimmed}`, + to: trimmed, + }; } function resolveFallbackSession( @@ -115,6 +924,29 @@ function resolveFallbackSession( }; } +type OutboundSessionResolver = ( + params: ResolveOutboundSessionRouteParams, +) => OutboundSessionRoute | null | Promise; + +const OUTBOUND_SESSION_RESOLVERS: Partial> = { + slack: resolveSlackSession, + discord: resolveDiscordSession, + telegram: resolveTelegramSession, + whatsapp: resolveWhatsAppSession, + signal: resolveSignalSession, + imessage: resolveIMessageSession, + matrix: resolveMatrixSession, + msteams: resolveMSTeamsSession, + mattermost: resolveMattermostSession, + bluebubbles: resolveBlueBubblesSession, + "nextcloud-talk": resolveNextcloudTalkSession, + zalo: resolveZaloSession, + zalouser: resolveZalouserSession, + nostr: resolveNostrSession, + tlon: resolveTlonSession, + feishu: resolveFeishuSession, +}; + export async function resolveOutboundSessionRoute( params: ResolveOutboundSessionRouteParams, ): Promise { @@ -123,21 +955,11 @@ export async function resolveOutboundSessionRoute( return null; } const nextParams = { ...params, target }; - const pluginRoute = await getChannelPlugin( - params.channel, - )?.messaging?.resolveOutboundSessionRoute?.({ - cfg: nextParams.cfg, - agentId: nextParams.agentId, - accountId: nextParams.accountId, - target, - resolvedTarget: nextParams.resolvedTarget, - replyToId: nextParams.replyToId, - threadId: nextParams.threadId, - }); - if (pluginRoute) { - return pluginRoute; + const resolver = OUTBOUND_SESSION_RESOLVERS[params.channel]; + if (!resolver) { + return resolveFallbackSession(nextParams); } - return resolveFallbackSession(nextParams); + return await resolver(nextParams); } export async function ensureOutboundSessionEntry(params: { diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index 7dcdab184ed..f90fc7f221e 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -1196,6 +1196,30 @@ describe("resolveOutboundSessionRoute", () => { chatType: "direct", }, }, + { + name: "Slack user DM target", + cfg: perChannelPeerCfg, + channel: "slack", + target: "user:U12345ABC", + expected: { + sessionKey: "agent:main:slack:direct:u12345abc", + from: "slack:U12345ABC", + to: "user:U12345ABC", + chatType: "direct", + }, + }, + { + name: "Slack channel target without thread", + cfg: baseConfig, + channel: "slack", + target: "channel:C999XYZ", + expected: { + sessionKey: "agent:main:slack:channel:c999xyz", + from: "slack:channel:C999XYZ", + to: "channel:C999XYZ", + chatType: "channel", + }, + }, ]; for (const testCase of cases) {