diff --git a/CHANGELOG.md b/CHANGELOG.md index b03cfb587f9..d84a2d14f33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ Docs: https://docs.openclaw.ai - CLI/media understanding: make `openclaw infer image describe --model ` execute the explicit image model instead of skipping description when that model supports native vision. - Usage/providers: keep plugin-owned usage auth enabled when manifest-declared provider auth env vars such as `MINIMAX_CODE_PLAN_KEY` are present, so `/usage` can resolve MiniMax billing credentials through the provider plugin. - Tlon/uploads: route both hosted Memex upload targets and custom-S3 presigned upload URLs through the shared SSRF guard so blocked private or loopback destinations fail before upload, while public upload URLs continue through the existing hosted upload flow. (#69794) Thanks @drobison00. +- Channels/thread routing: keep outbound replies in existing Slack, Mattermost, Matrix, Telegram, Discord, and QA-channel thread sessions by sharing the Plugin SDK thread-aware route builder across bundled plugins. ## 2026.4.20 diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 6a5f7e49f1f..0431cf58145 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -3a2cde4b15041b5456420b2052b572f9968a93690814d2cf924382fd2f54d1d3 plugin-sdk-api-baseline.json -38cd9086be93fc9531a8036812c197118c7830d52d40424be08dc9c6d51092e2 plugin-sdk-api-baseline.jsonl +d7f6e6ecdfb78c73760689af5a684c20ec7ca28509d4f63bf0d990a2d739c6ce plugin-sdk-api-baseline.json +584681e4436a4e84c2ff20196ff194a63915caf4dda70de9c27f34ab0d7bde0b plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/sdk-channel-plugins.md b/docs/plugins/sdk-channel-plugins.md index 49e428757f9..6f54b685bf0 100644 --- a/docs/plugins/sdk-channel-plugins.md +++ b/docs/plugins/sdk-channel-plugins.md @@ -176,6 +176,12 @@ surfaces: - `openclaw/plugin-sdk/outbound-media` and `openclaw/plugin-sdk/outbound-runtime` for media loading plus outbound identity/send delegates and payload planning +- `buildThreadAwareOutboundSessionRoute(...)` from + `openclaw/plugin-sdk/channel-core` when an outbound route should preserve an + explicit `replyToId`/`threadId` or recover the current `:thread:` session + after the base session key still matches. Provider plugins can override + precedence, suffix behavior, and thread id normalization when their platform + has native thread delivery semantics. - `openclaw/plugin-sdk/thread-bindings-runtime` for thread-binding lifecycle and adapter registration - `openclaw/plugin-sdk/agent-media-payload` only when a legacy agent/media diff --git a/extensions/discord/src/outbound-session-route.test.ts b/extensions/discord/src/outbound-session-route.test.ts new file mode 100644 index 00000000000..1d492163df6 --- /dev/null +++ b/extensions/discord/src/outbound-session-route.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from "vitest"; +import { resolveDiscordOutboundSessionRoute } from "./outbound-session-route.js"; + +describe("resolveDiscordOutboundSessionRoute", () => { + it("keeps explicit delivery thread ids without adding a session suffix", () => { + const route = resolveDiscordOutboundSessionRoute({ + cfg: {}, + agentId: "main", + target: "channel:123", + threadId: "thread-1", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:discord:channel:123", + baseSessionKey: "agent:main:discord:channel:123", + threadId: "thread-1", + }); + }); + + it("does not promote replyToId into Discord delivery thread metadata", () => { + const route = resolveDiscordOutboundSessionRoute({ + cfg: {}, + agentId: "main", + target: "channel:123", + replyToId: "message-1", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:discord:channel:123", + baseSessionKey: "agent:main:discord:channel:123", + }); + expect(route?.threadId).toBeUndefined(); + }); +}); diff --git a/extensions/discord/src/outbound-session-route.ts b/extensions/discord/src/outbound-session-route.ts index 079425f624a..db9a02e404f 100644 --- a/extensions/discord/src/outbound-session-route.ts +++ b/extensions/discord/src/outbound-session-route.ts @@ -1,10 +1,6 @@ +import { buildThreadAwareOutboundSessionRoute } from "openclaw/plugin-sdk/channel-core"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; -import { - buildOutboundBaseSessionKey, - normalizeOutboundThreadId, - resolveThreadSessionKeys, - type RoutePeer, -} from "openclaw/plugin-sdk/routing"; +import { buildOutboundBaseSessionKey, type RoutePeer } from "openclaw/plugin-sdk/routing"; import { parseDiscordTarget } from "./target-parsing.js"; export type ResolveDiscordOutboundSessionRouteParams = { @@ -38,22 +34,19 @@ export function resolveDiscordOutboundSessionRoute( accountId: params.accountId, peer, }); - const explicitThreadId = normalizeOutboundThreadId(params.threadId); - const threadCandidate = explicitThreadId ?? normalizeOutboundThreadId(params.replyToId); - const threadKeys = resolveThreadSessionKeys({ - baseSessionKey, - threadId: threadCandidate, + return buildThreadAwareOutboundSessionRoute({ + route: { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: isDm ? ("direct" as const) : ("channel" as const), + from: isDm ? `discord:${parsed.id}` : `discord:channel:${parsed.id}`, + to: isDm ? `user:${parsed.id}` : `channel:${parsed.id}`, + }, + threadId: params.threadId, + precedence: ["threadId"], useSuffix: false, }); - return { - sessionKey: threadKeys.sessionKey, - baseSessionKey, - peer, - chatType: isDm ? ("direct" as const) : ("channel" as const), - from: isDm ? `discord:${parsed.id}` : `discord:channel:${parsed.id}`, - to: isDm ? `user:${parsed.id}` : `channel:${parsed.id}`, - threadId: explicitThreadId ?? undefined, - }; } function resolveDiscordOutboundTargetKindHint(params: { diff --git a/extensions/matrix/src/session-route.test.ts b/extensions/matrix/src/session-route.test.ts index 9b39617e551..30045046f2a 100644 --- a/extensions/matrix/src/session-route.test.ts +++ b/extensions/matrix/src/session-route.test.ts @@ -266,4 +266,71 @@ describe("resolveMatrixOutboundSessionRoute", () => { expectCurrentDmRoomRoute(route); }); + + it("recovers channel thread routes from currentSessionKey and preserves Matrix event-id case", () => { + const route = resolveMatrixOutboundSessionRoute({ + cfg: {}, + agentId: "main", + target: "room:!Ops:Example.Org", + currentSessionKey: "agent:main:matrix:channel:!ops:example.org:thread:$RootEvent:Example.Org", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:matrix:channel:!ops:example.org:thread:$RootEvent:Example.Org", + baseSessionKey: "agent:main:matrix:channel:!ops:example.org", + threadId: "$RootEvent:Example.Org", + }); + }); + + it("resolves per-room DM metadata from the base key when currentSessionKey has a thread suffix", () => { + const storedSession = createStoredDirectDmSession(); + const route = resolveUserRoute({ + cfg: createMatrixRouteConfig({ + [currentDmSessionKey]: storedSession, + }), + accountId: "ops", + target: "@alice:example.org", + }); + const threadedRoute = resolveMatrixOutboundSessionRoute({ + cfg: createMatrixRouteConfig({ + [route?.baseSessionKey ?? currentDmSessionKey]: storedSession, + }), + agentId: "main", + accountId: "ops", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + currentSessionKey: `${route?.baseSessionKey}:thread:$DmRoot:Example.Org`, + }); + + expect(threadedRoute).toMatchObject({ + sessionKey: `${route?.baseSessionKey}:thread:$DmRoot:Example.Org`, + baseSessionKey: route?.baseSessionKey, + to: "room:!dm:example.org", + threadId: "$DmRoot:Example.Org", + }); + }); + + it('does not recover currentSessionKey threads for shared dmScope "main" DMs', () => { + const route = resolveMatrixOutboundSessionRoute({ + cfg: {}, + agentId: "main", + target: "@alice:example.org", + currentSessionKey: "agent:main:main:thread:$DmRoot:Example.Org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + }); + expect(route?.threadId).toBeUndefined(); + }); }); diff --git a/extensions/matrix/src/session-route.ts b/extensions/matrix/src/session-route.ts index cb40bf9e309..d1e5a9cc37c 100644 --- a/extensions/matrix/src/session-route.ts +++ b/extensions/matrix/src/session-route.ts @@ -1,6 +1,7 @@ import { normalizeAccountId } from "openclaw/plugin-sdk/account-id"; import { buildChannelOutboundSessionRoute, + buildThreadAwareOutboundSessionRoute, type ChannelOutboundSessionRouteParams, } from "openclaw/plugin-sdk/channel-core"; import { @@ -8,6 +9,7 @@ import { resolveSessionStoreEntry, resolveStorePath, } from "openclaw/plugin-sdk/config-runtime"; +import { parseThreadSessionSuffix } from "openclaw/plugin-sdk/routing"; import { resolveMatrixAccountConfig } from "./matrix/account-config.js"; import { resolveDefaultMatrixAccountId } from "./matrix/accounts.js"; import { resolveMatrixStoredSessionMeta } from "./matrix/session-store-metadata.js"; @@ -38,7 +40,9 @@ function resolveMatrixCurrentDmRoomId(params: { currentSessionKey?: string; targetUserId: string; }): string | undefined { - const sessionKey = params.currentSessionKey?.trim(); + const sessionKey = + parseThreadSessionSuffix(params.currentSessionKey).baseSessionKey ?? + params.currentSessionKey?.trim(); if (!sessionKey) { return undefined; } @@ -100,7 +104,7 @@ export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSession const from = target.kind === "user" ? `matrix:${target.id}` : `matrix:channel:${target.id}`; const to = `room:${roomScopedDmId ?? target.id}`; - return buildChannelOutboundSessionRoute({ + const baseRoute = buildChannelOutboundSessionRoute({ cfg: params.cfg, agentId: params.agentId, channel: "matrix", @@ -110,4 +114,13 @@ export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSession from, to, }); + return buildThreadAwareOutboundSessionRoute({ + route: baseRoute, + replyToId: params.replyToId, + threadId: params.threadId, + currentSessionKey: params.currentSessionKey, + normalizeThreadId: (threadId) => threadId, + canRecoverCurrentThread: ({ route }) => + route.peer.kind !== "direct" || (params.cfg.session?.dmScope ?? "main") !== "main", + }); } diff --git a/extensions/mattermost/src/session-route.test.ts b/extensions/mattermost/src/session-route.test.ts index 3ac96f7e179..c3eca662c38 100644 --- a/extensions/mattermost/src/session-route.test.ts +++ b/extensions/mattermost/src/session-route.test.ts @@ -41,6 +41,54 @@ describe("mattermost session route", () => { expect(route?.sessionKey).toContain("thread456"); }); + it("recovers channel thread routes from currentSessionKey", () => { + const route = resolveMattermostOutboundSessionRoute({ + cfg: {}, + agentId: "main", + accountId: "acct-1", + target: "mattermost:channel:chan123", + currentSessionKey: "agent:main:mattermost:channel:chan123:thread:root-post", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:mattermost:channel:chan123:thread:root-post", + baseSessionKey: "agent:main:mattermost:channel:chan123", + threadId: "root-post", + }); + }); + + it("keeps explicit replyToId ahead of recovered currentSessionKey thread", () => { + const route = resolveMattermostOutboundSessionRoute({ + cfg: {}, + agentId: "main", + accountId: "acct-1", + target: "mattermost:channel:chan123", + replyToId: "explicit-root", + currentSessionKey: "agent:main:mattermost:channel:chan123:thread:root-post", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:mattermost:channel:chan123:thread:explicit-root", + threadId: "explicit-root", + }); + }); + + it('does not recover currentSessionKey threads for shared dmScope "main" DMs', () => { + const route = resolveMattermostOutboundSessionRoute({ + cfg: {}, + agentId: "main", + accountId: "acct-1", + target: "@user123", + currentSessionKey: "agent:main:main:thread:root-post", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + }); + expect(route?.threadId).toBeUndefined(); + }); + it("returns null when the target is empty after normalization", () => { expect( resolveMattermostOutboundSessionRoute({ diff --git a/extensions/mattermost/src/session-route.ts b/extensions/mattermost/src/session-route.ts index 5bbd3e4da47..689e12c46da 100644 --- a/extensions/mattermost/src/session-route.ts +++ b/extensions/mattermost/src/session-route.ts @@ -1,11 +1,10 @@ import { buildChannelOutboundSessionRoute, - resolveThreadSessionKeys, + buildThreadAwareOutboundSessionRoute, stripChannelTargetPrefix, stripTargetKindPrefix, type ChannelOutboundSessionRouteParams, } from "openclaw/plugin-sdk/core"; -import { normalizeOutboundThreadId } from "openclaw/plugin-sdk/routing"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; export function resolveMattermostOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) { @@ -40,14 +39,12 @@ export function resolveMattermostOutboundSessionRoute(params: ChannelOutboundSes from: isUser ? `mattermost:${rawId}` : `mattermost:channel:${rawId}`, to: isUser ? `user:${rawId}` : `channel:${rawId}`, }); - const threadId = normalizeOutboundThreadId(params.replyToId ?? params.threadId); - const threadKeys = resolveThreadSessionKeys({ - baseSessionKey: baseRoute.baseSessionKey, - threadId, + return buildThreadAwareOutboundSessionRoute({ + route: baseRoute, + replyToId: params.replyToId, + threadId: params.threadId, + currentSessionKey: params.currentSessionKey, + canRecoverCurrentThread: ({ route }) => + route.chatType !== "direct" || (params.cfg.session?.dmScope ?? "main") !== "main", }); - return { - ...baseRoute, - sessionKey: threadKeys.sessionKey, - ...(threadId !== undefined ? { threadId } : {}), - }; } diff --git a/extensions/qa-channel/src/channel.test.ts b/extensions/qa-channel/src/channel.test.ts index 09793f4c510..77a65c3cf48 100644 --- a/extensions/qa-channel/src/channel.test.ts +++ b/extensions/qa-channel/src/channel.test.ts @@ -127,6 +127,53 @@ async function startQaChannelTestHarness(params?: { } describe("qa-channel plugin", () => { + it("derives thread-aware outbound session routes from explicit thread targets", async () => { + const route = await qaChannelPlugin.messaging?.resolveOutboundSessionRoute?.({ + cfg: {}, + agentId: "main", + accountId: "default", + target: "thread:qa-room/thread-1", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:qa-channel:channel:thread:qa-room/thread-1", + baseSessionKey: "agent:main:qa-channel:channel:thread:qa-room/thread-1", + }); + expect(route?.threadId).toBeUndefined(); + }); + + it("recovers thread-aware outbound session routes from currentSessionKey", async () => { + const route = await qaChannelPlugin.messaging?.resolveOutboundSessionRoute?.({ + cfg: {}, + agentId: "main", + accountId: "default", + target: "channel:qa-room", + currentSessionKey: "agent:main:qa-channel:channel:channel:qa-room:thread:thread-1", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:qa-channel:channel:channel:qa-room:thread:thread-1", + baseSessionKey: "agent:main:qa-channel:channel:channel:qa-room", + threadId: "thread-1", + }); + }); + + it('does not recover currentSessionKey threads for shared dmScope "main" DMs', async () => { + const route = await qaChannelPlugin.messaging?.resolveOutboundSessionRoute?.({ + cfg: {}, + agentId: "main", + accountId: "default", + target: "dm:alice", + currentSessionKey: "agent:main:main:thread:thread-1", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + }); + expect(route?.threadId).toBeUndefined(); + }); + it("roundtrips inbound DM traffic through the qa bus", { timeout: 20_000 }, async () => { const harness = await startQaChannelTestHarness({ allowFrom: ["*"] }); diff --git a/extensions/qa-channel/src/channel.ts b/extensions/qa-channel/src/channel.ts index ea941c59d79..3b3388bb4f7 100644 --- a/extensions/qa-channel/src/channel.ts +++ b/extensions/qa-channel/src/channel.ts @@ -1,5 +1,6 @@ import { buildChannelOutboundSessionRoute, + buildThreadAwareOutboundSessionRoute, createChatChannelPlugin, } from "openclaw/plugin-sdk/channel-core"; import { getChatChannelMeta } from "openclaw/plugin-sdk/channel-plugin-common"; @@ -66,9 +67,17 @@ export const qaChannelPlugin: ChannelPlugin = createCh /^((dm|channel):|thread:[^/]+\/)/i.test(raw.trim()) || raw.trim().length > 0, hint: "", }, - resolveOutboundSessionRoute: ({ cfg, agentId, accountId, target, threadId }) => { + resolveOutboundSessionRoute: ({ + cfg, + agentId, + accountId, + target, + replyToId, + threadId, + currentSessionKey, + }) => { const parsed = parseQaTarget(target); - return buildChannelOutboundSessionRoute({ + const baseRoute = buildChannelOutboundSessionRoute({ cfg, agentId, channel: CHANNEL_ID, @@ -80,7 +89,14 @@ export const qaChannelPlugin: ChannelPlugin = createCh chatType: parsed.chatType, from: `qa-channel:${accountId ?? DEFAULT_ACCOUNT_ID}`, to: buildQaTarget(parsed), - threadId: threadId ?? parsed.threadId, + }); + return buildThreadAwareOutboundSessionRoute({ + route: baseRoute, + replyToId, + threadId: threadId ?? (target.trim().startsWith("thread:") ? undefined : parsed.threadId), + currentSessionKey, + canRecoverCurrentThread: ({ route }) => + route.chatType !== "direct" || (cfg.session?.dmScope ?? "main") !== "main", }); }, }, diff --git a/extensions/slack/src/channel.ts b/extensions/slack/src/channel.ts index e00adaa12a0..9bdbba86438 100644 --- a/extensions/slack/src/channel.ts +++ b/extensions/slack/src/channel.ts @@ -4,7 +4,10 @@ import { createFlatAllowlistOverrideResolver, } from "openclaw/plugin-sdk/allowlist-config-edit"; import { adaptScopedAccountAccessor } from "openclaw/plugin-sdk/channel-config-helpers"; -import { createChatChannelPlugin } from "openclaw/plugin-sdk/channel-core"; +import { + buildThreadAwareOutboundSessionRoute, + createChatChannelPlugin, +} from "openclaw/plugin-sdk/channel-core"; import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing"; import { createChannelDirectoryAdapter, @@ -13,12 +16,7 @@ import { import { buildPassiveProbedChannelStatusSummary } from "openclaw/plugin-sdk/extension-shared"; import { createLazyRuntimeModule } from "openclaw/plugin-sdk/lazy-runtime"; import { resolveOutboundSendDep } from "openclaw/plugin-sdk/outbound-runtime"; -import { - buildOutboundBaseSessionKey, - normalizeOutboundThreadId, - resolveThreadSessionKeys, - type RoutePeer, -} from "openclaw/plugin-sdk/routing"; +import { buildOutboundBaseSessionKey, type RoutePeer } from "openclaw/plugin-sdk/routing"; import { createComputedAccountStatusAdapter, createDefaultChannelRuntimeState, @@ -186,6 +184,18 @@ function buildSlackBaseSessionKey(params: { return buildOutboundBaseSessionKey({ ...params, channel: "slack" }); } +function shouldRecoverSlackThreadFromCurrentSession(params: { + cfg: OpenClawConfig; + peerKind: RoutePeer["kind"]; +}): boolean { + // Shared DM sessions (dmScope="main") do not encode the DM peer in the base key, + // so inheriting a prior thread can bleed across unrelated direct-message targets. + if (params.peerKind === "direct" && (params.cfg.session?.dmScope ?? "main") === "main") { + return false; + } + return true; +} + async function resolveSlackOutboundSessionRoute(params: { cfg: OpenClawConfig; agentId: string; @@ -193,6 +203,7 @@ async function resolveSlackOutboundSessionRoute(params: { target: string; replyToId?: string | null; threadId?: string | number | null; + currentSessionKey?: string | null; }) { const parsed = parseSlackTarget(params.target, { defaultKind: "channel" }); if (!parsed) { @@ -223,25 +234,29 @@ async function resolveSlackOutboundSessionRoute(params: { accountId: params.accountId, peer, }); - const threadId = normalizeOutboundThreadId(params.threadId ?? params.replyToId); - const threadKeys = resolveThreadSessionKeys({ - baseSessionKey, - threadId, + return buildThreadAwareOutboundSessionRoute({ + route: { + sessionKey: baseSessionKey, + baseSessionKey, + peer, + chatType: peerKind === "direct" ? ("direct" as const) : ("channel" as const), + from: + peerKind === "direct" + ? `slack:${parsed.id}` + : peerKind === "group" + ? `slack:group:${parsed.id}` + : `slack:channel:${parsed.id}`, + to: peerKind === "direct" ? `user:${parsed.id}` : `channel:${parsed.id}`, + }, + replyToId: params.replyToId, + threadId: params.threadId, + currentSessionKey: params.currentSessionKey, + canRecoverCurrentThread: () => + shouldRecoverSlackThreadFromCurrentSession({ + cfg: params.cfg, + peerKind, + }), }); - return { - sessionKey: threadKeys.sessionKey, - baseSessionKey, - peer, - chatType: peerKind === "direct" ? ("direct" as const) : ("channel" as const), - from: - peerKind === "direct" - ? `slack:${parsed.id}` - : peerKind === "group" - ? `slack:group:${parsed.id}` - : `slack:channel:${parsed.id}`, - to: peerKind === "direct" ? `user:${parsed.id}` : `channel:${parsed.id}`, - threadId, - }; } function formatSlackScopeDiagnostic(params: { diff --git a/extensions/telegram/src/channel.ts b/extensions/telegram/src/channel.ts index 21f471583fd..28002323623 100644 --- a/extensions/telegram/src/channel.ts +++ b/extensions/telegram/src/channel.ts @@ -4,7 +4,12 @@ import { createNestedAllowlistOverrideResolver, } from "openclaw/plugin-sdk/allowlist-config-edit"; import type { ChannelMessageActionAdapter } from "openclaw/plugin-sdk/channel-contract"; -import { clearAccountEntryFields, createChatChannelPlugin } from "openclaw/plugin-sdk/channel-core"; +import { + buildChannelOutboundSessionRoute, + buildThreadAwareOutboundSessionRoute, + clearAccountEntryFields, + createChatChannelPlugin, +} from "openclaw/plugin-sdk/channel-core"; import { createAccountStatusSink } from "openclaw/plugin-sdk/channel-lifecycle"; import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing"; import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result"; @@ -21,12 +26,7 @@ import { resolveOutboundSendDep, type OutboundSendDeps, } from "openclaw/plugin-sdk/outbound-runtime"; -import { - buildOutboundBaseSessionKey, - normalizeOutboundThreadId, - resolveThreadSessionKeys, - type RoutePeer, -} from "openclaw/plugin-sdk/routing"; +import { type RoutePeer } from "openclaw/plugin-sdk/routing"; import { createComputedAccountStatusAdapter, createDefaultChannelRuntimeState, @@ -445,30 +445,22 @@ function shouldStripTelegramThreadFromAnnounceOrigin(params: { return entryTarget.to !== requesterTarget.to; } -function buildTelegramBaseSessionKey(params: { - cfg: OpenClawConfig; - agentId: string; - accountId?: string | null; - peer: RoutePeer; -}) { - return buildOutboundBaseSessionKey({ ...params, channel: "telegram" }); -} - function resolveTelegramOutboundSessionRoute(params: { cfg: OpenClawConfig; agentId: string; accountId?: string | null; target: string; resolvedTarget?: { kind: string }; + replyToId?: string | null; threadId?: string | number | null; + currentSessionKey?: string | null; }) { const parsed = parseTelegramTarget(params.target); const chatId = parsed.chatId.trim(); if (!chatId) { return null; } - const fallbackThreadId = normalizeOutboundThreadId(params.threadId); - const resolvedThreadId = parsed.messageThreadId ?? parseTelegramThreadId(fallbackThreadId); + const resolvedThreadId = parsed.messageThreadId ?? parseTelegramThreadId(params.threadId); const isGroup = parsed.chatType === "group" || (parsed.chatType === "unknown" && @@ -480,20 +472,12 @@ function resolveTelegramOutboundSessionRoute(params: { kind: isGroup ? "group" : "direct", id: peerId, }; - const baseSessionKey = buildTelegramBaseSessionKey({ + const baseRoute = buildChannelOutboundSessionRoute({ cfg: params.cfg, agentId: params.agentId, + channel: "telegram", accountId: params.accountId, peer, - }); - const threadKeys = - resolvedThreadId && !isGroup - ? resolveThreadSessionKeys({ baseSessionKey, threadId: String(resolvedThreadId) }) - : null; - return { - sessionKey: threadKeys?.sessionKey ?? baseSessionKey, - baseSessionKey, - peer, chatType: isGroup ? ("group" as const) : ("direct" as const), from: isGroup ? `telegram:group:${peerId}` @@ -501,7 +485,25 @@ function resolveTelegramOutboundSessionRoute(params: { ? `telegram:${chatId}:topic:${resolvedThreadId}` : `telegram:${chatId}`, to: `telegram:${chatId}`, + ...(isGroup && resolvedThreadId !== undefined ? { threadId: resolvedThreadId } : {}), + }); + if (isGroup) { + return baseRoute; + } + const route = buildThreadAwareOutboundSessionRoute({ + route: baseRoute, threadId: resolvedThreadId, + currentSessionKey: params.currentSessionKey, + precedence: ["threadId", "currentSession"], + canRecoverCurrentThread: ({ route }) => + route.chatType !== "direct" || (params.cfg.session?.dmScope ?? "main") !== "main", + }); + return { + ...route, + from: + route.threadId !== undefined + ? `telegram:${chatId}:topic:${route.threadId}` + : `telegram:${chatId}`, }; } diff --git a/extensions/telegram/src/session-route.test.ts b/extensions/telegram/src/session-route.test.ts new file mode 100644 index 00000000000..24b37d615ce --- /dev/null +++ b/extensions/telegram/src/session-route.test.ts @@ -0,0 +1,62 @@ +import { describe, expect, it } from "vitest"; +import { telegramPlugin } from "./channel.js"; + +describe("telegram session route", () => { + it("keeps direct topic thread ids in a thread session suffix", async () => { + const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({ + cfg: {}, + agentId: "main", + target: "12345:topic:99", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main:thread:99", + baseSessionKey: "agent:main:main", + threadId: 99, + }); + }); + + it("recovers direct topic thread routes from currentSessionKey when the DM scope is isolated", async () => { + const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({ + cfg: { session: { dmScope: "per-channel-peer" } }, + agentId: "main", + target: "12345", + currentSessionKey: "agent:main:telegram:direct:12345:thread:12345:99", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:telegram:direct:12345:thread:12345:99", + baseSessionKey: "agent:main:telegram:direct:12345", + threadId: "12345:99", + }); + }); + + it('does not recover currentSessionKey threads for shared dmScope "main" DMs', async () => { + const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({ + cfg: {}, + agentId: "main", + target: "12345", + currentSessionKey: "agent:main:main:thread:12345:99", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + }); + expect(route?.threadId).toBeUndefined(); + }); + + it("keeps group topic ids in the group peer route instead of adding a thread suffix", async () => { + const route = await telegramPlugin.messaging?.resolveOutboundSessionRoute?.({ + cfg: {}, + agentId: "main", + target: "-100:topic:99", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:telegram:group:-100:topic:99", + baseSessionKey: "agent:main:telegram:group:-100:topic:99", + threadId: 99, + }); + }); +}); diff --git a/src/infra/outbound/outbound-session.test-helpers.ts b/src/infra/outbound/outbound-session.test-helpers.ts index a17b4859e8d..6b4a60dfe5c 100644 --- a/src/infra/outbound/outbound-session.test-helpers.ts +++ b/src/infra/outbound/outbound-session.test-helpers.ts @@ -2,6 +2,7 @@ import type { ChannelPlugin } from "../../channels/plugins/types.plugin.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { buildChannelOutboundSessionRoute, + buildThreadAwareOutboundSessionRoute, stripChannelTargetPrefix, stripTargetKindPrefix, type ChannelOutboundSessionRouteParams, @@ -9,7 +10,6 @@ import { import { buildOutboundBaseSessionKey, normalizeOutboundThreadId, - resolveThreadSessionKeys, type RoutePeer, } from "../../plugin-sdk/routing.js"; import { setActivePluginRegistry } from "../../plugins/runtime.js"; @@ -62,21 +62,19 @@ function buildThreadedChannelRoute(params: { accountId: params.accountId, peer: params.peer, }); - const normalizedThreadId = normalizeOutboundThreadId(params.threadId); - const threadKeys = resolveThreadSessionKeys({ - baseSessionKey, - threadId: normalizedThreadId, + return buildThreadAwareOutboundSessionRoute({ + route: { + sessionKey: baseSessionKey, + baseSessionKey, + peer: params.peer, + chatType: params.chatType, + from: params.from, + to: params.to, + }, + threadId: params.threadId, useSuffix: params.useSuffix, + precedence: ["threadId", "replyToId", "currentSession"], }); - return { - sessionKey: threadKeys.sessionKey, - baseSessionKey, - peer: params.peer, - chatType: params.chatType, - from: params.from, - to: params.to, - ...(normalizedThreadId !== undefined ? { threadId: params.threadId } : {}), - }; } function parseForumTargetForTest(raw: string): { diff --git a/src/plugin-sdk/channel-core.ts b/src/plugin-sdk/channel-core.ts index 06e9e4e054f..b4ddcfdc905 100644 --- a/src/plugin-sdk/channel-core.ts +++ b/src/plugin-sdk/channel-core.ts @@ -16,11 +16,13 @@ export const createChannelPluginBase: typeof createChannelPluginBaseFromCore = ( export { buildChannelConfigSchema, buildChannelOutboundSessionRoute, + buildThreadAwareOutboundSessionRoute, clearAccountEntryFields, createChatChannelPlugin, defineChannelPluginEntry, defineSetupPluginEntry, parseOptionalDelimitedEntries, + recoverCurrentThreadSessionId, stripChannelTargetPrefix, stripTargetKindPrefix, tryReadSecretFileSync, diff --git a/src/plugin-sdk/core.ts b/src/plugin-sdk/core.ts index 6f4648a1b1b..a38db81b67b 100644 --- a/src/plugin-sdk/core.ts +++ b/src/plugin-sdk/core.ts @@ -25,11 +25,17 @@ import type { ReplyToMode } from "../config/types.base.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { buildOutboundBaseSessionKey } from "../infra/outbound/base-session-key.js"; import type { OutboundDeliveryResult } from "../infra/outbound/deliver.js"; +import { normalizeOutboundThreadId } from "../infra/outbound/thread-id.js"; import { resolveBundledPluginsDir } from "../plugins/bundled-dir.js"; import type { ProviderRuntimeModel } from "../plugins/provider-runtime-model.types.js"; import type { PluginRuntime } from "../plugins/runtime/types.js"; import type { OpenClawPluginApi } from "../plugins/types.js"; -import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; +import { resolveThreadSessionKeys } from "../routing/session-key.js"; +import { parseThreadSessionSuffix } from "../sessions/session-key-utils.js"; +import { + normalizeLowercaseStringOrEmpty, + normalizeOptionalLowercaseString, +} from "../shared/string-coerce.js"; export type { AgentHarness, @@ -309,6 +315,96 @@ export function buildChannelOutboundSessionRoute(params: { }; } +export type ThreadAwareOutboundSessionRouteThreadSource = + | "replyToId" + | "threadId" + | "currentSession"; + +export type ThreadAwareOutboundSessionRouteRecoveryContext = { + route: ChannelOutboundSessionRoute; + currentBaseSessionKey: string; + currentThreadId: string; +}; + +export function recoverCurrentThreadSessionId(params: { + route: ChannelOutboundSessionRoute; + currentSessionKey?: string | null; + canRecover?: (context: ThreadAwareOutboundSessionRouteRecoveryContext) => boolean; +}): string | undefined { + const current = parseThreadSessionSuffix(params.currentSessionKey); + if (!current.baseSessionKey || !current.threadId) { + return undefined; + } + if ( + normalizeOptionalLowercaseString(current.baseSessionKey) !== + normalizeOptionalLowercaseString(params.route.baseSessionKey) + ) { + return undefined; + } + const context = { + route: params.route, + currentBaseSessionKey: current.baseSessionKey, + currentThreadId: current.threadId, + }; + if (params.canRecover && !params.canRecover(context)) { + return undefined; + } + return current.threadId; +} + +export function buildThreadAwareOutboundSessionRoute(params: { + route: ChannelOutboundSessionRoute; + replyToId?: string | number | null; + threadId?: string | number | null; + currentSessionKey?: string | null; + precedence?: readonly ThreadAwareOutboundSessionRouteThreadSource[]; + useSuffix?: boolean; + parentSessionKey?: string; + normalizeThreadId?: (threadId: string) => string; + canRecoverCurrentThread?: (context: ThreadAwareOutboundSessionRouteRecoveryContext) => boolean; +}): ChannelOutboundSessionRoute { + const recoveredThreadId = recoverCurrentThreadSessionId({ + route: params.route, + currentSessionKey: params.currentSessionKey, + canRecover: params.canRecoverCurrentThread, + }); + const candidates: Record< + ThreadAwareOutboundSessionRouteThreadSource, + { routeThreadId: string | number; sessionThreadId: string } | undefined + > = { + replyToId: resolveThreadAwareOutboundCandidate(params.replyToId), + threadId: resolveThreadAwareOutboundCandidate(params.threadId), + currentSession: resolveThreadAwareOutboundCandidate(recoveredThreadId), + }; + const precedence = params.precedence ?? ["replyToId", "threadId", "currentSession"]; + const candidate = precedence.map((source) => candidates[source]).find(Boolean); + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey: params.route.baseSessionKey, + threadId: candidate?.sessionThreadId, + parentSessionKey: candidate ? params.parentSessionKey : undefined, + useSuffix: params.useSuffix, + normalizeThreadId: params.normalizeThreadId, + }); + return { + ...params.route, + sessionKey: threadKeys.sessionKey, + ...(candidate !== undefined ? { threadId: candidate.routeThreadId } : {}), + }; +} + +function resolveThreadAwareOutboundCandidate( + threadId?: string | number | null, +): { routeThreadId: string | number; sessionThreadId: string } | undefined { + const sessionThreadId = normalizeOutboundThreadId(threadId); + if (sessionThreadId === undefined) { + return undefined; + } + return { + routeThreadId: typeof threadId === "number" ? threadId : sessionThreadId, + sessionThreadId, + }; +} + /** Options for a channel plugin entry that should register a channel capability. */ type ChannelEntryConfigSchema = TPlugin extends ChannelPlugin diff --git a/src/plugin-sdk/routing.ts b/src/plugin-sdk/routing.ts index b1c6d3b55ed..7d7dd40c477 100644 --- a/src/plugin-sdk/routing.ts +++ b/src/plugin-sdk/routing.ts @@ -20,6 +20,7 @@ export { normalizeMainKey, normalizeOptionalAccountId, parseAgentSessionKey, + parseThreadSessionSuffix, resolveAgentIdFromSessionKey, resolveThreadSessionKeys, sanitizeAgentId, diff --git a/src/plugin-sdk/thread-aware-outbound-session-route.test.ts b/src/plugin-sdk/thread-aware-outbound-session-route.test.ts new file mode 100644 index 00000000000..b629d52b553 --- /dev/null +++ b/src/plugin-sdk/thread-aware-outbound-session-route.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, it } from "vitest"; +import { + buildThreadAwareOutboundSessionRoute, + recoverCurrentThreadSessionId, + type ChannelOutboundSessionRoute, +} from "./core.js"; + +function baseRoute( + overrides: Partial = {}, +): ChannelOutboundSessionRoute { + return { + sessionKey: "agent:main:workspace:channel:c123", + baseSessionKey: "agent:main:workspace:channel:c123", + peer: { kind: "channel", id: "c123" }, + chatType: "channel", + from: "workspace:channel:c123", + to: "channel:c123", + ...overrides, + }; +} + +describe("buildThreadAwareOutboundSessionRoute", () => { + it("uses replyToId before threadId and recovered current-session thread by default", () => { + const route = buildThreadAwareOutboundSessionRoute({ + route: baseRoute(), + replyToId: "reply-1", + threadId: "thread-1", + currentSessionKey: "agent:main:workspace:channel:c123:thread:current-1", + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:workspace:channel:c123:thread:reply-1", + threadId: "reply-1", + }); + }); + + it("supports provider-specific threadId-first precedence", () => { + const route = buildThreadAwareOutboundSessionRoute({ + route: baseRoute(), + replyToId: "reply-1", + threadId: "thread-1", + precedence: ["threadId", "replyToId", "currentSession"], + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:workspace:channel:c123:thread:thread-1", + threadId: "thread-1", + }); + }); + + it("keeps numeric delivery thread ids on the route while stringifying the session suffix", () => { + const route = buildThreadAwareOutboundSessionRoute({ + route: baseRoute(), + threadId: 99, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:workspace:channel:c123:thread:99", + threadId: 99, + }); + }); + + it("recovers a current-session thread only when the base session matches", () => { + expect( + recoverCurrentThreadSessionId({ + route: baseRoute(), + currentSessionKey: "agent:main:workspace:channel:c123:thread:current-1", + }), + ).toBe("current-1"); + expect( + recoverCurrentThreadSessionId({ + route: baseRoute(), + currentSessionKey: "agent:main:workspace:channel:other:thread:current-1", + }), + ).toBeUndefined(); + }); + + it("lets providers veto current-session recovery", () => { + const route = buildThreadAwareOutboundSessionRoute({ + route: baseRoute(), + currentSessionKey: "agent:main:workspace:channel:c123:thread:current-1", + canRecoverCurrentThread: () => false, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:workspace:channel:c123", + }); + expect(route.threadId).toBeUndefined(); + }); + + it("preserves provider-specific thread case when requested", () => { + const route = buildThreadAwareOutboundSessionRoute({ + route: baseRoute(), + threadId: "$EventID:Example.Org", + normalizeThreadId: (threadId) => threadId, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:workspace:channel:c123:thread:$EventID:Example.Org", + threadId: "$EventID:Example.Org", + }); + }); + + it("can carry a delivery thread without adding a session suffix", () => { + const route = buildThreadAwareOutboundSessionRoute({ + route: baseRoute(), + threadId: "thread-1", + useSuffix: false, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:workspace:channel:c123", + threadId: "thread-1", + }); + }); +}); diff --git a/src/routing/session-key.ts b/src/routing/session-key.ts index 9d3e67896bc..5e56066a7ca 100644 --- a/src/routing/session-key.ts +++ b/src/routing/session-key.ts @@ -9,6 +9,7 @@ export { isAcpSessionKey, isSubagentSessionKey, parseAgentSessionKey, + parseThreadSessionSuffix, type ParsedAgentSessionKey, } from "../sessions/session-key-utils.js"; export {