From 6cf3e3d5b47c487684fe681cdf5093c19c7b7d47 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 5 Apr 2026 10:18:09 -0400 Subject: [PATCH] fix(matrix): align DM room session routing --- CHANGELOG.md | 1 + docs/channels/matrix.md | 7 +- docs/gateway/configuration-reference.md | 1 + extensions/matrix/src/config-schema.test.ts | 12 ++ extensions/matrix/src/config-schema.ts | 1 + .../matrix/monitor/handler.test-helpers.ts | 2 + .../matrix/src/matrix/monitor/handler.test.ts | 111 ++++++++++++++ .../matrix/src/matrix/monitor/handler.ts | 134 ++++++++++++++++- extensions/matrix/src/matrix/monitor/index.ts | 2 + .../src/matrix/monitor/reaction-events.ts | 1 + .../matrix/src/matrix/monitor/route.test.ts | 35 ++++- extensions/matrix/src/matrix/monitor/route.ts | 54 ++++++- extensions/matrix/src/session-route.test.ts | 135 ++++++++++++++++++ extensions/matrix/src/session-route.ts | 124 ++++++++++++++-- extensions/matrix/src/types.ts | 6 + src/channels/plugins/types.core.ts | 1 + src/infra/outbound/message-action-runner.ts | 1 + .../outbound/message-action-threading.ts | 2 + src/infra/outbound/outbound-session.ts | 1 + 19 files changed, 607 insertions(+), 24 deletions(-) create mode 100644 extensions/matrix/src/session-route.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ca261347d6..9278da61896 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -200,6 +200,7 @@ Docs: https://docs.openclaw.ai - Agents/video generation: accept `agents.defaults.videoGenerationModel` in strict config validation and `openclaw config set/get`, so gateways using `video_generate` no longer fail to boot after enabling a video model. - Discord/image generation: persist volatile workspace-generated media into durable outbound media before final reply delivery so generated image replies stop failing with missing local workspace paths. - Matrix: move legacy top-level `avatarUrl` into the default account during multi-account promotion and keep env-backed account setup avatar config persisted. (#61437) Thanks @gumadeiras. +- Matrix/DM sessions: add `channels.matrix.dm.sessionScope`, shared-session collision notices, and aligned outbound session reuse so separate Matrix DM rooms can keep distinct context when configured. (#61373) Thanks @gumadeiras. ## 2026.4.2 diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index 543cff92b42..87b33582b20 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -152,6 +152,7 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en dm: { policy: "pairing", + sessionScope: "per-room", threadReplies: "off", }, @@ -522,12 +523,15 @@ The repair flow does not delete old rooms automatically. It only picks the healt Matrix supports native Matrix threads for both automatic replies and message-tool sends. +- `dm.sessionScope: "per-user"` (default) keeps Matrix DM routing sender-scoped, so multiple DM rooms can share one session when they resolve to the same peer. +- `dm.sessionScope: "per-room"` isolates each Matrix DM room into its own session key while still using normal DM auth and allowlist checks. - `threadReplies: "off"` keeps replies top-level and keeps inbound threaded messages on the parent session. - `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread. - `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message and routes that conversation through the matching thread-scoped session from the first triggering message. - `dm.threadReplies` overrides the top-level setting for DMs only. For example, you can keep room threads isolated while keeping DMs flat. - Inbound threaded messages include the thread root message as extra agent context. - Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided. +- When OpenClaw sees a Matrix DM room collide with another DM room on the same shared Matrix DM session, it posts a one-time `m.notice` in that room with the `/focus` escape hatch when thread bindings are enabled and the `dm.sessionScope` hint. - Runtime thread bindings are supported for Matrix. `/focus`, `/unfocus`, `/agents`, `/session idle`, `/session max-age`, and thread-bound `/acp spawn` now work in Matrix rooms and DMs. - Top-level Matrix room/DM `/focus` creates a new Matrix thread and binds it to the target session when `threadBindings.spawnSubagentSessions=true`. - Running `/focus` or `/acp spawn --thread here` inside an existing Matrix thread binds that current thread instead. @@ -842,8 +846,9 @@ Live directory lookup uses the logged-in Matrix account: - `mediaMaxMb`: media size cap in MB for Matrix media handling. It applies to outbound sends and inbound media processing. - `autoJoin`: invite auto-join policy (`always`, `allowlist`, `off`). Default: `off`. - `autoJoinAllowlist`: rooms/aliases allowed when `autoJoin` is `allowlist`. Alias entries are resolved to room IDs during invite handling; OpenClaw does not trust alias state claimed by the invited room. -- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `threadReplies`). +- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `sessionScope`, `threadReplies`). - `dm.allowFrom` entries should be full Matrix user IDs unless you already resolved them through live directory lookup. +- `dm.sessionScope`: `per-user` (default) or `per-room`. Use `per-room` when you want each Matrix DM room to keep separate context even if the peer is the same. - `dm.threadReplies`: DM-only thread policy override (`off`, `inbound`, `always`). It overrides the top-level `threadReplies` setting for both reply placement and session isolation in DMs. - `execApprovals`: Matrix-native exec approval delivery (`enabled`, `approvers`, `target`, `agentFilter`, `sessionFilter`). - `execApprovals.approvers`: Matrix user IDs allowed to approve exec requests. Optional when `dm.allowFrom` already identifies the approvers. diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index 50ac90b172e..63224be8c07 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -655,6 +655,7 @@ Matrix is extension-backed and configured under `channels.matrix`. - `sessionFilter`: optional session key patterns (substring or regex). - `target`: where to send approval prompts. `"dm"` (default), `"channel"` (originating room), or `"both"`. - Per-account overrides: `channels.matrix.accounts..execApprovals`. +- `channels.matrix.dm.sessionScope` controls how Matrix DMs group into sessions: `per-user` (default) shares by routed peer, while `per-room` isolates each DM room. - Matrix status probes and live directory lookups use the same proxy policy as runtime traffic. - Full Matrix configuration, targeting rules, and setup examples are documented in [Matrix](/channels/matrix). diff --git a/extensions/matrix/src/config-schema.test.ts b/extensions/matrix/src/config-schema.test.ts index 806c66ceb78..b9c11aa3a02 100644 --- a/extensions/matrix/src/config-schema.test.ts +++ b/extensions/matrix/src/config-schema.test.ts @@ -31,6 +31,18 @@ describe("MatrixConfigSchema SecretInput", () => { expect(result.success).toBe(true); }); + it("accepts dm sessionScope overrides", () => { + const result = MatrixConfigSchema.safeParse({ + homeserver: "https://matrix.example.org", + accessToken: "token", + dm: { + policy: "pairing", + sessionScope: "per-room", + }, + }); + expect(result.success).toBe(true); + }); + it("accepts room-level account assignments", () => { const result = MatrixConfigSchema.safeParse({ homeserver: "https://matrix.example.org", diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index 716e68f764f..3247ecb69da 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -104,6 +104,7 @@ export const MatrixConfigSchema = z.object({ autoJoinAllowlist: AllowFromListSchema, groupAllowFrom: AllowFromListSchema, dm: buildNestedDmConfigSchema({ + sessionScope: z.enum(["per-user", "per-room"]).optional(), threadReplies: z.enum(["off", "inbound", "always"]).optional(), }), execApprovals: matrixExecApprovalsSchema, diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 02b424c04a9..393b64ab682 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -31,6 +31,7 @@ type MatrixHandlerTestHarnessOptions = { replyToMode?: ReplyToMode; threadReplies?: "off" | "inbound" | "always"; dmThreadReplies?: "off" | "inbound" | "always"; + dmSessionScope?: "per-user" | "per-room"; streaming?: "partial" | "off"; blockStreamingEnabled?: boolean; dmEnabled?: boolean; @@ -214,6 +215,7 @@ export function createMatrixHandlerTestHarness( replyToMode: options.replyToMode ?? "off", threadReplies: options.threadReplies ?? "inbound", dmThreadReplies: options.dmThreadReplies, + dmSessionScope: options.dmSessionScope, streaming: options.streaming ?? "off", blockStreamingEnabled: options.blockStreamingEnabled ?? false, dmEnabled: options.dmEnabled ?? true, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 15b7e829f98..9f85584b524 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -1,3 +1,6 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; import { __testing as sessionBindingTesting, registerSessionBindingAdapter, @@ -682,6 +685,114 @@ describe("matrix monitor handler pairing account scope", () => { ); }); + it("posts a one-time notice when another Matrix DM room already owns the shared DM session", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-")); + const storePath = path.join(tempDir, "sessions.json"); + fs.writeFileSync( + storePath, + JSON.stringify({ + "agent:ops:main": { + sessionId: "sess-main", + updatedAt: Date.now(), + deliveryContext: { + channel: "matrix", + to: "room:!other:example.org", + accountId: "ops", + }, + }, + }), + "utf8", + ); + const sendNotice = vi.fn(async () => "$notice"); + + try { + const { handler } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm1", + body: "follow up", + }), + ); + + expect(sendNotice).toHaveBeenCalledWith( + "!dm:example.org", + expect.objectContaining({ + msgtype: "m.notice", + body: expect.stringContaining("channels.matrix.dm.sessionScope"), + }), + ); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm2", + body: "again", + }), + ); + + expect(sendNotice).toHaveBeenCalledTimes(1); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("skips the shared-session notice when Matrix DMs are isolated per room", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-room-scope-")); + const storePath = path.join(tempDir, "sessions.json"); + fs.writeFileSync( + storePath, + JSON.stringify({ + "agent:ops:main": { + sessionId: "sess-main", + updatedAt: Date.now(), + deliveryContext: { + channel: "matrix", + to: "room:!other:example.org", + accountId: "ops", + }, + }, + }), + "utf8", + ); + const sendNotice = vi.fn(async () => "$notice"); + + try { + const { handler, recordInboundSession } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + dmSessionScope: "per-room", + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm1", + body: "follow up", + }), + ); + + expect(sendNotice).not.toHaveBeenCalled(); + expect(recordInboundSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:ops:matrix:channel:!dm:example.org", + }), + ); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + it("uses stable room ids instead of room-declared aliases in group context", async () => { const { handler, finalizeInboundContext } = createMatrixHandlerTestHarness({ isDirectMessage: false, diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 66514bafb2c..7381c859492 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1,5 +1,9 @@ import { resolveControlCommandGate } from "openclaw/plugin-sdk/command-auth"; -import { resolveChannelContextVisibilityMode } from "openclaw/plugin-sdk/config-runtime"; +import { + loadSessionStore, + resolveChannelContextVisibilityMode, + resolveSessionStoreEntry, +} from "openclaw/plugin-sdk/config-runtime"; import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime"; import { evaluateSupplementalContextVisibility } from "openclaw/plugin-sdk/security-runtime"; import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js"; @@ -68,6 +72,7 @@ import { isMatrixVerificationRoomMessage } from "./verification-utils.js"; const ALLOW_FROM_STORE_CACHE_TTL_MS = 30_000; const PAIRING_REPLY_COOLDOWN_MS = 5 * 60_000; const MAX_TRACKED_PAIRING_REPLY_SENDERS = 512; +const MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES = 512; type MatrixAllowBotsMode = "off" | "mentions" | "all"; export type MatrixMonitorHandlerParams = { @@ -88,6 +93,8 @@ export type MatrixMonitorHandlerParams = { threadReplies: "off" | "inbound" | "always"; /** DM-specific threadReplies override. Falls back to threadReplies when absent. */ dmThreadReplies?: "off" | "inbound" | "always"; + /** DM session grouping behavior. */ + dmSessionScope?: "per-user" | "per-room"; streaming: "partial" | "off"; blockStreamingEnabled: boolean; dmEnabled: boolean; @@ -163,6 +170,103 @@ function resolveMatrixInboundBodyText(params: { }); } +function trimMaybeString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function rememberTrackedRoom(set: Set, roomId: string): void { + set.add(roomId); + if (set.size > MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES) { + const oldest = set.keys().next().value; + if (typeof oldest === "string") { + set.delete(oldest); + } + } +} + +function extractMatrixRoomIdFromTarget(value: unknown): string | undefined { + const trimmed = trimMaybeString(value); + if (!trimmed) { + return undefined; + } + return trimmed.startsWith("room:") ? trimMaybeString(trimmed.slice("room:".length)) : undefined; +} + +function resolveMatrixSharedDmContextNotice(params: { + storePath: string; + sessionKey: string; + roomId: string; + accountId: string; + dmSessionScope?: "per-user" | "per-room"; + sentRooms: Set; + logVerboseMessage: (message: string) => void; +}): string | null { + if ((params.dmSessionScope ?? "per-user") === "per-room") { + return null; + } + if (params.sentRooms.has(params.roomId)) { + return null; + } + + try { + const store = loadSessionStore(params.storePath); + const existing = resolveSessionStoreEntry({ + store, + sessionKey: params.sessionKey, + }).existing as + | { + deliveryContext?: { channel?: unknown; to?: unknown; accountId?: unknown }; + origin?: { provider?: unknown; to?: unknown; accountId?: unknown }; + lastChannel?: unknown; + lastTo?: unknown; + lastAccountId?: unknown; + } + | undefined; + if (!existing) { + return null; + } + + const priorChannel = + trimMaybeString(existing.deliveryContext?.channel) ?? + trimMaybeString(existing.lastChannel) ?? + trimMaybeString(existing.origin?.provider); + if (priorChannel && priorChannel !== "matrix") { + return null; + } + + const priorAccountId = + trimMaybeString(existing.deliveryContext?.accountId) ?? + trimMaybeString(existing.lastAccountId) ?? + trimMaybeString(existing.origin?.accountId); + if (priorAccountId && priorAccountId !== params.accountId) { + return null; + } + + const priorRoomId = + extractMatrixRoomIdFromTarget(existing.deliveryContext?.to) ?? + extractMatrixRoomIdFromTarget(existing.lastTo) ?? + extractMatrixRoomIdFromTarget(existing.origin?.to); + if (!priorRoomId || priorRoomId === params.roomId) { + return null; + } + + return [ + "This Matrix DM is sharing a session with another Matrix DM room.", + "Use /focus here for a one-off isolated thread session when thread bindings are enabled, or set", + "channels.matrix.dm.sessionScope to per-room to isolate each Matrix DM room.", + ].join(" "); + } catch (err) { + params.logVerboseMessage( + `matrix: failed checking shared DM session notice room=${params.roomId} (${String(err)})`, + ); + return null; + } +} + function resolveMatrixPendingHistoryText(params: { mentionPrecheckText: string; content: RoomMessageEventContent; @@ -214,6 +318,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam replyToMode, threadReplies, dmThreadReplies, + dmSessionScope, streaming, blockStreamingEnabled, dmEnabled, @@ -252,6 +357,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); const roomHistoryTracker = createRoomHistoryTracker(); const roomIngressTails = new Map>(); + const sharedDmContextNoticeRooms = new Set(); const readStoreAllowFrom = async (): Promise => { const now = Date.now(); @@ -672,6 +778,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam roomId, senderId, isDirectMessage, + dmSessionScope, threadId: thread.threadId, eventTs: eventTs ?? undefined, resolveAgentRoute: core.channel.routing.resolveAgentRoute, @@ -1023,6 +1130,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam storePath, sessionKey: _route.sessionKey, }); + const sharedDmContextNotice = isDirectMessage + ? resolveMatrixSharedDmContextNotice({ + storePath, + sessionKey: _route.sessionKey, + roomId, + accountId: _route.accountId, + dmSessionScope, + sentRooms: sharedDmContextNoticeRooms, + logVerboseMessage, + }) + : null; const body = core.channel.reply.formatAgentEnvelope({ channel: "Matrix", from: envelopeFrom, @@ -1090,6 +1208,20 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); + if (sharedDmContextNotice) { + rememberTrackedRoom(sharedDmContextNoticeRooms, roomId); + client + .sendMessage(roomId, { + msgtype: "m.notice", + body: sharedDmContextNotice, + }) + .catch((err) => { + logVerboseMessage( + `matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`, + ); + }); + } + const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n"); logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 40f1768ee13..5c70504c2ce 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -207,6 +207,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const dmEnabled = dmConfig?.enabled ?? true; const dmPolicyRaw = dmConfig?.policy ?? "pairing"; const dmPolicy = allowlistOnly && dmPolicyRaw !== "disabled" ? "allowlist" : dmPolicyRaw; + const dmSessionScope = dmConfig?.sessionScope ?? "per-user"; const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", effectiveAccountId); const globalGroupChatHistoryLimit = ( cfg.messages as { groupChat?: { historyLimit?: number } } | undefined @@ -271,6 +272,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi replyToMode, threadReplies, dmThreadReplies, + dmSessionScope, streaming, blockStreamingEnabled, dmEnabled, diff --git a/extensions/matrix/src/matrix/monitor/reaction-events.ts b/extensions/matrix/src/matrix/monitor/reaction-events.ts index f475f9ef975..46924f2609d 100644 --- a/extensions/matrix/src/matrix/monitor/reaction-events.ts +++ b/extensions/matrix/src/matrix/monitor/reaction-events.ts @@ -169,6 +169,7 @@ export async function handleInboundMatrixReaction(params: { roomId: params.roomId, senderId: params.senderId, isDirectMessage: params.isDirectMessage, + dmSessionScope: accountConfig.dm?.sessionScope ?? "per-user", threadId: thread.threadId, eventTs: params.event.origin_server_ts, resolveAgentRoute: params.core.channel.routing.resolveAgentRoute, diff --git a/extensions/matrix/src/matrix/monitor/route.test.ts b/extensions/matrix/src/matrix/monitor/route.test.ts index 0ae00df7dd9..bf58d726c19 100644 --- a/extensions/matrix/src/matrix/monitor/route.test.ts +++ b/extensions/matrix/src/matrix/monitor/route.test.ts @@ -17,13 +17,19 @@ const baseCfg = { }, } satisfies OpenClawConfig; -function resolveDmRoute(cfg: OpenClawConfig) { +function resolveDmRoute( + cfg: OpenClawConfig, + opts: { + dmSessionScope?: "per-user" | "per-room"; + } = {}, +) { return resolveMatrixInboundRoute({ cfg, accountId: "ops", roomId: "!dm:example.org", senderId: "@alice:example.org", isDirectMessage: true, + dmSessionScope: opts.dmSessionScope, resolveAgentRoute, }); } @@ -97,6 +103,33 @@ describe("resolveMatrixInboundRoute", () => { expect(route.sessionKey).toBe("agent:room-agent:main"); }); + it("can isolate Matrix DMs per room without changing agent selection", () => { + const cfg = { + ...baseCfg, + bindings: [ + { + agentId: "sender-agent", + match: { + channel: "matrix", + accountId: "ops", + peer: { kind: "direct", id: "@alice:example.org" }, + }, + }, + ], + } satisfies OpenClawConfig; + + const { route, configuredBinding } = resolveDmRoute(cfg, { + dmSessionScope: "per-room", + }); + + expect(configuredBinding).toBeNull(); + expect(route.agentId).toBe("sender-agent"); + expect(route.matchedBy).toBe("binding.peer"); + expect(route.sessionKey).toBe("agent:sender-agent:matrix:channel:!dm:example.org"); + expect(route.mainSessionKey).toBe("agent:sender-agent:main"); + expect(route.lastRoutePolicy).toBe("session"); + }); + it("lets configured ACP room bindings override DM parent-peer routing", () => { const cfg = { ...baseCfg, diff --git a/extensions/matrix/src/matrix/monitor/route.ts b/extensions/matrix/src/matrix/monitor/route.ts index bb052574cd7..4e08c836386 100644 --- a/extensions/matrix/src/matrix/monitor/route.ts +++ b/extensions/matrix/src/matrix/monitor/route.ts @@ -1,4 +1,4 @@ -import { deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing"; +import { buildAgentSessionKey, deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing"; import { getSessionBindingService, resolveAgentIdFromSessionKey, @@ -10,12 +10,34 @@ import { resolveMatrixThreadSessionKeys } from "./threads.js"; type MatrixResolvedRoute = ReturnType; +function resolveMatrixDmSessionKey(params: { + accountId: string; + agentId: string; + roomId: string; + dmSessionScope?: "per-user" | "per-room"; + fallbackSessionKey: string; +}): string { + if (params.dmSessionScope !== "per-room") { + return params.fallbackSessionKey; + } + return buildAgentSessionKey({ + agentId: params.agentId, + channel: "matrix", + accountId: params.accountId, + peer: { + kind: "channel", + id: params.roomId, + }, + }).toLowerCase(); +} + export function resolveMatrixInboundRoute(params: { cfg: CoreConfig; accountId: string; roomId: string; senderId: string; isDirectMessage: boolean; + dmSessionScope?: "per-user" | "per-room"; threadId?: string; eventTs?: number; resolveAgentRoute: PluginRuntime["channel"]["routing"]["resolveAgentRoute"]; @@ -98,21 +120,39 @@ export function resolveMatrixInboundRoute(params: { } : baseRoute; + const dmSessionKey = params.isDirectMessage + ? resolveMatrixDmSessionKey({ + accountId: params.accountId, + agentId: effectiveRoute.agentId, + roomId: params.roomId, + dmSessionScope: params.dmSessionScope, + fallbackSessionKey: effectiveRoute.sessionKey, + }) + : effectiveRoute.sessionKey; + const routeWithDmScope = + dmSessionKey === effectiveRoute.sessionKey + ? effectiveRoute + : { + ...effectiveRoute, + sessionKey: dmSessionKey, + lastRoutePolicy: "session" as const, + }; + // When no binding overrides the session key, isolate threads into their own sessions. if (!configuredBinding && !configuredSessionKey && params.threadId) { const threadKeys = resolveMatrixThreadSessionKeys({ - baseSessionKey: effectiveRoute.sessionKey, + baseSessionKey: routeWithDmScope.sessionKey, threadId: params.threadId, - parentSessionKey: effectiveRoute.sessionKey, + parentSessionKey: routeWithDmScope.sessionKey, }); return { route: { - ...effectiveRoute, + ...routeWithDmScope, sessionKey: threadKeys.sessionKey, - mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey, + mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey, lastRoutePolicy: deriveLastRoutePolicy({ sessionKey: threadKeys.sessionKey, - mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey, + mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey, }), }, configuredBinding, @@ -121,7 +161,7 @@ export function resolveMatrixInboundRoute(params: { } return { - route: effectiveRoute, + route: routeWithDmScope, configuredBinding, runtimeBindingId: null, }; diff --git a/extensions/matrix/src/session-route.test.ts b/extensions/matrix/src/session-route.test.ts new file mode 100644 index 00000000000..e9711c2172b --- /dev/null +++ b/extensions/matrix/src/session-route.test.ts @@ -0,0 +1,135 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "./runtime-api.js"; +import { resolveMatrixOutboundSessionRoute } from "./session-route.js"; + +const tempDirs = new Set(); + +function createTempStore(entries: Record): string { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-session-route-")); + tempDirs.add(tempDir); + const storePath = path.join(tempDir, "sessions.json"); + fs.writeFileSync(storePath, JSON.stringify(entries), "utf8"); + return storePath; +} + +afterEach(() => { + for (const tempDir of tempDirs) { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + tempDirs.clear(); +}); + +describe("resolveMatrixOutboundSessionRoute", () => { + it("reuses the current DM room session for same-user sends when Matrix DMs are per-room", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:!dm:example.org", + accountId: "ops", + }, + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "ops", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:matrix:channel:!dm:example.org", + baseSessionKey: "agent:main:matrix:channel:!dm:example.org", + peer: { kind: "channel", id: "!dm:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + }); + }); + + it("falls back to user-scoped routing when the current session is for another DM peer", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@bob:example.org", + to: "room:!dm:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:!dm:example.org", + accountId: "ops", + }, + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "ops", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + peer: { kind: "direct", id: "@alice:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:@alice:example.org", + }); + }); +}); diff --git a/extensions/matrix/src/session-route.ts b/extensions/matrix/src/session-route.ts index 60ac0984060..384d42c6b15 100644 --- a/extensions/matrix/src/session-route.ts +++ b/extensions/matrix/src/session-route.ts @@ -1,29 +1,125 @@ +import { + loadSessionStore, + resolveSessionStoreEntry, + resolveStorePath, +} from "openclaw/plugin-sdk/config-runtime"; import { buildChannelOutboundSessionRoute, - stripChannelTargetPrefix, - stripTargetKindPrefix, type ChannelOutboundSessionRouteParams, } from "openclaw/plugin-sdk/channel-core"; +import { normalizeAccountId } from "openclaw/plugin-sdk/account-id"; +import { resolveMatrixAccountConfig } from "./matrix/account-config.js"; +import { resolveMatrixDirectUserId, resolveMatrixTargetIdentity } from "./matrix/target-ids.js"; + +function resolveMatrixDmSessionScope( + params: Pick, +): "per-user" | "per-room" { + return ( + resolveMatrixAccountConfig({ + cfg: params.cfg, + accountId: params.accountId, + }).dm?.sessionScope ?? "per-user" + ); +} + +function resolveMatrixRoomTargetId(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const target = resolveMatrixTargetIdentity(value); + return target?.kind === "room" && target.id.startsWith("!") ? target.id : undefined; +} + +function resolveMatrixSessionAccountId(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? normalizeAccountId(trimmed) : undefined; +} + +function resolveMatrixCurrentDmRoomId(params: { + cfg: ChannelOutboundSessionRouteParams["cfg"]; + agentId: string; + accountId?: string | null; + currentSessionKey?: string; + targetUserId: string; +}): string | undefined { + const sessionKey = params.currentSessionKey?.trim(); + if (!sessionKey) { + return undefined; + } + try { + const storePath = resolveStorePath(params.cfg.session?.store, { + agentId: params.agentId, + }); + const store = loadSessionStore(storePath); + const existing = resolveSessionStoreEntry({ + store, + sessionKey, + }).existing; + if (!existing) { + return undefined; + } + const currentAccountId = + resolveMatrixSessionAccountId( + existing.deliveryContext?.accountId ?? existing.lastAccountId ?? existing.origin?.accountId, + ) ?? undefined; + if (!currentAccountId || currentAccountId !== normalizeAccountId(params.accountId)) { + return undefined; + } + const currentUserId = resolveMatrixDirectUserId({ + from: existing.origin?.from, + to: existing.deliveryContext?.to ?? existing.lastTo ?? existing.origin?.to, + chatType: existing.origin?.chatType ?? existing.chatType, + }); + if (!currentUserId || currentUserId !== params.targetUserId) { + return undefined; + } + return resolveMatrixRoomTargetId( + existing.deliveryContext?.to ?? existing.lastTo ?? existing.origin?.to, + ); + } catch { + return undefined; + } +} export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) { - const stripped = stripChannelTargetPrefix(params.target, "matrix"); - const isUser = - params.resolvedTarget?.kind === "user" || stripped.startsWith("@") || /^user:/i.test(stripped); - const rawId = stripTargetKindPrefix(stripped); - if (!rawId) { + const target = + resolveMatrixTargetIdentity(params.resolvedTarget?.to ?? params.target) ?? + resolveMatrixTargetIdentity(params.target); + if (!target) { return null; } + const roomScopedDmId = + target.kind === "user" && resolveMatrixDmSessionScope(params) === "per-room" + ? resolveMatrixCurrentDmRoomId({ + cfg: params.cfg, + agentId: params.agentId, + accountId: params.accountId, + currentSessionKey: params.currentSessionKey, + targetUserId: target.id, + }) + : undefined; + const peer = + roomScopedDmId !== undefined + ? { kind: "channel" as const, id: roomScopedDmId } + : { + kind: target.kind === "user" ? ("direct" as const) : ("channel" as const), + id: target.id, + }; + const chatType = target.kind === "user" ? "direct" : "channel"; + const from = target.kind === "user" ? `matrix:${target.id}` : `matrix:channel:${target.id}`; + const to = `room:${roomScopedDmId ?? target.id}`; + return buildChannelOutboundSessionRoute({ cfg: params.cfg, agentId: params.agentId, channel: "matrix", accountId: params.accountId, - peer: { - kind: isUser ? "direct" : "channel", - id: rawId, - }, - chatType: isUser ? "direct" : "channel", - from: isUser ? `matrix:${rawId}` : `matrix:channel:${rawId}`, - to: `room:${rawId}`, + peer, + chatType, + from, + to, }); } diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index 91e6818f8f6..1d8b4c254b2 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -16,6 +16,12 @@ export type MatrixDmConfig = { policy?: DmPolicy; /** Allowlist for DM senders (matrix user IDs or "*"). */ allowFrom?: Array; + /** + * How Matrix DMs map to sessions. + * - `per-user` (default): all DM rooms with the same routed peer share one DM session. + * - `per-room`: each Matrix DM room gets its own session key. + */ + sessionScope?: "per-user" | "per-room"; /** Per-DM thread reply behavior override (off|inbound|always). Overrides top-level threadReplies for direct messages. */ threadReplies?: "off" | "inbound" | "always"; }; diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index f111ee0e873..2ce38d8e575 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -534,6 +534,7 @@ export type ChannelMessagingAdapter = { agentId: string; accountId?: string | null; target: string; + currentSessionKey?: string; resolvedTarget?: { to: string; kind: ChannelDirectoryEntryKind | "channel"; diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index d6018acf5fd..70532e45745 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -502,6 +502,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise