mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-19 14:00:51 +00:00
fix: persist outbound sends and skip stale cron deliveries (#50092)
* fix(bluebubbles): auto-create chats for new numbers, persist outbound messages to session transcripts
Two fixes for BlueBubbles message tool behavior:
1. **Attachment sends to new phone numbers**: sendBlueBubblesAttachment now
auto-creates a new DM chat (via /api/v1/chat/new) when no existing chat
is found for a handle target, matching the behavior already present in
sendMessageBlueBubbles for text sends. The existing createNewChatWithMessage
is refactored into a reusable createChatForHandle that returns the chatGuid.
2. **Outbound message session persistence**: Ensures outbound messages sent
via the message tool are reliably tracked in session transcripts:
- ensureOutboundSessionEntry now falls back to directly creating a session
store entry when recordSessionMetaFromInbound returns null, guaranteeing
a sessionId exists for the subsequent mirror append.
- appendAssistantMessageToSessionTranscript now normalizes the session key
(lowercased) when looking up the store, preventing case mismatches
between the store keys and the mirror sessionKey.
Tests added for all changes.
* test(slack): verify outbound session tracking and new target sends for Slack
The shared infrastructure changes from the BlueBubbles fix (session key
normalization in transcript.ts and fallback session entry creation in
outbound-session.ts) already cover Slack. Slack's sendMessageSlack uses
conversations.open to auto-create DM channels for new user targets.
Add tests confirming:
- Slack user DM and channel session route resolution (outbound.test.ts)
- Slack session key normalization for transcript append (sessions.test.ts)
- Slack outbound sendText/sendMedia to new user and channel targets (channel.test.ts)
* fix(cron): skip stale delayed deliveries
* fix: prep PR #50092
This commit is contained in:
@@ -243,6 +243,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Auth/Codex CLI reuse: sync reused Codex CLI credentials into the supported `openai-codex:default` OAuth profile instead of reviving the deprecated `openai-codex:codex-cli` slot, so doctor cleanup no longer loops. (#45353) thanks @Gugu-sugar.
|
||||
- Deps/audit: bump the pinned `fast-xml-parser` override to the first patched release so `pnpm audit --prod --audit-level=high` no longer fails on the AWS Bedrock XML builder path. Thanks @vincentkoc.
|
||||
- Hooks/after_compaction: forward `sessionFile` for direct/manual compaction events and add `sessionFile` plus `sessionKey` to wired auto-compaction hook context so plugins receive the session metadata already declared in the hook types. (#40781) Thanks @jarimustonen.
|
||||
- Sessions/BlueBubbles/cron: persist outbound session routing and transcript mirroring for new targets, auto-create BlueBubbles chats before attachment sends, and only suppress isolated cron deliveries when the run started hours late instead of merely finishing late. (#50092)
|
||||
|
||||
### Breaking
|
||||
|
||||
|
||||
@@ -484,4 +484,94 @@ describe("sendBlueBubblesAttachment", () => {
|
||||
expect(bodyText).not.toContain('name="selectedMessageGuid"');
|
||||
expect(bodyText).not.toContain('name="partIndex"');
|
||||
});
|
||||
|
||||
it("auto-creates a new chat when sending to a phone number with no existing chat", async () => {
|
||||
// First call: resolveChatGuidForTarget queries chats, returns empty (no match)
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () => Promise.resolve({ data: [] }),
|
||||
});
|
||||
// Second call: createChatForHandle creates new chat
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () =>
|
||||
Promise.resolve(
|
||||
JSON.stringify({
|
||||
data: { chatGuid: "iMessage;-;+15559876543", guid: "iMessage;-;+15559876543" },
|
||||
}),
|
||||
),
|
||||
});
|
||||
// Third call: actual attachment send
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () => Promise.resolve(JSON.stringify({ data: { guid: "attach-msg-1" } })),
|
||||
});
|
||||
|
||||
const result = await sendBlueBubblesAttachment({
|
||||
to: "+15559876543",
|
||||
buffer: new Uint8Array([1, 2, 3]),
|
||||
filename: "photo.jpg",
|
||||
contentType: "image/jpeg",
|
||||
opts: { serverUrl: "http://localhost:1234", password: "test" },
|
||||
});
|
||||
|
||||
expect(result.messageId).toBe("attach-msg-1");
|
||||
// Verify chat creation was called
|
||||
const createCallBody = JSON.parse(mockFetch.mock.calls[1][1].body);
|
||||
expect(createCallBody.addresses).toEqual(["+15559876543"]);
|
||||
// Verify attachment was sent to the newly created chat
|
||||
const attachBody = mockFetch.mock.calls[2][1]?.body as Uint8Array;
|
||||
const attachText = decodeBody(attachBody);
|
||||
expect(attachText).toContain("iMessage;-;+15559876543");
|
||||
});
|
||||
|
||||
it("retries chatGuid resolution after creating a chat with no returned guid", async () => {
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () => Promise.resolve({ data: [] }),
|
||||
});
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () => Promise.resolve(JSON.stringify({ data: {} })),
|
||||
});
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () => Promise.resolve({ data: [{ guid: "iMessage;-;+15557654321" }] }),
|
||||
});
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () => Promise.resolve(JSON.stringify({ data: { guid: "attach-msg-2" } })),
|
||||
});
|
||||
|
||||
const result = await sendBlueBubblesAttachment({
|
||||
to: "+15557654321",
|
||||
buffer: new Uint8Array([4, 5, 6]),
|
||||
filename: "photo.jpg",
|
||||
contentType: "image/jpeg",
|
||||
opts: { serverUrl: "http://localhost:1234", password: "test" },
|
||||
});
|
||||
|
||||
expect(result.messageId).toBe("attach-msg-2");
|
||||
const createCallBody = JSON.parse(mockFetch.mock.calls[1][1].body);
|
||||
expect(createCallBody.addresses).toEqual(["+15557654321"]);
|
||||
const attachBody = mockFetch.mock.calls[3][1]?.body as Uint8Array;
|
||||
const attachText = decodeBody(attachBody);
|
||||
expect(attachText).toContain("iMessage;-;+15557654321");
|
||||
});
|
||||
|
||||
it("still throws for non-handle targets when chatGuid is not found", async () => {
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () => Promise.resolve({ data: [] }),
|
||||
});
|
||||
|
||||
await expect(
|
||||
sendBlueBubblesAttachment({
|
||||
to: "chat_id:999",
|
||||
buffer: new Uint8Array([1, 2, 3]),
|
||||
filename: "photo.jpg",
|
||||
opts: { serverUrl: "http://localhost:1234", password: "test" },
|
||||
}),
|
||||
).rejects.toThrow("chatGuid not found");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -10,7 +10,7 @@ import { resolveRequestUrl } from "./request-url.js";
|
||||
import type { OpenClawConfig } from "./runtime-api.js";
|
||||
import { getBlueBubblesRuntime, warnBlueBubbles } from "./runtime.js";
|
||||
import { extractBlueBubblesMessageId, resolveBlueBubblesSendTarget } from "./send-helpers.js";
|
||||
import { resolveChatGuidForTarget } from "./send.js";
|
||||
import { resolveChatGuidForTarget, createChatForHandle } from "./send.js";
|
||||
import {
|
||||
blueBubblesFetchWithTimeout,
|
||||
buildBlueBubblesApiUrl,
|
||||
@@ -180,16 +180,37 @@ export async function sendBlueBubblesAttachment(params: {
|
||||
}
|
||||
|
||||
const target = resolveBlueBubblesSendTarget(to);
|
||||
const chatGuid = await resolveChatGuidForTarget({
|
||||
let chatGuid = await resolveChatGuidForTarget({
|
||||
baseUrl,
|
||||
password,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
target,
|
||||
});
|
||||
if (!chatGuid) {
|
||||
throw new Error(
|
||||
"BlueBubbles attachment send failed: chatGuid not found for target. Use a chat_guid target or ensure the chat exists.",
|
||||
);
|
||||
// For handle targets (phone numbers/emails), auto-create a new DM chat
|
||||
if (target.kind === "handle") {
|
||||
const created = await createChatForHandle({
|
||||
baseUrl,
|
||||
password,
|
||||
address: target.address,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
});
|
||||
chatGuid = created.chatGuid;
|
||||
// If we still don't have a chatGuid, try resolving again (chat was created server-side)
|
||||
if (!chatGuid) {
|
||||
chatGuid = await resolveChatGuidForTarget({
|
||||
baseUrl,
|
||||
password,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
target,
|
||||
});
|
||||
}
|
||||
}
|
||||
if (!chatGuid) {
|
||||
throw new Error(
|
||||
"BlueBubbles attachment send failed: chatGuid not found for target. Use a chat_guid target or ensure the chat exists.",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const url = buildBlueBubblesApiUrl({
|
||||
|
||||
@@ -3,7 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import "./test-mocks.js";
|
||||
import { getCachedBlueBubblesPrivateApiStatus } from "./probe.js";
|
||||
import { clearBlueBubblesRuntime, setBlueBubblesRuntime } from "./runtime.js";
|
||||
import { sendMessageBlueBubbles, resolveChatGuidForTarget } from "./send.js";
|
||||
import { sendMessageBlueBubbles, resolveChatGuidForTarget, createChatForHandle } from "./send.js";
|
||||
import {
|
||||
BLUE_BUBBLES_PRIVATE_API_STATUS,
|
||||
installBlueBubblesFetchTestHooks,
|
||||
@@ -781,4 +781,109 @@ describe("send", () => {
|
||||
expect(body.tempGuid.length).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createChatForHandle", () => {
|
||||
it("creates a new chat and returns chatGuid from response", async () => {
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () =>
|
||||
Promise.resolve(
|
||||
JSON.stringify({
|
||||
data: { guid: "iMessage;-;+15559876543", chatGuid: "iMessage;-;+15559876543" },
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
const result = await createChatForHandle({
|
||||
baseUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
address: "+15559876543",
|
||||
message: "Hello!",
|
||||
});
|
||||
|
||||
expect(result.chatGuid).toBe("iMessage;-;+15559876543");
|
||||
expect(result.messageId).toBeDefined();
|
||||
const body = JSON.parse(mockFetch.mock.calls[0][1].body);
|
||||
expect(body.addresses).toEqual(["+15559876543"]);
|
||||
expect(body.message).toBe("Hello!");
|
||||
});
|
||||
|
||||
it("creates a new chat without a message when message is omitted", async () => {
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () =>
|
||||
Promise.resolve(
|
||||
JSON.stringify({
|
||||
data: { guid: "iMessage;-;+15559876543" },
|
||||
}),
|
||||
),
|
||||
});
|
||||
|
||||
const result = await createChatForHandle({
|
||||
baseUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
address: "+15559876543",
|
||||
});
|
||||
|
||||
expect(result.chatGuid).toBe("iMessage;-;+15559876543");
|
||||
const body = JSON.parse(mockFetch.mock.calls[0][1].body);
|
||||
expect(body.message).toBe("");
|
||||
});
|
||||
|
||||
it.each([
|
||||
["data.chatGuid", { data: { chatGuid: "shape-chat-guid" } }, "shape-chat-guid"],
|
||||
["data.guid", { data: { guid: "shape-guid" } }, "shape-guid"],
|
||||
[
|
||||
"data.chats[0].guid",
|
||||
{ data: { chats: [{ guid: "shape-array-guid" }] } },
|
||||
"shape-array-guid",
|
||||
],
|
||||
["data.chat.guid", { data: { chat: { guid: "shape-object-guid" } } }, "shape-object-guid"],
|
||||
])("extracts chatGuid from %s", async (_label, responseBody, expectedChatGuid) => {
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () => Promise.resolve(JSON.stringify(responseBody)),
|
||||
});
|
||||
|
||||
const result = await createChatForHandle({
|
||||
baseUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
address: "+15559876543",
|
||||
});
|
||||
|
||||
expect(result.chatGuid).toBe(expectedChatGuid);
|
||||
});
|
||||
|
||||
it("throws when Private API is not enabled", async () => {
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: false,
|
||||
status: 403,
|
||||
text: () => Promise.resolve("Private API not enabled"),
|
||||
});
|
||||
|
||||
await expect(
|
||||
createChatForHandle({
|
||||
baseUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
address: "+15559876543",
|
||||
}),
|
||||
).rejects.toThrow("Private API must be enabled");
|
||||
});
|
||||
|
||||
it("returns null chatGuid when response has no chat data", async () => {
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
text: () => Promise.resolve(JSON.stringify({ data: {} })),
|
||||
});
|
||||
|
||||
const result = await createChatForHandle({
|
||||
baseUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
address: "+15559876543",
|
||||
message: "Hello",
|
||||
});
|
||||
|
||||
expect(result.chatGuid).toBeNull();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -312,16 +312,20 @@ export async function resolveChatGuidForTarget(params: {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new chat (DM) and optionally sends an initial message.
|
||||
* Creates a new DM chat for the given address and returns the chat GUID.
|
||||
* Requires Private API to be enabled in BlueBubbles.
|
||||
*
|
||||
* If a `message` is provided it is sent as the initial message in the new chat;
|
||||
* otherwise an empty-string message body is used (BlueBubbles still creates the
|
||||
* chat but will not deliver a visible bubble).
|
||||
*/
|
||||
async function createNewChatWithMessage(params: {
|
||||
export async function createChatForHandle(params: {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
address: string;
|
||||
message: string;
|
||||
message?: string;
|
||||
timeoutMs?: number;
|
||||
}): Promise<BlueBubblesSendResult> {
|
||||
}): Promise<{ chatGuid: string | null; messageId: string }> {
|
||||
const url = buildBlueBubblesApiUrl({
|
||||
baseUrl: params.baseUrl,
|
||||
path: "/api/v1/chat/new",
|
||||
@@ -329,7 +333,7 @@ async function createNewChatWithMessage(params: {
|
||||
});
|
||||
const payload = {
|
||||
addresses: [params.address],
|
||||
message: params.message,
|
||||
message: params.message ?? "",
|
||||
tempGuid: `temp-${crypto.randomUUID()}`,
|
||||
};
|
||||
const res = await blueBubblesFetchWithTimeout(
|
||||
@@ -343,7 +347,6 @@ async function createNewChatWithMessage(params: {
|
||||
);
|
||||
if (!res.ok) {
|
||||
const errorText = await res.text();
|
||||
// Check for Private API not enabled error
|
||||
if (
|
||||
res.status === 400 ||
|
||||
res.status === 403 ||
|
||||
@@ -355,7 +358,64 @@ async function createNewChatWithMessage(params: {
|
||||
}
|
||||
throw new Error(`BlueBubbles create chat failed (${res.status}): ${errorText || "unknown"}`);
|
||||
}
|
||||
return parseBlueBubblesMessageResponse(res);
|
||||
const body = await res.text();
|
||||
let messageId = "ok";
|
||||
let chatGuid: string | null = null;
|
||||
if (body) {
|
||||
try {
|
||||
const parsed = JSON.parse(body) as Record<string, unknown>;
|
||||
messageId = extractBlueBubblesMessageId(parsed);
|
||||
// Extract chatGuid from the response data
|
||||
const data = parsed.data as Record<string, unknown> | undefined;
|
||||
if (data) {
|
||||
chatGuid =
|
||||
(typeof data.chatGuid === "string" && data.chatGuid) ||
|
||||
(typeof data.guid === "string" && data.guid) ||
|
||||
null;
|
||||
// Also try nested chats array (some BB versions nest it)
|
||||
if (!chatGuid) {
|
||||
const chats = data.chats ?? data.chat;
|
||||
if (Array.isArray(chats) && chats.length > 0) {
|
||||
const first = chats[0] as Record<string, unknown> | undefined;
|
||||
chatGuid =
|
||||
(typeof first?.guid === "string" && first.guid) ||
|
||||
(typeof first?.chatGuid === "string" && first.chatGuid) ||
|
||||
null;
|
||||
} else if (chats && typeof chats === "object" && !Array.isArray(chats)) {
|
||||
const chatObj = chats as Record<string, unknown>;
|
||||
chatGuid =
|
||||
(typeof chatObj.guid === "string" && chatObj.guid) ||
|
||||
(typeof chatObj.chatGuid === "string" && chatObj.chatGuid) ||
|
||||
null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// ignore parse errors
|
||||
}
|
||||
}
|
||||
return { chatGuid, messageId };
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new chat (DM) and sends an initial message.
|
||||
* Requires Private API to be enabled in BlueBubbles.
|
||||
*/
|
||||
async function createNewChatWithMessage(params: {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
address: string;
|
||||
message: string;
|
||||
timeoutMs?: number;
|
||||
}): Promise<BlueBubblesSendResult> {
|
||||
const result = await createChatForHandle({
|
||||
baseUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
address: params.address,
|
||||
message: params.message,
|
||||
timeoutMs: params.timeoutMs,
|
||||
});
|
||||
return { messageId: result.messageId };
|
||||
}
|
||||
|
||||
export async function sendMessageBlueBubbles(
|
||||
|
||||
@@ -308,6 +308,84 @@ describe("slackPlugin agentPrompt", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("slackPlugin outbound new targets", () => {
|
||||
const cfg = {
|
||||
channels: {
|
||||
slack: {
|
||||
botToken: "xoxb-test",
|
||||
appToken: "xapp-test",
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
it("sends to a new user target via DM without erroring", async () => {
|
||||
const sendSlack = vi.fn().mockResolvedValue({ messageId: "m-new-user", channelId: "D999" });
|
||||
const sendText = slackPlugin.outbound?.sendText;
|
||||
expect(sendText).toBeDefined();
|
||||
|
||||
const result = await sendText!({
|
||||
cfg,
|
||||
to: "user:U99NEW",
|
||||
text: "hello new user",
|
||||
accountId: "default",
|
||||
deps: { sendSlack },
|
||||
});
|
||||
|
||||
expect(sendSlack).toHaveBeenCalledWith(
|
||||
"user:U99NEW",
|
||||
"hello new user",
|
||||
expect.objectContaining({ cfg }),
|
||||
);
|
||||
expect(result).toEqual({ channel: "slack", messageId: "m-new-user", channelId: "D999" });
|
||||
});
|
||||
|
||||
it("sends to a new channel target without erroring", async () => {
|
||||
const sendSlack = vi.fn().mockResolvedValue({ messageId: "m-new-chan", channelId: "C555" });
|
||||
const sendText = slackPlugin.outbound?.sendText;
|
||||
expect(sendText).toBeDefined();
|
||||
|
||||
const result = await sendText!({
|
||||
cfg,
|
||||
to: "channel:C555NEW",
|
||||
text: "hello channel",
|
||||
accountId: "default",
|
||||
deps: { sendSlack },
|
||||
});
|
||||
|
||||
expect(sendSlack).toHaveBeenCalledWith(
|
||||
"channel:C555NEW",
|
||||
"hello channel",
|
||||
expect.objectContaining({ cfg }),
|
||||
);
|
||||
expect(result).toEqual({ channel: "slack", messageId: "m-new-chan", channelId: "C555" });
|
||||
});
|
||||
|
||||
it("sends media to a new user target without erroring", async () => {
|
||||
const sendSlack = vi.fn().mockResolvedValue({ messageId: "m-new-media", channelId: "D888" });
|
||||
const sendMedia = slackPlugin.outbound?.sendMedia;
|
||||
expect(sendMedia).toBeDefined();
|
||||
|
||||
const result = await sendMedia!({
|
||||
cfg,
|
||||
to: "user:U88NEW",
|
||||
text: "here is a file",
|
||||
mediaUrl: "https://example.com/file.png",
|
||||
accountId: "default",
|
||||
deps: { sendSlack },
|
||||
});
|
||||
|
||||
expect(sendSlack).toHaveBeenCalledWith(
|
||||
"user:U88NEW",
|
||||
"here is a file",
|
||||
expect.objectContaining({
|
||||
cfg,
|
||||
mediaUrl: "https://example.com/file.png",
|
||||
}),
|
||||
);
|
||||
expect(result).toEqual({ channel: "slack", messageId: "m-new-media", channelId: "D888" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("slackPlugin config", () => {
|
||||
it("treats HTTP mode accounts with bot token + signing secret as configured", async () => {
|
||||
const cfg: OpenClawConfig = {
|
||||
|
||||
@@ -425,6 +425,52 @@ describe("appendAssistantMessageToSessionTranscript", () => {
|
||||
expect(messageLine.message.content[0].text).toBe("Hello from delivery mirror!");
|
||||
});
|
||||
|
||||
it("finds session entry using normalized (lowercased) key", async () => {
|
||||
const sessionId = "test-session-normalized";
|
||||
// Store key is lowercase (as written by updateSessionStore/normalizeStoreSessionKey)
|
||||
const storeKey = "agent:main:bluebubbles:direct:+15551234567";
|
||||
const store = {
|
||||
[storeKey]: {
|
||||
sessionId,
|
||||
chatType: "direct",
|
||||
channel: "bluebubbles",
|
||||
},
|
||||
};
|
||||
fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8");
|
||||
|
||||
// Pass a mixed-case key — append should still find the entry via normalization
|
||||
const result = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey: "agent:main:BlueBubbles:direct:+15551234567",
|
||||
text: "Hello normalized!",
|
||||
storePath: fixture.storePath(),
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("finds Slack session entry using normalized (lowercased) key", async () => {
|
||||
const sessionId = "test-slack-session";
|
||||
// Slack session keys include channel type and target ID; store key is lowercase
|
||||
const storeKey = "agent:main:slack:direct:u12345abc";
|
||||
const store = {
|
||||
[storeKey]: {
|
||||
sessionId,
|
||||
chatType: "direct",
|
||||
channel: "slack",
|
||||
},
|
||||
};
|
||||
fs.writeFileSync(fixture.storePath(), JSON.stringify(store), "utf-8");
|
||||
|
||||
// Pass a mixed-case key (as resolveSlackSession might produce) — normalization should match
|
||||
const result = await appendAssistantMessageToSessionTranscript({
|
||||
sessionKey: "agent:main:slack:direct:U12345ABC",
|
||||
text: "Hello Slack user!",
|
||||
storePath: fixture.storePath(),
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("ignores malformed transcript lines when checking mirror idempotency", async () => {
|
||||
writeTranscriptStore();
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import {
|
||||
resolveSessionTranscriptPath,
|
||||
} from "./paths.js";
|
||||
import { resolveAndPersistSessionFile } from "./session-file.js";
|
||||
import { loadSessionStore } from "./store.js";
|
||||
import { loadSessionStore, normalizeStoreSessionKey } from "./store.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
|
||||
function stripQuery(value: string): string {
|
||||
@@ -154,7 +154,8 @@ export async function appendAssistantMessageToSessionTranscript(params: {
|
||||
|
||||
const storePath = params.storePath ?? resolveDefaultSessionStorePath(params.agentId);
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const entry = store[sessionKey] as SessionEntry | undefined;
|
||||
const normalizedKey = normalizeStoreSessionKey(sessionKey);
|
||||
const entry = (store[normalizedKey] ?? store[sessionKey]) as SessionEntry | undefined;
|
||||
if (!entry?.sessionId) {
|
||||
return { ok: false, reason: `unknown sessionKey: ${sessionKey}` };
|
||||
}
|
||||
|
||||
@@ -143,6 +143,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
vi.unstubAllEnvs();
|
||||
});
|
||||
|
||||
@@ -255,6 +256,59 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("skips stale cron deliveries while still suppressing fallback main summary", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-18T17:00:00.000Z"));
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "Yesterday's morning briefing." });
|
||||
(params.job as { state?: { nextRunAtMs?: number } }).state = {
|
||||
nextRunAtMs: Date.now() - (3 * 60 * 60_000 + 1),
|
||||
};
|
||||
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(state.result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "ok",
|
||||
delivered: false,
|
||||
deliveryAttempted: true,
|
||||
}),
|
||||
);
|
||||
expect(deliverOutboundPayloads).not.toHaveBeenCalled();
|
||||
expect(
|
||||
shouldEnqueueCronMainSummary({
|
||||
summaryText: "Yesterday's morning briefing.",
|
||||
deliveryRequested: true,
|
||||
delivered: state.result?.delivered,
|
||||
deliveryAttempted: state.result?.deliveryAttempted,
|
||||
suppressMainSummary: false,
|
||||
isCronSystemEvent: () => true,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("still delivers when the run started on time but finished more than three hours later", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-18T17:00:00.000Z"));
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "Long running report finished." });
|
||||
params.runStartedAt = Date.now() - (3 * 60 * 60_000 + 1);
|
||||
(params.job as { state?: { nextRunAtMs?: number } }).state = {
|
||||
nextRunAtMs: params.runStartedAt,
|
||||
};
|
||||
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
expect(state.delivered).toBe(true);
|
||||
expect(state.deliveryAttempted).toBe(true);
|
||||
});
|
||||
|
||||
it("text delivery fires exactly once (no double-deliver)", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
|
||||
@@ -134,6 +134,8 @@ const PERMANENT_DIRECT_CRON_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [
|
||||
/outbound not configured for channel/i,
|
||||
];
|
||||
|
||||
const STALE_CRON_DELIVERY_MAX_START_DELAY_MS = 3 * 60 * 60_000;
|
||||
|
||||
type CompletedDirectCronDelivery = {
|
||||
ts: number;
|
||||
results: OutboundDeliveryResult[];
|
||||
@@ -174,6 +176,21 @@ function pruneCompletedDirectCronDeliveries(now: number) {
|
||||
}
|
||||
}
|
||||
|
||||
function resolveCronDeliveryScheduledAtMs(params: { job: CronJob; runStartedAt: number }): number {
|
||||
const scheduledAt = params.job.state?.nextRunAtMs;
|
||||
return typeof scheduledAt === "number" && Number.isFinite(scheduledAt)
|
||||
? scheduledAt
|
||||
: params.runStartedAt;
|
||||
}
|
||||
|
||||
function resolveCronDeliveryStartDelayMs(params: { job: CronJob; runStartedAt: number }): number {
|
||||
return params.runStartedAt - resolveCronDeliveryScheduledAtMs(params);
|
||||
}
|
||||
|
||||
function isStaleCronDelivery(params: { job: CronJob; runStartedAt: number }): boolean {
|
||||
return resolveCronDeliveryStartDelayMs(params) > STALE_CRON_DELIVERY_MAX_START_DELAY_MS;
|
||||
}
|
||||
|
||||
function rememberCompletedDirectCronDelivery(
|
||||
idempotencyKey: string,
|
||||
results: readonly OutboundDeliveryResult[],
|
||||
@@ -331,6 +348,35 @@ export async function dispatchCronDelivery(
|
||||
...params.telemetry,
|
||||
});
|
||||
}
|
||||
if (
|
||||
params.deliveryRequested &&
|
||||
isStaleCronDelivery({
|
||||
job: params.job,
|
||||
runStartedAt: params.runStartedAt,
|
||||
})
|
||||
) {
|
||||
deliveryAttempted = true;
|
||||
const nowMs = Date.now();
|
||||
const scheduledAtMs = resolveCronDeliveryScheduledAtMs({
|
||||
job: params.job,
|
||||
runStartedAt: params.runStartedAt,
|
||||
});
|
||||
const startDelayMs = resolveCronDeliveryStartDelayMs({
|
||||
job: params.job,
|
||||
runStartedAt: params.runStartedAt,
|
||||
});
|
||||
logWarn(
|
||||
`[cron:${params.job.id}] skipping stale delivery scheduled at ${new Date(scheduledAtMs).toISOString()}, started ${Math.round(startDelayMs / 60_000)}m late, current age ${Math.round((nowMs - scheduledAtMs) / 60_000)}m`,
|
||||
);
|
||||
return params.withRunSession({
|
||||
status: "ok",
|
||||
summary,
|
||||
outputText,
|
||||
deliveryAttempted,
|
||||
delivered: false,
|
||||
...params.telemetry,
|
||||
});
|
||||
}
|
||||
deliveryAttempted = true;
|
||||
const cachedResults = getCompletedDirectCronDelivery(deliveryIdempotencyKey);
|
||||
if (cachedResults) {
|
||||
|
||||
@@ -1,11 +1,31 @@
|
||||
import { parseDiscordTarget } from "../../../extensions/discord/src/targets.js";
|
||||
import {
|
||||
parseIMessageTarget,
|
||||
normalizeIMessageHandle,
|
||||
} from "../../../extensions/imessage/src/targets.js";
|
||||
import {
|
||||
looksLikeUuid,
|
||||
resolveSignalPeerId,
|
||||
resolveSignalRecipient,
|
||||
resolveSignalSender,
|
||||
} from "../../../extensions/signal/src/identity.js";
|
||||
import { resolveSlackAccount } from "../../../extensions/slack/src/accounts.js";
|
||||
import { createSlackWebClient } from "../../../extensions/slack/src/client.js";
|
||||
import { normalizeAllowListLower } from "../../../extensions/slack/src/monitor/allow-list.js";
|
||||
import { parseSlackTarget } from "../../../extensions/slack/src/targets.js";
|
||||
import { buildTelegramGroupPeerId } from "../../../extensions/telegram/src/bot/helpers.js";
|
||||
import { resolveTelegramTargetChatType } from "../../../extensions/telegram/src/inline-buttons.js";
|
||||
import { parseTelegramThreadId } from "../../../extensions/telegram/src/outbound-params.js";
|
||||
import { parseTelegramTarget } from "../../../extensions/telegram/src/targets.js";
|
||||
import type { MsgContext } from "../../auto-reply/templating.js";
|
||||
import type { ChatType } from "../../channels/chat-type.js";
|
||||
import { getChannelPlugin } from "../../channels/plugins/index.js";
|
||||
import type { ChannelId } from "../../channels/plugins/types.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { recordSessionMetaFromInbound, resolveStorePath } from "../../config/sessions.js";
|
||||
import type { RoutePeer } from "../../routing/resolve-route.js";
|
||||
import { buildOutboundBaseSessionKey } from "./base-session-key.js";
|
||||
import { buildAgentSessionKey, type RoutePeer } from "../../routing/resolve-route.js";
|
||||
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
|
||||
import { isWhatsAppGroupJid, normalizeWhatsAppTarget } from "../../whatsapp/normalize.js";
|
||||
import type { ResolvedMessagingTarget } from "./target-resolver.js";
|
||||
|
||||
export type OutboundSessionRoute = {
|
||||
@@ -29,6 +49,23 @@ export type ResolveOutboundSessionRouteParams = {
|
||||
threadId?: string | number | null;
|
||||
};
|
||||
|
||||
// Cache Slack channel type lookups to avoid repeated API calls.
|
||||
const SLACK_CHANNEL_TYPE_CACHE = new Map<string, "channel" | "group" | "dm" | "unknown">();
|
||||
|
||||
function normalizeThreadId(value?: string | number | null): string | undefined {
|
||||
if (value == null) {
|
||||
return undefined;
|
||||
}
|
||||
if (typeof value === "number") {
|
||||
if (!Number.isFinite(value)) {
|
||||
return undefined;
|
||||
}
|
||||
return String(Math.trunc(value));
|
||||
}
|
||||
const trimmed = value.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function stripProviderPrefix(raw: string, channel: string): string {
|
||||
const trimmed = raw.trim();
|
||||
const lower = trimmed.toLowerCase();
|
||||
@@ -74,7 +111,779 @@ function buildBaseSessionKey(params: {
|
||||
accountId?: string | null;
|
||||
peer: RoutePeer;
|
||||
}): string {
|
||||
return buildOutboundBaseSessionKey(params);
|
||||
return buildAgentSessionKey({
|
||||
agentId: params.agentId,
|
||||
channel: params.channel,
|
||||
accountId: params.accountId,
|
||||
peer: params.peer,
|
||||
dmScope: params.cfg.session?.dmScope ?? "main",
|
||||
identityLinks: params.cfg.session?.identityLinks,
|
||||
});
|
||||
}
|
||||
|
||||
// Best-effort mpim detection: allowlist/config, then Slack API (if token available).
|
||||
async function resolveSlackChannelType(params: {
|
||||
cfg: OpenClawConfig;
|
||||
accountId?: string | null;
|
||||
channelId: string;
|
||||
}): Promise<"channel" | "group" | "dm" | "unknown"> {
|
||||
const channelId = params.channelId.trim();
|
||||
if (!channelId) {
|
||||
return "unknown";
|
||||
}
|
||||
const cached = SLACK_CHANNEL_TYPE_CACHE.get(`${params.accountId ?? "default"}:${channelId}`);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
|
||||
const account = resolveSlackAccount({ cfg: params.cfg, accountId: params.accountId });
|
||||
const groupChannels = normalizeAllowListLower(account.dm?.groupChannels);
|
||||
const channelIdLower = channelId.toLowerCase();
|
||||
if (
|
||||
groupChannels.includes(channelIdLower) ||
|
||||
groupChannels.includes(`slack:${channelIdLower}`) ||
|
||||
groupChannels.includes(`channel:${channelIdLower}`) ||
|
||||
groupChannels.includes(`group:${channelIdLower}`) ||
|
||||
groupChannels.includes(`mpim:${channelIdLower}`)
|
||||
) {
|
||||
SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "group");
|
||||
return "group";
|
||||
}
|
||||
|
||||
const channelKeys = Object.keys(account.channels ?? {});
|
||||
if (
|
||||
channelKeys.some((key) => {
|
||||
const normalized = key.trim().toLowerCase();
|
||||
return (
|
||||
normalized === channelIdLower ||
|
||||
normalized === `channel:${channelIdLower}` ||
|
||||
normalized.replace(/^#/, "") === channelIdLower
|
||||
);
|
||||
})
|
||||
) {
|
||||
SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "channel");
|
||||
return "channel";
|
||||
}
|
||||
|
||||
const token = account.botToken?.trim() || account.userToken || "";
|
||||
if (!token) {
|
||||
SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "unknown");
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
try {
|
||||
const client = createSlackWebClient(token);
|
||||
const info = await client.conversations.info({ channel: channelId });
|
||||
const channel = info.channel as { is_im?: boolean; is_mpim?: boolean } | undefined;
|
||||
const type = channel?.is_im ? "dm" : channel?.is_mpim ? "group" : "channel";
|
||||
SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, type);
|
||||
return type;
|
||||
} catch {
|
||||
SLACK_CHANNEL_TYPE_CACHE.set(`${account.accountId}:${channelId}`, "unknown");
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveSlackSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): Promise<OutboundSessionRoute | null> {
|
||||
const parsed = parseSlackTarget(params.target, { defaultKind: "channel" });
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
const isDm = parsed.kind === "user";
|
||||
let peerKind: ChatType = isDm ? "direct" : "channel";
|
||||
if (!isDm && /^G/i.test(parsed.id)) {
|
||||
// Slack mpim/group DMs share the G-prefix; detect to align session keys with inbound.
|
||||
const channelType = await resolveSlackChannelType({
|
||||
cfg: params.cfg,
|
||||
accountId: params.accountId,
|
||||
channelId: parsed.id,
|
||||
});
|
||||
if (channelType === "group") {
|
||||
peerKind = "group";
|
||||
}
|
||||
if (channelType === "dm") {
|
||||
peerKind = "direct";
|
||||
}
|
||||
}
|
||||
const peer: RoutePeer = {
|
||||
kind: peerKind,
|
||||
id: parsed.id,
|
||||
};
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "slack",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
const threadId = normalizeThreadId(params.threadId ?? params.replyToId);
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId,
|
||||
});
|
||||
return {
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: peerKind === "direct" ? "direct" : "channel",
|
||||
from:
|
||||
peerKind === "direct"
|
||||
? `slack:${parsed.id}`
|
||||
: peerKind === "group"
|
||||
? `slack:group:${parsed.id}`
|
||||
: `slack:channel:${parsed.id}`,
|
||||
to: peerKind === "direct" ? `user:${parsed.id}` : `channel:${parsed.id}`,
|
||||
threadId,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveDiscordSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const parsed = parseDiscordTarget(params.target, {
|
||||
defaultKind: resolveDiscordOutboundTargetKindHint(params),
|
||||
});
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
const isDm = parsed.kind === "user";
|
||||
const peer: RoutePeer = {
|
||||
kind: isDm ? "direct" : "channel",
|
||||
id: parsed.id,
|
||||
};
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "discord",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
const explicitThreadId = normalizeThreadId(params.threadId);
|
||||
const threadCandidate = explicitThreadId ?? normalizeThreadId(params.replyToId);
|
||||
// Discord threads use their own channel id; avoid adding a :thread suffix.
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId: threadCandidate,
|
||||
useSuffix: false,
|
||||
});
|
||||
return {
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isDm ? "direct" : "channel",
|
||||
from: isDm ? `discord:${parsed.id}` : `discord:channel:${parsed.id}`,
|
||||
to: isDm ? `user:${parsed.id}` : `channel:${parsed.id}`,
|
||||
threadId: explicitThreadId ?? undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveDiscordOutboundTargetKindHint(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): "user" | "channel" | undefined {
|
||||
const resolvedKind = params.resolvedTarget?.kind;
|
||||
if (resolvedKind === "user") {
|
||||
return "user";
|
||||
}
|
||||
if (resolvedKind === "group" || resolvedKind === "channel") {
|
||||
return "channel";
|
||||
}
|
||||
|
||||
const target = params.target.trim();
|
||||
if (/^channel:/i.test(target)) {
|
||||
return "channel";
|
||||
}
|
||||
if (/^(user:|discord:|@|<@!?)/i.test(target)) {
|
||||
return "user";
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveTelegramSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const parsed = parseTelegramTarget(params.target);
|
||||
const chatId = parsed.chatId.trim();
|
||||
if (!chatId) {
|
||||
return null;
|
||||
}
|
||||
const parsedThreadId = parsed.messageThreadId;
|
||||
const fallbackThreadId = normalizeThreadId(params.threadId);
|
||||
const resolvedThreadId = parsedThreadId ?? parseTelegramThreadId(fallbackThreadId);
|
||||
// Telegram topics are encoded in the peer id (chatId:topic:<id>).
|
||||
const chatType = resolveTelegramTargetChatType(params.target);
|
||||
// If the target is a username and we lack a resolvedTarget, default to DM to avoid group keys.
|
||||
const isGroup =
|
||||
chatType === "group" ||
|
||||
(chatType === "unknown" &&
|
||||
params.resolvedTarget?.kind &&
|
||||
params.resolvedTarget.kind !== "user");
|
||||
// For groups: include thread ID in peerId. For DMs: use simple chatId (thread handled via suffix).
|
||||
const peerId =
|
||||
isGroup && resolvedThreadId ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : chatId;
|
||||
const peer: RoutePeer = {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: peerId,
|
||||
};
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "telegram",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
// Use thread suffix for DM topics to match inbound session key format
|
||||
const threadKeys =
|
||||
resolvedThreadId && !isGroup
|
||||
? { sessionKey: `${baseSessionKey}:thread:${resolvedThreadId}` }
|
||||
: null;
|
||||
return {
|
||||
sessionKey: threadKeys?.sessionKey ?? baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isGroup ? "group" : "direct",
|
||||
from: isGroup
|
||||
? `telegram:group:${peerId}`
|
||||
: resolvedThreadId
|
||||
? `telegram:${chatId}:topic:${resolvedThreadId}`
|
||||
: `telegram:${chatId}`,
|
||||
to: `telegram:${chatId}`,
|
||||
threadId: resolvedThreadId,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveWhatsAppSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const normalized = normalizeWhatsAppTarget(params.target);
|
||||
if (!normalized) {
|
||||
return null;
|
||||
}
|
||||
const isGroup = isWhatsAppGroupJid(normalized);
|
||||
const peer: RoutePeer = {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: normalized,
|
||||
};
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "whatsapp",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isGroup ? "group" : "direct",
|
||||
from: normalized,
|
||||
to: normalized,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveSignalSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const stripped = stripProviderPrefix(params.target, "signal");
|
||||
const lowered = stripped.toLowerCase();
|
||||
if (lowered.startsWith("group:")) {
|
||||
const groupId = stripped.slice("group:".length).trim();
|
||||
if (!groupId) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = { kind: "group", id: groupId };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "signal",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: "group",
|
||||
from: `group:${groupId}`,
|
||||
to: `group:${groupId}`,
|
||||
};
|
||||
}
|
||||
|
||||
let recipient = stripped.trim();
|
||||
if (lowered.startsWith("username:")) {
|
||||
recipient = stripped.slice("username:".length).trim();
|
||||
} else if (lowered.startsWith("u:")) {
|
||||
recipient = stripped.slice("u:".length).trim();
|
||||
}
|
||||
if (!recipient) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const uuidCandidate = recipient.toLowerCase().startsWith("uuid:")
|
||||
? recipient.slice("uuid:".length)
|
||||
: recipient;
|
||||
const sender = resolveSignalSender({
|
||||
sourceUuid: looksLikeUuid(uuidCandidate) ? uuidCandidate : null,
|
||||
sourceNumber: looksLikeUuid(uuidCandidate) ? null : recipient,
|
||||
});
|
||||
const peerId = sender ? resolveSignalPeerId(sender) : recipient;
|
||||
const displayRecipient = sender ? resolveSignalRecipient(sender) : recipient;
|
||||
const peer: RoutePeer = { kind: "direct", id: peerId };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "signal",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: "direct",
|
||||
from: `signal:${displayRecipient}`,
|
||||
to: `signal:${displayRecipient}`,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveIMessageSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const parsed = parseIMessageTarget(params.target);
|
||||
if (parsed.kind === "handle") {
|
||||
const handle = normalizeIMessageHandle(parsed.to);
|
||||
if (!handle) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = { kind: "direct", id: handle };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "imessage",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: "direct",
|
||||
from: `imessage:${handle}`,
|
||||
to: `imessage:${handle}`,
|
||||
};
|
||||
}
|
||||
|
||||
const peerId =
|
||||
parsed.kind === "chat_id"
|
||||
? String(parsed.chatId)
|
||||
: parsed.kind === "chat_guid"
|
||||
? parsed.chatGuid
|
||||
: parsed.chatIdentifier;
|
||||
if (!peerId) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = { kind: "group", id: peerId };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "imessage",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
const toPrefix =
|
||||
parsed.kind === "chat_id"
|
||||
? "chat_id"
|
||||
: parsed.kind === "chat_guid"
|
||||
? "chat_guid"
|
||||
: "chat_identifier";
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: "group",
|
||||
from: `imessage:group:${peerId}`,
|
||||
to: `${toPrefix}:${peerId}`,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveMatrixSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const stripped = stripProviderPrefix(params.target, "matrix");
|
||||
const isUser =
|
||||
params.resolvedTarget?.kind === "user" || stripped.startsWith("@") || /^user:/i.test(stripped);
|
||||
const rawId = stripKindPrefix(stripped);
|
||||
if (!rawId) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = { kind: isUser ? "direct" : "channel", id: rawId };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "matrix",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isUser ? "direct" : "channel",
|
||||
from: isUser ? `matrix:${rawId}` : `matrix:channel:${rawId}`,
|
||||
to: `room:${rawId}`,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveMSTeamsSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
let trimmed = params.target.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
trimmed = trimmed.replace(/^(msteams|teams):/i, "").trim();
|
||||
|
||||
const lower = trimmed.toLowerCase();
|
||||
const isUser = lower.startsWith("user:");
|
||||
const rawId = stripKindPrefix(trimmed);
|
||||
if (!rawId) {
|
||||
return null;
|
||||
}
|
||||
const conversationId = rawId.split(";")[0] ?? rawId;
|
||||
const isChannel = !isUser && /@thread\.tacv2/i.test(conversationId);
|
||||
const peer: RoutePeer = {
|
||||
kind: isUser ? "direct" : isChannel ? "channel" : "group",
|
||||
id: conversationId,
|
||||
};
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "msteams",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isUser ? "direct" : isChannel ? "channel" : "group",
|
||||
from: isUser
|
||||
? `msteams:${conversationId}`
|
||||
: isChannel
|
||||
? `msteams:channel:${conversationId}`
|
||||
: `msteams:group:${conversationId}`,
|
||||
to: isUser ? `user:${conversationId}` : `conversation:${conversationId}`,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveMattermostSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
let trimmed = params.target.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
trimmed = trimmed.replace(/^mattermost:/i, "").trim();
|
||||
const lower = trimmed.toLowerCase();
|
||||
const resolvedKind = params.resolvedTarget?.kind;
|
||||
const isUser =
|
||||
resolvedKind === "user" ||
|
||||
(resolvedKind !== "channel" &&
|
||||
resolvedKind !== "group" &&
|
||||
(lower.startsWith("user:") || trimmed.startsWith("@")));
|
||||
if (trimmed.startsWith("@")) {
|
||||
trimmed = trimmed.slice(1).trim();
|
||||
}
|
||||
const rawId = stripKindPrefix(trimmed);
|
||||
if (!rawId) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = { kind: isUser ? "direct" : "channel", id: rawId };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "mattermost",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
const threadId = normalizeThreadId(params.replyToId ?? params.threadId);
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId,
|
||||
});
|
||||
return {
|
||||
sessionKey: threadKeys.sessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isUser ? "direct" : "channel",
|
||||
from: isUser ? `mattermost:${rawId}` : `mattermost:channel:${rawId}`,
|
||||
to: isUser ? `user:${rawId}` : `channel:${rawId}`,
|
||||
threadId,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveBlueBubblesSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const stripped = stripProviderPrefix(params.target, "bluebubbles");
|
||||
const lower = stripped.toLowerCase();
|
||||
const isGroup =
|
||||
lower.startsWith("chat_id:") ||
|
||||
lower.startsWith("chat_guid:") ||
|
||||
lower.startsWith("chat_identifier:") ||
|
||||
lower.startsWith("group:");
|
||||
const rawPeerId = isGroup
|
||||
? stripKindPrefix(stripped)
|
||||
: stripped.replace(/^(imessage|sms|auto):/i, "");
|
||||
// BlueBubbles inbound group ids omit chat_* prefixes; strip them to align sessions.
|
||||
const peerId = isGroup
|
||||
? rawPeerId.replace(/^(chat_id|chat_guid|chat_identifier):/i, "")
|
||||
: rawPeerId;
|
||||
if (!peerId) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: peerId,
|
||||
};
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "bluebubbles",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isGroup ? "group" : "direct",
|
||||
from: isGroup ? `group:${peerId}` : `bluebubbles:${peerId}`,
|
||||
to: `bluebubbles:${stripped}`,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveNextcloudTalkSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
let trimmed = params.target.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
trimmed = trimmed.replace(/^(nextcloud-talk|nc-talk|nc):/i, "").trim();
|
||||
trimmed = trimmed.replace(/^room:/i, "").trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = { kind: "group", id: trimmed };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "nextcloud-talk",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: "group",
|
||||
from: `nextcloud-talk:room:${trimmed}`,
|
||||
to: `nextcloud-talk:${trimmed}`,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveZaloSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
return resolveZaloLikeSession(params, "zalo", /^(zl):/i);
|
||||
}
|
||||
|
||||
function resolveZaloLikeSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
channel: "zalo" | "zalouser",
|
||||
aliasPrefix: RegExp,
|
||||
): OutboundSessionRoute | null {
|
||||
const trimmed = stripProviderPrefix(params.target, channel).replace(aliasPrefix, "").trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
const isGroup = trimmed.toLowerCase().startsWith("group:");
|
||||
const peerId = stripKindPrefix(trimmed);
|
||||
const peer: RoutePeer = { kind: isGroup ? "group" : "direct", id: peerId };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel,
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isGroup ? "group" : "direct",
|
||||
from: isGroup ? `${channel}:group:${peerId}` : `${channel}:${peerId}`,
|
||||
to: `${channel}:${peerId}`,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveZalouserSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
// Keep DM vs group aligned with inbound sessions for Zalo Personal.
|
||||
return resolveZaloLikeSession(params, "zalouser", /^(zlu):/i);
|
||||
}
|
||||
|
||||
function resolveNostrSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
const trimmed = stripProviderPrefix(params.target, "nostr").trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
const peer: RoutePeer = { kind: "direct", id: trimmed };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "nostr",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: "direct",
|
||||
from: `nostr:${trimmed}`,
|
||||
to: `nostr:${trimmed}`,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeTlonShip(raw: string): string {
|
||||
const trimmed = raw.trim();
|
||||
if (!trimmed) {
|
||||
return trimmed;
|
||||
}
|
||||
return trimmed.startsWith("~") ? trimmed : `~${trimmed}`;
|
||||
}
|
||||
|
||||
function resolveTlonSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
let trimmed = stripProviderPrefix(params.target, "tlon");
|
||||
trimmed = trimmed.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
const lower = trimmed.toLowerCase();
|
||||
let isGroup =
|
||||
lower.startsWith("group:") || lower.startsWith("room:") || lower.startsWith("chat/");
|
||||
let peerId = trimmed;
|
||||
if (lower.startsWith("group:") || lower.startsWith("room:")) {
|
||||
peerId = trimmed.replace(/^(group|room):/i, "").trim();
|
||||
if (!peerId.startsWith("chat/")) {
|
||||
const parts = peerId.split("/").filter(Boolean);
|
||||
if (parts.length === 2) {
|
||||
peerId = `chat/${normalizeTlonShip(parts[0])}/${parts[1]}`;
|
||||
}
|
||||
}
|
||||
isGroup = true;
|
||||
} else if (lower.startsWith("dm:")) {
|
||||
peerId = normalizeTlonShip(trimmed.slice("dm:".length));
|
||||
isGroup = false;
|
||||
} else if (lower.startsWith("chat/")) {
|
||||
peerId = trimmed;
|
||||
isGroup = true;
|
||||
} else if (trimmed.includes("/")) {
|
||||
const parts = trimmed.split("/").filter(Boolean);
|
||||
if (parts.length === 2) {
|
||||
peerId = `chat/${normalizeTlonShip(parts[0])}/${parts[1]}`;
|
||||
isGroup = true;
|
||||
}
|
||||
} else {
|
||||
peerId = normalizeTlonShip(trimmed);
|
||||
}
|
||||
|
||||
const peer: RoutePeer = { kind: isGroup ? "group" : "direct", id: peerId };
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "tlon",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isGroup ? "group" : "direct",
|
||||
from: isGroup ? `tlon:group:${peerId}` : `tlon:${peerId}`,
|
||||
to: `tlon:${peerId}`,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Feishu ID formats:
|
||||
* - oc_xxx: chat_id (can be group or DM, use chat_mode to distinguish or explicit dm:/group: prefix)
|
||||
* - ou_xxx: user open_id (DM)
|
||||
* - on_xxx: user union_id (DM)
|
||||
* - cli_xxx: app_id (not a valid send target)
|
||||
*/
|
||||
function resolveFeishuSession(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): OutboundSessionRoute | null {
|
||||
let trimmed = stripProviderPrefix(params.target, "feishu");
|
||||
trimmed = stripProviderPrefix(trimmed, "lark").trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const lower = trimmed.toLowerCase();
|
||||
let isGroup = false;
|
||||
let typeExplicit = false;
|
||||
|
||||
if (lower.startsWith("group:") || lower.startsWith("chat:")) {
|
||||
trimmed = trimmed.replace(/^(group|chat):/i, "").trim();
|
||||
isGroup = true;
|
||||
typeExplicit = true;
|
||||
} else if (lower.startsWith("user:") || lower.startsWith("dm:")) {
|
||||
trimmed = trimmed.replace(/^(user|dm):/i, "").trim();
|
||||
isGroup = false;
|
||||
typeExplicit = true;
|
||||
}
|
||||
|
||||
const idLower = trimmed.toLowerCase();
|
||||
// Only infer type from ID prefix if not explicitly specified
|
||||
// Note: oc_ is a chat_id and can be either group or DM (must check chat_mode from API)
|
||||
// Only ou_/on_ can be reliably identified as user IDs (always DM)
|
||||
if (!typeExplicit) {
|
||||
if (idLower.startsWith("ou_") || idLower.startsWith("on_")) {
|
||||
isGroup = false;
|
||||
}
|
||||
// oc_ requires explicit prefix: dm:oc_xxx or group:oc_xxx
|
||||
}
|
||||
|
||||
const peer: RoutePeer = {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: trimmed,
|
||||
};
|
||||
const baseSessionKey = buildBaseSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: "feishu",
|
||||
accountId: params.accountId,
|
||||
peer,
|
||||
});
|
||||
return {
|
||||
sessionKey: baseSessionKey,
|
||||
baseSessionKey,
|
||||
peer,
|
||||
chatType: isGroup ? "group" : "direct",
|
||||
from: isGroup ? `feishu:group:${trimmed}` : `feishu:${trimmed}`,
|
||||
to: trimmed,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveFallbackSession(
|
||||
@@ -115,6 +924,29 @@ function resolveFallbackSession(
|
||||
};
|
||||
}
|
||||
|
||||
type OutboundSessionResolver = (
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
) => OutboundSessionRoute | null | Promise<OutboundSessionRoute | null>;
|
||||
|
||||
const OUTBOUND_SESSION_RESOLVERS: Partial<Record<ChannelId, OutboundSessionResolver>> = {
|
||||
slack: resolveSlackSession,
|
||||
discord: resolveDiscordSession,
|
||||
telegram: resolveTelegramSession,
|
||||
whatsapp: resolveWhatsAppSession,
|
||||
signal: resolveSignalSession,
|
||||
imessage: resolveIMessageSession,
|
||||
matrix: resolveMatrixSession,
|
||||
msteams: resolveMSTeamsSession,
|
||||
mattermost: resolveMattermostSession,
|
||||
bluebubbles: resolveBlueBubblesSession,
|
||||
"nextcloud-talk": resolveNextcloudTalkSession,
|
||||
zalo: resolveZaloSession,
|
||||
zalouser: resolveZalouserSession,
|
||||
nostr: resolveNostrSession,
|
||||
tlon: resolveTlonSession,
|
||||
feishu: resolveFeishuSession,
|
||||
};
|
||||
|
||||
export async function resolveOutboundSessionRoute(
|
||||
params: ResolveOutboundSessionRouteParams,
|
||||
): Promise<OutboundSessionRoute | null> {
|
||||
@@ -123,21 +955,11 @@ export async function resolveOutboundSessionRoute(
|
||||
return null;
|
||||
}
|
||||
const nextParams = { ...params, target };
|
||||
const pluginRoute = await getChannelPlugin(
|
||||
params.channel,
|
||||
)?.messaging?.resolveOutboundSessionRoute?.({
|
||||
cfg: nextParams.cfg,
|
||||
agentId: nextParams.agentId,
|
||||
accountId: nextParams.accountId,
|
||||
target,
|
||||
resolvedTarget: nextParams.resolvedTarget,
|
||||
replyToId: nextParams.replyToId,
|
||||
threadId: nextParams.threadId,
|
||||
});
|
||||
if (pluginRoute) {
|
||||
return pluginRoute;
|
||||
const resolver = OUTBOUND_SESSION_RESOLVERS[params.channel];
|
||||
if (!resolver) {
|
||||
return resolveFallbackSession(nextParams);
|
||||
}
|
||||
return resolveFallbackSession(nextParams);
|
||||
return await resolver(nextParams);
|
||||
}
|
||||
|
||||
export async function ensureOutboundSessionEntry(params: {
|
||||
|
||||
@@ -1196,6 +1196,30 @@ describe("resolveOutboundSessionRoute", () => {
|
||||
chatType: "direct",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Slack user DM target",
|
||||
cfg: perChannelPeerCfg,
|
||||
channel: "slack",
|
||||
target: "user:U12345ABC",
|
||||
expected: {
|
||||
sessionKey: "agent:main:slack:direct:u12345abc",
|
||||
from: "slack:U12345ABC",
|
||||
to: "user:U12345ABC",
|
||||
chatType: "direct",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Slack channel target without thread",
|
||||
cfg: baseConfig,
|
||||
channel: "slack",
|
||||
target: "channel:C999XYZ",
|
||||
expected: {
|
||||
sessionKey: "agent:main:slack:channel:c999xyz",
|
||||
from: "slack:channel:C999XYZ",
|
||||
to: "channel:C999XYZ",
|
||||
chatType: "channel",
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of cases) {
|
||||
|
||||
Reference in New Issue
Block a user