diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ca261347d6..9278da61896 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -200,6 +200,7 @@ Docs: https://docs.openclaw.ai - Agents/video generation: accept `agents.defaults.videoGenerationModel` in strict config validation and `openclaw config set/get`, so gateways using `video_generate` no longer fail to boot after enabling a video model. - Discord/image generation: persist volatile workspace-generated media into durable outbound media before final reply delivery so generated image replies stop failing with missing local workspace paths. - Matrix: move legacy top-level `avatarUrl` into the default account during multi-account promotion and keep env-backed account setup avatar config persisted. (#61437) Thanks @gumadeiras. +- Matrix/DM sessions: add `channels.matrix.dm.sessionScope`, shared-session collision notices, and aligned outbound session reuse so separate Matrix DM rooms can keep distinct context when configured. (#61373) Thanks @gumadeiras. ## 2026.4.2 diff --git a/docs/channels/matrix.md b/docs/channels/matrix.md index 543cff92b42..d58c713f423 100644 --- a/docs/channels/matrix.md +++ b/docs/channels/matrix.md @@ -152,6 +152,7 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en dm: { policy: "pairing", + sessionScope: "per-room", threadReplies: "off", }, @@ -522,12 +523,17 @@ The repair flow does not delete old rooms automatically. It only picks the healt Matrix supports native Matrix threads for both automatic replies and message-tool sends. +- `dm.sessionScope: "per-user"` (default) keeps Matrix DM routing sender-scoped, so multiple DM rooms can share one session when they resolve to the same peer. +- `dm.sessionScope: "per-room"` isolates each Matrix DM room into its own session key while still using normal DM auth and allowlist checks. +- Explicit Matrix conversation bindings still win over `dm.sessionScope`, so bound rooms and threads keep their chosen target session. - `threadReplies: "off"` keeps replies top-level and keeps inbound threaded messages on the parent session. - `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread. - `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message and routes that conversation through the matching thread-scoped session from the first triggering message. - `dm.threadReplies` overrides the top-level setting for DMs only. For example, you can keep room threads isolated while keeping DMs flat. - Inbound threaded messages include the thread root message as extra agent context. - Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided. +- Same-session DM user-target reuse only kicks in when the current session metadata proves the same DM peer on the same Matrix account; otherwise OpenClaw falls back to normal user-scoped routing. +- When OpenClaw sees a Matrix DM room collide with another DM room on the same shared Matrix DM session, it posts a one-time `m.notice` in that room with the `/focus` escape hatch when thread bindings are enabled and the `dm.sessionScope` hint. - Runtime thread bindings are supported for Matrix. `/focus`, `/unfocus`, `/agents`, `/session idle`, `/session max-age`, and thread-bound `/acp spawn` now work in Matrix rooms and DMs. - Top-level Matrix room/DM `/focus` creates a new Matrix thread and binds it to the target session when `threadBindings.spawnSubagentSessions=true`. - Running `/focus` or `/acp spawn --thread here` inside an existing Matrix thread binds that current thread instead. @@ -842,8 +848,9 @@ Live directory lookup uses the logged-in Matrix account: - `mediaMaxMb`: media size cap in MB for Matrix media handling. It applies to outbound sends and inbound media processing. - `autoJoin`: invite auto-join policy (`always`, `allowlist`, `off`). Default: `off`. - `autoJoinAllowlist`: rooms/aliases allowed when `autoJoin` is `allowlist`. Alias entries are resolved to room IDs during invite handling; OpenClaw does not trust alias state claimed by the invited room. -- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `threadReplies`). +- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `sessionScope`, `threadReplies`). - `dm.allowFrom` entries should be full Matrix user IDs unless you already resolved them through live directory lookup. +- `dm.sessionScope`: `per-user` (default) or `per-room`. Use `per-room` when you want each Matrix DM room to keep separate context even if the peer is the same. - `dm.threadReplies`: DM-only thread policy override (`off`, `inbound`, `always`). It overrides the top-level `threadReplies` setting for both reply placement and session isolation in DMs. - `execApprovals`: Matrix-native exec approval delivery (`enabled`, `approvers`, `target`, `agentFilter`, `sessionFilter`). - `execApprovals.approvers`: Matrix user IDs allowed to approve exec requests. Optional when `dm.allowFrom` already identifies the approvers. diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index 50ac90b172e..63224be8c07 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -655,6 +655,7 @@ Matrix is extension-backed and configured under `channels.matrix`. - `sessionFilter`: optional session key patterns (substring or regex). - `target`: where to send approval prompts. `"dm"` (default), `"channel"` (originating room), or `"both"`. - Per-account overrides: `channels.matrix.accounts..execApprovals`. +- `channels.matrix.dm.sessionScope` controls how Matrix DMs group into sessions: `per-user` (default) shares by routed peer, while `per-room` isolates each DM room. - Matrix status probes and live directory lookups use the same proxy policy as runtime traffic. - Full Matrix configuration, targeting rules, and setup examples are documented in [Matrix](/channels/matrix). diff --git a/extensions/matrix/src/config-schema.test.ts b/extensions/matrix/src/config-schema.test.ts index 806c66ceb78..b9c11aa3a02 100644 --- a/extensions/matrix/src/config-schema.test.ts +++ b/extensions/matrix/src/config-schema.test.ts @@ -31,6 +31,18 @@ describe("MatrixConfigSchema SecretInput", () => { expect(result.success).toBe(true); }); + it("accepts dm sessionScope overrides", () => { + const result = MatrixConfigSchema.safeParse({ + homeserver: "https://matrix.example.org", + accessToken: "token", + dm: { + policy: "pairing", + sessionScope: "per-room", + }, + }); + expect(result.success).toBe(true); + }); + it("accepts room-level account assignments", () => { const result = MatrixConfigSchema.safeParse({ homeserver: "https://matrix.example.org", diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index 716e68f764f..3247ecb69da 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -104,6 +104,7 @@ export const MatrixConfigSchema = z.object({ autoJoinAllowlist: AllowFromListSchema, groupAllowFrom: AllowFromListSchema, dm: buildNestedDmConfigSchema({ + sessionScope: z.enum(["per-user", "per-room"]).optional(), threadReplies: z.enum(["off", "inbound", "always"]).optional(), }), execApprovals: matrixExecApprovalsSchema, diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 02b424c04a9..393b64ab682 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -31,6 +31,7 @@ type MatrixHandlerTestHarnessOptions = { replyToMode?: ReplyToMode; threadReplies?: "off" | "inbound" | "always"; dmThreadReplies?: "off" | "inbound" | "always"; + dmSessionScope?: "per-user" | "per-room"; streaming?: "partial" | "off"; blockStreamingEnabled?: boolean; dmEnabled?: boolean; @@ -214,6 +215,7 @@ export function createMatrixHandlerTestHarness( replyToMode: options.replyToMode ?? "off", threadReplies: options.threadReplies ?? "inbound", dmThreadReplies: options.dmThreadReplies, + dmSessionScope: options.dmSessionScope, streaming: options.streaming ?? "off", blockStreamingEnabled: options.blockStreamingEnabled ?? false, dmEnabled: options.dmEnabled ?? true, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 15b7e829f98..d652840cdb6 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -1,3 +1,7 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { recordSessionMetaFromInbound } from "openclaw/plugin-sdk/config-runtime"; import { __testing as sessionBindingTesting, registerSessionBindingAdapter, @@ -682,6 +686,423 @@ describe("matrix monitor handler pairing account scope", () => { ); }); + it("posts a one-time notice when another Matrix DM room already owns the shared DM session", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-")); + const storePath = path.join(tempDir, "sessions.json"); + const sendNotice = vi.fn(async () => "$notice"); + + try { + await recordSessionMetaFromInbound({ + storePath, + sessionKey: "agent:ops:main", + ctx: { + SessionKey: "agent:ops:main", + AccountId: "ops", + ChatType: "direct", + Provider: "matrix", + Surface: "matrix", + From: "matrix:@user:example.org", + To: "room:!other:example.org", + NativeChannelId: "!other:example.org", + OriginatingChannel: "matrix", + OriginatingTo: "room:!other:example.org", + }, + }); + + const { handler } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm1", + body: "follow up", + }), + ); + + expect(sendNotice).toHaveBeenCalledWith( + "!dm:example.org", + expect.objectContaining({ + msgtype: "m.notice", + body: expect.stringContaining("channels.matrix.dm.sessionScope"), + }), + ); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm2", + body: "again", + }), + ); + + expect(sendNotice).toHaveBeenCalledTimes(1); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("checks flat DM collision notices against the current DM session key", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-flat-notice-")); + const storePath = path.join(tempDir, "sessions.json"); + const sendNotice = vi.fn(async () => "$notice"); + + try { + await recordSessionMetaFromInbound({ + storePath, + sessionKey: "agent:ops:matrix:direct:@user:example.org", + ctx: { + SessionKey: "agent:ops:matrix:direct:@user:example.org", + AccountId: "ops", + ChatType: "direct", + Provider: "matrix", + Surface: "matrix", + From: "matrix:@user:example.org", + To: "room:!other:example.org", + NativeChannelId: "!other:example.org", + OriginatingChannel: "matrix", + OriginatingTo: "room:!other:example.org", + }, + }); + + const { handler } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + resolveStorePath: () => storePath, + resolveAgentRoute: () => ({ + agentId: "ops", + channel: "matrix", + accountId: "ops", + sessionKey: "agent:ops:matrix:direct:@user:example.org", + mainSessionKey: "agent:ops:main", + matchedBy: "binding.account" as const, + }), + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm-flat-1", + body: "follow up", + }), + ); + + expect(sendNotice).toHaveBeenCalledWith( + "!dm:example.org", + expect.objectContaining({ + msgtype: "m.notice", + body: expect.stringContaining("channels.matrix.dm.sessionScope"), + }), + ); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("checks threaded DM collision notices against the parent DM session", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-thread-notice-")); + const storePath = path.join(tempDir, "sessions.json"); + const sendNotice = vi.fn(async () => "$notice"); + + try { + await recordSessionMetaFromInbound({ + storePath, + sessionKey: "agent:ops:main", + ctx: { + SessionKey: "agent:ops:main", + AccountId: "ops", + ChatType: "direct", + Provider: "matrix", + Surface: "matrix", + From: "matrix:@user:example.org", + To: "room:!other:example.org", + NativeChannelId: "!other:example.org", + OriginatingChannel: "matrix", + OriginatingTo: "room:!other:example.org", + }, + }); + + const { handler } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + threadReplies: "always", + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + getEvent: async (_roomId, eventId) => + eventId === "$root" + ? createMatrixTextMessageEvent({ + eventId: "$root", + sender: "@alice:example.org", + body: "Root topic", + }) + : ({ sender: "@bot:example.org" } as never), + }, + getMemberDisplayName: async (_roomId, userId) => + userId === "@alice:example.org" ? "Alice" : "sender", + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$reply1", + body: "follow up", + relatesTo: { + rel_type: "m.thread", + event_id: "$root", + "m.in_reply_to": { event_id: "$root" }, + }, + }), + ); + + expect(sendNotice).toHaveBeenCalledWith( + "!dm:example.org", + expect.objectContaining({ + msgtype: "m.notice", + body: expect.stringContaining("channels.matrix.dm.sessionScope"), + }), + ); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("keeps the shared-session notice after user-target outbound metadata overwrites latest room fields", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-stable-")); + const storePath = path.join(tempDir, "sessions.json"); + const sendNotice = vi.fn(async () => "$notice"); + + try { + await recordSessionMetaFromInbound({ + storePath, + sessionKey: "agent:ops:main", + ctx: { + SessionKey: "agent:ops:main", + AccountId: "ops", + ChatType: "direct", + Provider: "matrix", + Surface: "matrix", + From: "matrix:@user:example.org", + To: "room:!other:example.org", + NativeChannelId: "!other:example.org", + OriginatingChannel: "matrix", + OriginatingTo: "room:!other:example.org", + }, + }); + await recordSessionMetaFromInbound({ + storePath, + sessionKey: "agent:ops:main", + ctx: { + SessionKey: "agent:ops:main", + AccountId: "ops", + ChatType: "direct", + Provider: "matrix", + Surface: "matrix", + From: "matrix:@other:example.org", + To: "room:@other:example.org", + NativeDirectUserId: "@user:example.org", + OriginatingChannel: "matrix", + OriginatingTo: "room:@other:example.org", + }, + }); + + const { handler } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm1", + body: "follow up", + }), + ); + + expect(sendNotice).toHaveBeenCalledWith( + "!dm:example.org", + expect.objectContaining({ + msgtype: "m.notice", + body: expect.stringContaining("channels.matrix.dm.sessionScope"), + }), + ); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("skips the shared-session notice when the prior Matrix session metadata is not a DM", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-room-")); + const storePath = path.join(tempDir, "sessions.json"); + const sendNotice = vi.fn(async () => "$notice"); + + try { + await recordSessionMetaFromInbound({ + storePath, + sessionKey: "agent:ops:main", + ctx: { + SessionKey: "agent:ops:main", + AccountId: "ops", + ChatType: "group", + Provider: "matrix", + Surface: "matrix", + From: "matrix:channel:!group:example.org", + To: "room:!group:example.org", + NativeChannelId: "!group:example.org", + OriginatingChannel: "matrix", + OriginatingTo: "room:!group:example.org", + }, + }); + + const { handler } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm1", + body: "follow up", + }), + ); + + expect(sendNotice).not.toHaveBeenCalled(); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("skips the shared-session notice when Matrix DMs are isolated per room", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-room-scope-")); + const storePath = path.join(tempDir, "sessions.json"); + fs.writeFileSync( + storePath, + JSON.stringify({ + "agent:ops:main": { + sessionId: "sess-main", + updatedAt: Date.now(), + deliveryContext: { + channel: "matrix", + to: "room:!other:example.org", + accountId: "ops", + }, + }, + }), + "utf8", + ); + const sendNotice = vi.fn(async () => "$notice"); + + try { + const { handler, recordInboundSession } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + dmSessionScope: "per-room", + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm1", + body: "follow up", + }), + ); + + expect(sendNotice).not.toHaveBeenCalled(); + expect(recordInboundSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:ops:matrix:channel:!dm:example.org", + }), + ); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("skips the shared-session notice when a Matrix DM is explicitly bound", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-bound-notice-")); + const storePath = path.join(tempDir, "sessions.json"); + fs.writeFileSync( + storePath, + JSON.stringify({ + "agent:bound:session-1": { + sessionId: "sess-bound", + updatedAt: Date.now(), + deliveryContext: { + channel: "matrix", + to: "room:!other:example.org", + accountId: "ops", + }, + }, + }), + "utf8", + ); + const sendNotice = vi.fn(async () => "$notice"); + const touch = vi.fn(); + registerSessionBindingAdapter({ + channel: "matrix", + accountId: "ops", + listBySession: () => [], + resolveByConversation: (ref) => + ref.conversationId === "!dm:example.org" + ? { + bindingId: "ops:!dm:example.org", + targetSessionKey: "agent:bound:session-1", + targetKind: "session", + conversation: { + channel: "matrix", + accountId: "ops", + conversationId: "!dm:example.org", + }, + status: "active", + boundAt: Date.now(), + metadata: { + boundBy: "user-1", + }, + } + : null, + touch, + }); + + try { + const { handler } = createMatrixHandlerTestHarness({ + isDirectMessage: true, + resolveStorePath: () => storePath, + client: { + sendMessage: sendNotice, + }, + }); + + await handler( + "!dm:example.org", + createMatrixTextMessageEvent({ + eventId: "$dm-bound-1", + body: "follow up", + }), + ); + + expect(sendNotice).not.toHaveBeenCalled(); + expect(touch).toHaveBeenCalledOnce(); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + it("uses stable room ids instead of room-declared aliases in group context", async () => { const { handler, finalizeInboundContext } = createMatrixHandlerTestHarness({ isDirectMessage: false, diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 66514bafb2c..190bef919fd 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1,5 +1,9 @@ import { resolveControlCommandGate } from "openclaw/plugin-sdk/command-auth"; -import { resolveChannelContextVisibilityMode } from "openclaw/plugin-sdk/config-runtime"; +import { + loadSessionStore, + resolveChannelContextVisibilityMode, + resolveSessionStoreEntry, +} from "openclaw/plugin-sdk/config-runtime"; import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime"; import { evaluateSupplementalContextVisibility } from "openclaw/plugin-sdk/security-runtime"; import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js"; @@ -27,6 +31,7 @@ import { sendReadReceiptMatrix, sendTypingMatrix, } from "../send.js"; +import { resolveMatrixStoredSessionMeta } from "../session-store-metadata.js"; import { resolveMatrixMonitorAccessState } from "./access-state.js"; import { resolveMatrixAckReactionConfig } from "./ack-config.js"; import { resolveMatrixAllowListMatch } from "./allowlist.js"; @@ -68,6 +73,7 @@ import { isMatrixVerificationRoomMessage } from "./verification-utils.js"; const ALLOW_FROM_STORE_CACHE_TTL_MS = 30_000; const PAIRING_REPLY_COOLDOWN_MS = 5 * 60_000; const MAX_TRACKED_PAIRING_REPLY_SENDERS = 512; +const MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES = 512; type MatrixAllowBotsMode = "off" | "mentions" | "all"; export type MatrixMonitorHandlerParams = { @@ -88,6 +94,8 @@ export type MatrixMonitorHandlerParams = { threadReplies: "off" | "inbound" | "always"; /** DM-specific threadReplies override. Falls back to threadReplies when absent. */ dmThreadReplies?: "off" | "inbound" | "always"; + /** DM session grouping behavior. */ + dmSessionScope?: "per-user" | "per-room"; streaming: "partial" | "off"; blockStreamingEnabled: boolean; dmEnabled: boolean; @@ -163,6 +171,73 @@ function resolveMatrixInboundBodyText(params: { }); } +function markTrackedRoomIfFirst(set: Set, roomId: string): boolean { + if (set.has(roomId)) { + return false; + } + set.add(roomId); + if (set.size > MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES) { + const oldest = set.keys().next().value; + if (typeof oldest === "string") { + set.delete(oldest); + } + } + return true; +} + +function resolveMatrixSharedDmContextNotice(params: { + storePath: string; + sessionKey: string; + roomId: string; + accountId: string; + dmSessionScope?: "per-user" | "per-room"; + sentRooms: Set; + logVerboseMessage: (message: string) => void; +}): string | null { + if ((params.dmSessionScope ?? "per-user") === "per-room") { + return null; + } + if (params.sentRooms.has(params.roomId)) { + return null; + } + + try { + const store = loadSessionStore(params.storePath); + const currentSession = resolveMatrixStoredSessionMeta( + resolveSessionStoreEntry({ + store, + sessionKey: params.sessionKey, + }).existing, + ); + if (!currentSession) { + return null; + } + if (currentSession.channel && currentSession.channel !== "matrix") { + return null; + } + if (currentSession.accountId && currentSession.accountId !== params.accountId) { + return null; + } + if (!currentSession.directUserId) { + return null; + } + if (!currentSession.roomId || currentSession.roomId === params.roomId) { + return null; + } + + return [ + "This Matrix DM is sharing a session with another Matrix DM room.", + "Use /focus here for a one-off isolated thread session when thread bindings are enabled, or set", + "channels.matrix.dm.sessionScope to per-room to isolate each Matrix DM room.", + ].join(" "); + } catch (err) { + params.logVerboseMessage( + `matrix: failed checking shared DM session notice room=${params.roomId} (${String(err)})`, + ); + return null; + } +} + function resolveMatrixPendingHistoryText(params: { mentionPrecheckText: string; content: RoomMessageEventContent; @@ -214,6 +289,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam replyToMode, threadReplies, dmThreadReplies, + dmSessionScope, streaming, blockStreamingEnabled, dmEnabled, @@ -252,6 +328,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); const roomHistoryTracker = createRoomHistoryTracker(); const roomIngressTails = new Map>(); + const sharedDmContextNoticeRooms = new Set(); const readStoreAllowFrom = async (): Promise => { const now = Date.now(); @@ -672,10 +749,12 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam roomId, senderId, isDirectMessage, + dmSessionScope, threadId: thread.threadId, eventTs: eventTs ?? undefined, resolveAgentRoute: core.channel.routing.resolveAgentRoute, }); + const hasExplicitSessionBinding = _configuredBinding !== null || _runtimeBindingId !== null; const agentMentionRegexes = core.channel.mentions.buildMentionRegexes(cfg, _route.agentId); const selfDisplayName = content.formatted_body ? await getMemberDisplayName(roomId, selfUserId).catch(() => undefined) @@ -870,6 +949,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam return { route: _route, + hasExplicitSessionBinding, roomConfig, isDirectMessage, isRoom, @@ -922,6 +1002,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const { route: _route, + hasExplicitSessionBinding, roomConfig, isDirectMessage, isRoom, @@ -1023,6 +1104,22 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam storePath, sessionKey: _route.sessionKey, }); + const sharedDmNoticeSessionKey = threadTarget + ? _route.mainSessionKey || _route.sessionKey + : _route.sessionKey; + const sharedDmContextNotice = isDirectMessage + ? hasExplicitSessionBinding + ? null + : resolveMatrixSharedDmContextNotice({ + storePath, + sessionKey: sharedDmNoticeSessionKey, + roomId, + accountId: _route.accountId, + dmSessionScope, + sentRooms: sharedDmContextNoticeRooms, + logVerboseMessage, + }) + : null; const body = core.channel.reply.formatAgentEnvelope({ channel: "Matrix", from: envelopeFrom, @@ -1065,6 +1162,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ...locationPayload?.context, CommandAuthorized: commandAuthorized, CommandSource: "text" as const, + NativeChannelId: roomId, + NativeDirectUserId: isDirectMessage ? senderId : undefined, OriginatingChannel: "matrix" as const, OriginatingTo: `room:${roomId}`, }); @@ -1090,6 +1189,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); + if (sharedDmContextNotice && markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId)) { + client + .sendMessage(roomId, { + msgtype: "m.notice", + body: sharedDmContextNotice, + }) + .catch((err) => { + logVerboseMessage( + `matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`, + ); + }); + } + const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n"); logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 40f1768ee13..5c70504c2ce 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -207,6 +207,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const dmEnabled = dmConfig?.enabled ?? true; const dmPolicyRaw = dmConfig?.policy ?? "pairing"; const dmPolicy = allowlistOnly && dmPolicyRaw !== "disabled" ? "allowlist" : dmPolicyRaw; + const dmSessionScope = dmConfig?.sessionScope ?? "per-user"; const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", effectiveAccountId); const globalGroupChatHistoryLimit = ( cfg.messages as { groupChat?: { historyLimit?: number } } | undefined @@ -271,6 +272,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi replyToMode, threadReplies, dmThreadReplies, + dmSessionScope, streaming, blockStreamingEnabled, dmEnabled, diff --git a/extensions/matrix/src/matrix/monitor/reaction-events.ts b/extensions/matrix/src/matrix/monitor/reaction-events.ts index f475f9ef975..46924f2609d 100644 --- a/extensions/matrix/src/matrix/monitor/reaction-events.ts +++ b/extensions/matrix/src/matrix/monitor/reaction-events.ts @@ -169,6 +169,7 @@ export async function handleInboundMatrixReaction(params: { roomId: params.roomId, senderId: params.senderId, isDirectMessage: params.isDirectMessage, + dmSessionScope: accountConfig.dm?.sessionScope ?? "per-user", threadId: thread.threadId, eventTs: params.event.origin_server_ts, resolveAgentRoute: params.core.channel.routing.resolveAgentRoute, diff --git a/extensions/matrix/src/matrix/monitor/route.test.ts b/extensions/matrix/src/matrix/monitor/route.test.ts index 0ae00df7dd9..1eefd21f960 100644 --- a/extensions/matrix/src/matrix/monitor/route.test.ts +++ b/extensions/matrix/src/matrix/monitor/route.test.ts @@ -17,13 +17,19 @@ const baseCfg = { }, } satisfies OpenClawConfig; -function resolveDmRoute(cfg: OpenClawConfig) { +function resolveDmRoute( + cfg: OpenClawConfig, + opts: { + dmSessionScope?: "per-user" | "per-room"; + } = {}, +) { return resolveMatrixInboundRoute({ cfg, accountId: "ops", roomId: "!dm:example.org", senderId: "@alice:example.org", isDirectMessage: true, + dmSessionScope: opts.dmSessionScope, resolveAgentRoute, }); } @@ -97,6 +103,33 @@ describe("resolveMatrixInboundRoute", () => { expect(route.sessionKey).toBe("agent:room-agent:main"); }); + it("can isolate Matrix DMs per room without changing agent selection", () => { + const cfg = { + ...baseCfg, + bindings: [ + { + agentId: "sender-agent", + match: { + channel: "matrix", + accountId: "ops", + peer: { kind: "direct", id: "@alice:example.org" }, + }, + }, + ], + } satisfies OpenClawConfig; + + const { route, configuredBinding } = resolveDmRoute(cfg, { + dmSessionScope: "per-room", + }); + + expect(configuredBinding).toBeNull(); + expect(route.agentId).toBe("sender-agent"); + expect(route.matchedBy).toBe("binding.peer"); + expect(route.sessionKey).toBe("agent:sender-agent:matrix:channel:!dm:example.org"); + expect(route.mainSessionKey).toBe("agent:sender-agent:main"); + expect(route.lastRoutePolicy).toBe("session"); + }); + it("lets configured ACP room bindings override DM parent-peer routing", () => { const cfg = { ...baseCfg, @@ -130,6 +163,42 @@ describe("resolveMatrixInboundRoute", () => { expect(route.lastRoutePolicy).toBe("session"); }); + it("keeps configured ACP room bindings ahead of per-room DM session scope", () => { + const cfg = { + ...baseCfg, + bindings: [ + { + agentId: "room-agent", + match: { + channel: "matrix", + accountId: "ops", + peer: { kind: "channel", id: "!dm:example.org" }, + }, + }, + { + type: "acp", + agentId: "acp-agent", + match: { + channel: "matrix", + accountId: "ops", + peer: { kind: "channel", id: "!dm:example.org" }, + }, + }, + ], + } satisfies OpenClawConfig; + + const { route, configuredBinding } = resolveDmRoute(cfg, { + dmSessionScope: "per-room", + }); + + expect(configuredBinding?.spec.agentId).toBe("acp-agent"); + expect(route.agentId).toBe("acp-agent"); + expect(route.matchedBy).toBe("binding.channel"); + expect(route.sessionKey).toContain("agent:acp-agent:acp:binding:matrix:ops:"); + expect(route.sessionKey).not.toBe("agent:acp-agent:matrix:channel:!dm:example.org"); + expect(route.lastRoutePolicy).toBe("session"); + }); + it("lets runtime conversation bindings override both sender and room route matches", () => { const touch = vi.fn(); registerSessionBindingAdapter({ diff --git a/extensions/matrix/src/matrix/monitor/route.ts b/extensions/matrix/src/matrix/monitor/route.ts index bb052574cd7..000cf4d65ef 100644 --- a/extensions/matrix/src/matrix/monitor/route.ts +++ b/extensions/matrix/src/matrix/monitor/route.ts @@ -1,4 +1,4 @@ -import { deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing"; +import { buildAgentSessionKey, deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing"; import { getSessionBindingService, resolveAgentIdFromSessionKey, @@ -10,12 +10,41 @@ import { resolveMatrixThreadSessionKeys } from "./threads.js"; type MatrixResolvedRoute = ReturnType; +function resolveMatrixDmSessionKey(params: { + accountId: string; + agentId: string; + roomId: string; + dmSessionScope?: "per-user" | "per-room"; + fallbackSessionKey: string; +}): string { + if (params.dmSessionScope !== "per-room") { + return params.fallbackSessionKey; + } + return buildAgentSessionKey({ + agentId: params.agentId, + channel: "matrix", + accountId: params.accountId, + peer: { + kind: "channel", + id: params.roomId, + }, + }).toLowerCase(); +} + +function shouldApplyMatrixPerRoomDmSessionScope(params: { + isDirectMessage: boolean; + configuredSessionKey?: string; +}): boolean { + return params.isDirectMessage && !params.configuredSessionKey; +} + export function resolveMatrixInboundRoute(params: { cfg: CoreConfig; accountId: string; roomId: string; senderId: string; isDirectMessage: boolean; + dmSessionScope?: "per-user" | "per-room"; threadId?: string; eventTs?: number; resolveAgentRoute: PluginRuntime["channel"]["routing"]["resolveAgentRoute"]; @@ -98,21 +127,42 @@ export function resolveMatrixInboundRoute(params: { } : baseRoute; + const dmSessionKey = shouldApplyMatrixPerRoomDmSessionScope({ + isDirectMessage: params.isDirectMessage, + configuredSessionKey, + }) + ? resolveMatrixDmSessionKey({ + accountId: params.accountId, + agentId: effectiveRoute.agentId, + roomId: params.roomId, + dmSessionScope: params.dmSessionScope, + fallbackSessionKey: effectiveRoute.sessionKey, + }) + : effectiveRoute.sessionKey; + const routeWithDmScope = + dmSessionKey === effectiveRoute.sessionKey + ? effectiveRoute + : { + ...effectiveRoute, + sessionKey: dmSessionKey, + lastRoutePolicy: "session" as const, + }; + // When no binding overrides the session key, isolate threads into their own sessions. if (!configuredBinding && !configuredSessionKey && params.threadId) { const threadKeys = resolveMatrixThreadSessionKeys({ - baseSessionKey: effectiveRoute.sessionKey, + baseSessionKey: routeWithDmScope.sessionKey, threadId: params.threadId, - parentSessionKey: effectiveRoute.sessionKey, + parentSessionKey: routeWithDmScope.sessionKey, }); return { route: { - ...effectiveRoute, + ...routeWithDmScope, sessionKey: threadKeys.sessionKey, - mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey, + mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey, lastRoutePolicy: deriveLastRoutePolicy({ sessionKey: threadKeys.sessionKey, - mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey, + mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey, }), }, configuredBinding, @@ -121,7 +171,7 @@ export function resolveMatrixInboundRoute(params: { } return { - route: effectiveRoute, + route: routeWithDmScope, configuredBinding, runtimeBindingId: null, }; diff --git a/extensions/matrix/src/matrix/session-store-metadata.ts b/extensions/matrix/src/matrix/session-store-metadata.ts new file mode 100644 index 00000000000..9663dbcba43 --- /dev/null +++ b/extensions/matrix/src/matrix/session-store-metadata.ts @@ -0,0 +1,108 @@ +import { normalizeAccountId } from "openclaw/plugin-sdk/account-id"; +import { resolveMatrixDirectUserId, resolveMatrixTargetIdentity } from "./target-ids.js"; + +export function trimMaybeString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function resolveMatrixRoomTargetId(value: unknown): string | undefined { + const trimmed = trimMaybeString(value); + if (!trimmed) { + return undefined; + } + const target = resolveMatrixTargetIdentity(trimmed); + return target?.kind === "room" && target.id.startsWith("!") ? target.id : undefined; +} + +export function resolveMatrixSessionAccountId(value: unknown): string | undefined { + const trimmed = trimMaybeString(value); + return trimmed ? normalizeAccountId(trimmed) : undefined; +} + +export function resolveMatrixStoredRoomId(params: { + deliveryTo?: unknown; + lastTo?: unknown; + originNativeChannelId?: unknown; + originTo?: unknown; +}): string | undefined { + return ( + resolveMatrixRoomTargetId(params.deliveryTo) ?? + resolveMatrixRoomTargetId(params.lastTo) ?? + resolveMatrixRoomTargetId(params.originNativeChannelId) ?? + resolveMatrixRoomTargetId(params.originTo) + ); +} + +type MatrixStoredSessionEntryLike = { + deliveryContext?: { + channel?: unknown; + to?: unknown; + accountId?: unknown; + }; + origin?: { + provider?: unknown; + from?: unknown; + to?: unknown; + nativeChannelId?: unknown; + nativeDirectUserId?: unknown; + accountId?: unknown; + chatType?: unknown; + }; + lastChannel?: unknown; + lastTo?: unknown; + lastAccountId?: unknown; + chatType?: unknown; +}; + +export function resolveMatrixStoredSessionMeta(entry?: MatrixStoredSessionEntryLike): { + channel?: string; + accountId?: string; + roomId?: string; + directUserId?: string; +} | null { + if (!entry) { + return null; + } + const channel = + trimMaybeString(entry.deliveryContext?.channel) ?? + trimMaybeString(entry.lastChannel) ?? + trimMaybeString(entry.origin?.provider); + const accountId = + resolveMatrixSessionAccountId( + entry.deliveryContext?.accountId ?? entry.lastAccountId ?? entry.origin?.accountId, + ) ?? undefined; + const roomId = resolveMatrixStoredRoomId({ + deliveryTo: entry.deliveryContext?.to, + lastTo: entry.lastTo, + originNativeChannelId: entry.origin?.nativeChannelId, + originTo: entry.origin?.to, + }); + const chatType = + trimMaybeString(entry.origin?.chatType) ?? trimMaybeString(entry.chatType) ?? undefined; + const directUserId = + chatType === "direct" + ? (trimMaybeString(entry.origin?.nativeDirectUserId) ?? + resolveMatrixDirectUserId({ + from: trimMaybeString(entry.origin?.from), + to: + (roomId ? `room:${roomId}` : undefined) ?? + trimMaybeString(entry.deliveryContext?.to) ?? + trimMaybeString(entry.lastTo) ?? + trimMaybeString(entry.origin?.to), + chatType, + })) + : undefined; + if (!channel && !accountId && !roomId && !directUserId) { + return null; + } + return { + ...(channel ? { channel } : {}), + ...(accountId ? { accountId } : {}), + ...(roomId ? { roomId } : {}), + ...(directUserId ? { directUserId } : {}), + }; +} diff --git a/extensions/matrix/src/session-route.test.ts b/extensions/matrix/src/session-route.test.ts new file mode 100644 index 00000000000..efd649ea97a --- /dev/null +++ b/extensions/matrix/src/session-route.test.ts @@ -0,0 +1,483 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "./runtime-api.js"; +import { resolveMatrixOutboundSessionRoute } from "./session-route.js"; + +const tempDirs = new Set(); + +function createTempStore(entries: Record): string { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-session-route-")); + tempDirs.add(tempDir); + const storePath = path.join(tempDir, "sessions.json"); + fs.writeFileSync(storePath, JSON.stringify(entries), "utf8"); + return storePath; +} + +afterEach(() => { + for (const tempDir of tempDirs) { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + tempDirs.clear(); +}); + +describe("resolveMatrixOutboundSessionRoute", () => { + it("reuses the current DM room session for same-user sends when Matrix DMs are per-room", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:!dm:example.org", + accountId: "ops", + }, + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "ops", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:matrix:channel:!dm:example.org", + baseSessionKey: "agent:main:matrix:channel:!dm:example.org", + peer: { kind: "channel", id: "!dm:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + }); + }); + + it("falls back to user-scoped routing when the current session is for another DM peer", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@bob:example.org", + to: "room:!dm:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:!dm:example.org", + accountId: "ops", + }, + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "ops", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + peer: { kind: "direct", id: "@alice:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:@alice:example.org", + }); + }); + + it("falls back to user-scoped routing when the current session belongs to another Matrix account", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:!dm:example.org", + accountId: "ops", + }, + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "support", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + peer: { kind: "direct", id: "@alice:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:@alice:example.org", + }); + }); + + it("reuses the canonical DM room after user-target outbound metadata overwrites latest to fields", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@bob:example.org", + to: "room:@bob:example.org", + nativeChannelId: "!dm:example.org", + nativeDirectUserId: "@alice:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:@bob:example.org", + accountId: "ops", + }, + lastTo: "room:@bob:example.org", + lastAccountId: "ops", + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "ops", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:matrix:channel:!dm:example.org", + baseSessionKey: "agent:main:matrix:channel:!dm:example.org", + peer: { kind: "channel", id: "!dm:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + }); + }); + + it("does not reuse the canonical DM room for a different Matrix user after latest metadata drift", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@bob:example.org", + to: "room:@bob:example.org", + nativeChannelId: "!dm:example.org", + nativeDirectUserId: "@alice:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:@bob:example.org", + accountId: "ops", + }, + lastTo: "room:@bob:example.org", + lastAccountId: "ops", + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "ops", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@bob:example.org", + resolvedTarget: { + to: "@bob:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + peer: { kind: "direct", id: "@bob:example.org" }, + chatType: "direct", + from: "matrix:@bob:example.org", + to: "room:@bob:example.org", + }); + }); + + it("does not reuse a room after the session metadata was overwritten by a non-DM Matrix send", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "channel", + origin: { + chatType: "channel", + from: "matrix:channel:!ops:example.org", + to: "room:!ops:example.org", + nativeChannelId: "!ops:example.org", + nativeDirectUserId: "@alice:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:!ops:example.org", + accountId: "ops", + }, + lastTo: "room:!ops:example.org", + lastAccountId: "ops", + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + dm: { + sessionScope: "per-room", + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + accountId: "ops", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + baseSessionKey: "agent:main:main", + peer: { kind: "direct", id: "@alice:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:@alice:example.org", + }); + }); + + it("uses the effective default Matrix account when accountId is omitted", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + accountId: "ops", + }, + deliveryContext: { + channel: "matrix", + to: "room:!dm:example.org", + accountId: "ops", + }, + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + defaultAccount: "ops", + accounts: { + ops: { + dm: { + sessionScope: "per-room", + }, + }, + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:matrix:channel:!dm:example.org", + baseSessionKey: "agent:main:matrix:channel:!dm:example.org", + peer: { kind: "channel", id: "!dm:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + }); + }); + + it("reuses the current DM room when stored account metadata is missing", () => { + const storePath = createTempStore({ + "agent:main:matrix:channel:!dm:example.org": { + sessionId: "sess-1", + updatedAt: Date.now(), + chatType: "direct", + origin: { + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + }, + deliveryContext: { + channel: "matrix", + to: "room:!dm:example.org", + }, + }, + }); + const cfg = { + session: { + store: storePath, + }, + channels: { + matrix: { + defaultAccount: "ops", + accounts: { + ops: { + dm: { + sessionScope: "per-room", + }, + }, + }, + }, + }, + } satisfies OpenClawConfig; + + const route = resolveMatrixOutboundSessionRoute({ + cfg, + agentId: "main", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + target: "@alice:example.org", + resolvedTarget: { + to: "@alice:example.org", + kind: "user", + source: "normalized", + }, + }); + + expect(route).toMatchObject({ + sessionKey: "agent:main:matrix:channel:!dm:example.org", + baseSessionKey: "agent:main:matrix:channel:!dm:example.org", + peer: { kind: "channel", id: "!dm:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + }); + }); +}); diff --git a/extensions/matrix/src/session-route.ts b/extensions/matrix/src/session-route.ts index 60ac0984060..cb40bf9e309 100644 --- a/extensions/matrix/src/session-route.ts +++ b/extensions/matrix/src/session-route.ts @@ -1,29 +1,113 @@ +import { normalizeAccountId } from "openclaw/plugin-sdk/account-id"; import { buildChannelOutboundSessionRoute, - stripChannelTargetPrefix, - stripTargetKindPrefix, type ChannelOutboundSessionRouteParams, } from "openclaw/plugin-sdk/channel-core"; +import { + loadSessionStore, + resolveSessionStoreEntry, + resolveStorePath, +} from "openclaw/plugin-sdk/config-runtime"; +import { resolveMatrixAccountConfig } from "./matrix/account-config.js"; +import { resolveDefaultMatrixAccountId } from "./matrix/accounts.js"; +import { resolveMatrixStoredSessionMeta } from "./matrix/session-store-metadata.js"; +import { resolveMatrixTargetIdentity } from "./matrix/target-ids.js"; + +function resolveEffectiveMatrixAccountId( + params: Pick, +): string { + return normalizeAccountId(params.accountId ?? resolveDefaultMatrixAccountId(params.cfg)); +} + +function resolveMatrixDmSessionScope(params: { + cfg: ChannelOutboundSessionRouteParams["cfg"]; + accountId: string; +}): "per-user" | "per-room" { + return ( + resolveMatrixAccountConfig({ + cfg: params.cfg, + accountId: params.accountId, + }).dm?.sessionScope ?? "per-user" + ); +} + +function resolveMatrixCurrentDmRoomId(params: { + cfg: ChannelOutboundSessionRouteParams["cfg"]; + agentId: string; + accountId: string; + currentSessionKey?: string; + targetUserId: string; +}): string | undefined { + const sessionKey = params.currentSessionKey?.trim(); + if (!sessionKey) { + return undefined; + } + try { + const storePath = resolveStorePath(params.cfg.session?.store, { + agentId: params.agentId, + }); + const store = loadSessionStore(storePath); + const existing = resolveSessionStoreEntry({ + store, + sessionKey, + }).existing; + const currentSession = resolveMatrixStoredSessionMeta(existing); + if (!currentSession) { + return undefined; + } + if (currentSession.accountId && currentSession.accountId !== params.accountId) { + return undefined; + } + if (!currentSession.directUserId || currentSession.directUserId !== params.targetUserId) { + return undefined; + } + return currentSession.roomId; + } catch { + return undefined; + } +} export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) { - const stripped = stripChannelTargetPrefix(params.target, "matrix"); - const isUser = - params.resolvedTarget?.kind === "user" || stripped.startsWith("@") || /^user:/i.test(stripped); - const rawId = stripTargetKindPrefix(stripped); - if (!rawId) { + const target = + resolveMatrixTargetIdentity(params.resolvedTarget?.to ?? params.target) ?? + resolveMatrixTargetIdentity(params.target); + if (!target) { return null; } + const effectiveAccountId = resolveEffectiveMatrixAccountId(params); + const roomScopedDmId = + target.kind === "user" && + resolveMatrixDmSessionScope({ + cfg: params.cfg, + accountId: effectiveAccountId, + }) === "per-room" + ? resolveMatrixCurrentDmRoomId({ + cfg: params.cfg, + agentId: params.agentId, + accountId: effectiveAccountId, + currentSessionKey: params.currentSessionKey, + targetUserId: target.id, + }) + : undefined; + const peer = + roomScopedDmId !== undefined + ? { kind: "channel" as const, id: roomScopedDmId } + : { + kind: target.kind === "user" ? ("direct" as const) : ("channel" as const), + id: target.id, + }; + const chatType = target.kind === "user" ? "direct" : "channel"; + const from = target.kind === "user" ? `matrix:${target.id}` : `matrix:channel:${target.id}`; + const to = `room:${roomScopedDmId ?? target.id}`; + return buildChannelOutboundSessionRoute({ cfg: params.cfg, agentId: params.agentId, channel: "matrix", - accountId: params.accountId, - peer: { - kind: isUser ? "direct" : "channel", - id: rawId, - }, - chatType: isUser ? "direct" : "channel", - from: isUser ? `matrix:${rawId}` : `matrix:channel:${rawId}`, - to: `room:${rawId}`, + accountId: effectiveAccountId, + peer, + chatType, + from, + to, }); } diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index 91e6818f8f6..1d8b4c254b2 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -16,6 +16,12 @@ export type MatrixDmConfig = { policy?: DmPolicy; /** Allowlist for DM senders (matrix user IDs or "*"). */ allowFrom?: Array; + /** + * How Matrix DMs map to sessions. + * - `per-user` (default): all DM rooms with the same routed peer share one DM session. + * - `per-room`: each Matrix DM room gets its own session key. + */ + sessionScope?: "per-user" | "per-room"; /** Per-DM thread reply behavior override (off|inbound|always). Overrides top-level threadReplies for direct messages. */ threadReplies?: "off" | "inbound" | "always"; }; diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index dfacdf40d61..29f377e1d3d 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -161,6 +161,8 @@ export type MsgContext = { MessageThreadId?: string | number; /** Platform-native channel/conversation id (e.g. Slack DM channel "D…" id). */ NativeChannelId?: string; + /** Stable provider-native direct-peer id when a DM room/user mapping must survive later writes. */ + NativeDirectUserId?: string; /** Telegram forum supergroup marker. */ IsForum?: boolean; /** Warning: DM has topics enabled but this message is not in a topic. */ diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index f111ee0e873..2ce38d8e575 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -534,6 +534,7 @@ export type ChannelMessagingAdapter = { agentId: string; accountId?: string | null; target: string; + currentSessionKey?: string; resolvedTarget?: { to: string; kind: ChannelDirectoryEntryKind | "channel"; diff --git a/src/config/sessions/metadata.ts b/src/config/sessions/metadata.ts index b93cfcb5372..1be93e1b83b 100644 --- a/src/config/sessions/metadata.ts +++ b/src/config/sessions/metadata.ts @@ -32,6 +32,12 @@ const mergeOrigin = ( if (next?.to) { merged.to = next.to; } + if (next?.nativeChannelId) { + merged.nativeChannelId = next.nativeChannelId; + } + if (next?.nativeDirectUserId) { + merged.nativeDirectUserId = next.nativeDirectUserId; + } if (next?.accountId) { merged.accountId = next.accountId; } @@ -53,6 +59,8 @@ export function deriveSessionOrigin(ctx: MsgContext): SessionOrigin | undefined const from = ctx.From?.trim(); const to = (typeof ctx.OriginatingTo === "string" ? ctx.OriginatingTo : ctx.To)?.trim() ?? undefined; + const nativeChannelId = ctx.NativeChannelId?.trim(); + const nativeDirectUserId = ctx.NativeDirectUserId?.trim(); const accountId = ctx.AccountId?.trim(); const threadId = ctx.MessageThreadId ?? undefined; @@ -75,6 +83,12 @@ export function deriveSessionOrigin(ctx: MsgContext): SessionOrigin | undefined if (to) { origin.to = to; } + if (nativeChannelId) { + origin.nativeChannelId = nativeChannelId; + } + if (nativeDirectUserId) { + origin.nativeDirectUserId = nativeDirectUserId; + } if (accountId) { origin.accountId = accountId; } diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 27223acbf0a..fa325a3a078 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -18,6 +18,8 @@ export type SessionOrigin = { chatType?: SessionChatType; from?: string; to?: string; + nativeChannelId?: string; + nativeDirectUserId?: string; accountId?: string; threadId?: string | number; }; diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts index cc1e6c6eaeb..19b4d2c9319 100644 --- a/src/gateway/server-methods/send.test.ts +++ b/src/gateway/server-methods/send.test.ts @@ -597,6 +597,74 @@ describe("gateway send mirroring", () => { }); }); + it("still resolves outbound routing metadata when a sessionKey is provided", async () => { + mockDeliverySuccess("m-matrix-session-route"); + mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({ + sessionKey: "agent:main:matrix:channel:!dm:example.org", + baseSessionKey: "agent:main:matrix:channel:!dm:example.org", + peer: { kind: "channel", id: "!dm:example.org" }, + chatType: "direct", + from: "matrix:@alice:example.org", + to: "room:!dm:example.org", + }); + + await runSend({ + to: "@alice:example.org", + message: "hello", + channel: "matrix", + sessionKey: "agent:main:matrix:channel:!dm:example.org", + idempotencyKey: "idem-matrix-session-route", + }); + + expect(mocks.resolveOutboundSessionRoute).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "matrix", + target: "resolved", + currentSessionKey: "agent:main:matrix:channel:!dm:example.org", + }), + ); + expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith( + expect.objectContaining({ + route: expect.objectContaining({ + sessionKey: "agent:main:matrix:channel:!dm:example.org", + baseSessionKey: "agent:main:matrix:channel:!dm:example.org", + to: "room:!dm:example.org", + }), + }), + ); + expectDeliverySessionMirror({ + agentId: "main", + sessionKey: "agent:main:matrix:channel:!dm:example.org", + }); + }); + + it("falls back to the provided sessionKey when outbound route lookup returns null", async () => { + mockDeliverySuccess("m-session-fallback"); + mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(null); + + await runSend({ + to: "channel:C1", + message: "hello", + channel: "slack", + sessionKey: "agent:work:slack:channel:c1", + idempotencyKey: "idem-session-fallback", + }); + + expect(mocks.ensureOutboundSessionEntry).not.toHaveBeenCalled(); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + session: expect.objectContaining({ + agentId: "work", + key: "agent:work:slack:channel:c1", + }), + mirror: expect.objectContaining({ + sessionKey: "agent:work:slack:channel:c1", + agentId: "work", + }), + }), + ); + }); + it("prefers explicit agentId over sessionKey agent for delivery and mirror", async () => { mockDeliverySuccess("m-agent-precedence"); diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 706c8540673..2f53144a458 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -231,31 +231,39 @@ export const sendHandlers: GatewayRequestHandlers = { : undefined; const defaultAgentId = resolveSessionAgentId({ config: cfg }); const effectiveAgentId = explicitAgentId ?? sessionAgentId ?? defaultAgentId; - // If callers omit sessionKey, derive a target session key from the outbound route. - const derivedRoute = !providedSessionKey - ? await resolveOutboundSessionRoute({ - cfg, - channel, - agentId: effectiveAgentId, - accountId, - target: deliveryTarget, - resolvedTarget: idLikeTarget, - threadId, - }) + const derivedRoute = await resolveOutboundSessionRoute({ + cfg, + channel, + agentId: effectiveAgentId, + accountId, + target: deliveryTarget, + currentSessionKey: providedSessionKey, + resolvedTarget: idLikeTarget, + threadId, + }); + const outboundRoute = derivedRoute + ? providedSessionKey + ? { + ...derivedRoute, + sessionKey: providedSessionKey, + baseSessionKey: providedSessionKey, + } + : derivedRoute : null; - if (derivedRoute) { + if (outboundRoute) { await ensureOutboundSessionEntry({ cfg, agentId: effectiveAgentId, channel, accountId, - route: derivedRoute, + route: outboundRoute, }); } + const outboundSessionKey = outboundRoute?.sessionKey ?? providedSessionKey; const outboundSession = buildOutboundSessionContext({ cfg, agentId: effectiveAgentId, - sessionKey: providedSessionKey ?? derivedRoute?.sessionKey, + sessionKey: outboundSessionKey, }); const results = await deliverOutboundPayloads({ cfg, @@ -268,23 +276,15 @@ export const sendHandlers: GatewayRequestHandlers = { threadId: threadId ?? null, deps: outboundDeps, gatewayClientScopes: client?.connect?.scopes ?? [], - mirror: providedSessionKey + mirror: outboundSessionKey ? { - sessionKey: providedSessionKey, + sessionKey: outboundSessionKey, agentId: effectiveAgentId, text: mirrorText || message, mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined, idempotencyKey: idem, } - : derivedRoute - ? { - sessionKey: derivedRoute.sessionKey, - agentId: effectiveAgentId, - text: mirrorText || message, - mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined, - idempotencyKey: idem, - } - : undefined, + : undefined, }); const result = results.at(-1); diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index d6018acf5fd..70532e45745 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -502,6 +502,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise