From 96d7e4552df4d7ff13798074ef8fed3cd3e66bca Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sun, 8 Mar 2026 18:43:15 -0400 Subject: [PATCH] matrix-js: add account-aware bindings and ACP routing --- docs/channels/matrix-js.md | 16 + .../src/monitor/thread-bindings.manager.ts | 16 + .../src/actions.account-propagation.test.ts | 85 +++ extensions/matrix-js/src/actions.ts | 166 ++--- extensions/matrix-js/src/config-migration.ts | 2 + extensions/matrix-js/src/config-schema.ts | 11 + .../matrix-js/src/matrix/actions/messages.ts | 1 + .../src/matrix/monitor/handler.test.ts | 152 +++- .../matrix-js/src/matrix/monitor/handler.ts | 56 +- .../matrix-js/src/matrix/monitor/index.ts | 26 + .../src/matrix/thread-bindings.test.ts | 225 ++++++ .../matrix-js/src/matrix/thread-bindings.ts | 681 ++++++++++++++++++ extensions/matrix-js/src/tool-actions.test.ts | 78 ++ extensions/matrix-js/src/tool-actions.ts | 19 +- extensions/matrix-js/src/types.ts | 10 + extensions/telegram/src/thread-bindings.ts | 22 + src/auto-reply/reply/channel-context.ts | 4 + .../reply/commands-acp/context.test.ts | 39 + src/auto-reply/reply/commands-acp/context.ts | 28 + .../reply/commands-session-lifecycle.test.ts | 305 +++++--- src/auto-reply/reply/commands-session.ts | 314 ++++---- .../reply/commands-subagents-focus.test.ts | 33 +- .../reply/commands-subagents/action-agents.ts | 7 +- .../reply/commands-subagents/action-focus.ts | 61 +- .../commands-subagents/action-unfocus.ts | 47 +- .../reply/commands-subagents/shared.ts | 8 + src/auto-reply/reply/matrix-context.ts | 54 ++ src/config/schema.help.ts | 12 +- src/config/schema.labels.ts | 6 + src/config/zod-schema.agents.ts | 9 +- src/infra/outbound/session-binding-service.ts | 50 ++ src/plugin-sdk/matrix-js.ts | 18 + 32 files changed, 2194 insertions(+), 367 deletions(-) create mode 100644 extensions/matrix-js/src/actions.account-propagation.test.ts create mode 100644 extensions/matrix-js/src/matrix/thread-bindings.test.ts create mode 100644 extensions/matrix-js/src/matrix/thread-bindings.ts create mode 100644 src/auto-reply/reply/matrix-context.ts diff --git a/docs/channels/matrix-js.md b/docs/channels/matrix-js.md index 6f3d640e1e9..c1a4ad262df 100644 --- a/docs/channels/matrix-js.md +++ b/docs/channels/matrix-js.md @@ -241,6 +241,21 @@ Matrix-js supports native Matrix threads for both automatic replies and message- - `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message. - Inbound threaded messages include the thread root message as extra agent context. - Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided. +- Runtime thread bindings are supported for Matrix-js. `/focus`, `/unfocus`, `/agents`, `/session idle`, `/session max-age`, and thread-bound `/acp spawn` now work in Matrix rooms and DMs. +- Top-level Matrix room/DM `/focus` creates a new Matrix thread and binds it to the target session. +- Running `/focus` or `/acp spawn --thread here` inside an existing Matrix thread binds that current thread instead. + +### Thread Binding Config + +Matrix-js inherits global defaults from `session.threadBindings`, and also supports per-channel overrides: + +- `threadBindings.enabled` +- `threadBindings.idleHours` +- `threadBindings.maxAgeHours` +- `threadBindings.spawnSubagentSessions` +- `threadBindings.spawnAcpSessions` + +For Matrix-js, spawn flags default to enabled unless you turn them off explicitly. ## Reactions @@ -345,6 +360,7 @@ See [Groups](/channels/groups) for mention-gating and allowlist behavior. - `groupAllowFrom`: allowlist of user IDs for room traffic. - `replyToMode`: `off`, `first`, or `all`. - `threadReplies`: `off`, `inbound`, or `always`. +- `threadBindings`: per-channel overrides for thread-bound session routing and lifecycle. - `startupVerification`: automatic self-verification request mode on startup (`if-unverified`, `off`). - `startupVerificationCooldownHours`: cooldown before retrying automatic startup verification requests. - `textChunkLimit`: outbound message chunk size. diff --git a/extensions/discord/src/monitor/thread-bindings.manager.ts b/extensions/discord/src/monitor/thread-bindings.manager.ts index 5c37ac4bbf0..96e9ffc809a 100644 --- a/extensions/discord/src/monitor/thread-bindings.manager.ts +++ b/extensions/discord/src/monitor/thread-bindings.manager.ts @@ -20,6 +20,10 @@ import { resolveChannelIdForBinding, summarizeDiscordError, } from "./thread-bindings.discord-api.js"; +import { + setThreadBindingIdleTimeoutBySessionKey, + setThreadBindingMaxAgeBySessionKey, +} from "./thread-bindings.lifecycle.js"; import { resolveThreadBindingFarewellText, resolveThreadBindingThreadName, @@ -651,6 +655,18 @@ export function createThreadBindingManager( const binding = manager.getByThreadId(ref.conversationId); return binding ? toSessionBindingRecord(binding, { idleTimeoutMs, maxAgeMs }) : null; }, + setIdleTimeoutBySession: ({ targetSessionKey, idleTimeoutMs: nextIdleTimeoutMs }) => + setThreadBindingIdleTimeoutBySessionKey({ + targetSessionKey, + accountId, + idleTimeoutMs: nextIdleTimeoutMs, + }).map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs })), + setMaxAgeBySession: ({ targetSessionKey, maxAgeMs: nextMaxAgeMs }) => + setThreadBindingMaxAgeBySessionKey({ + targetSessionKey, + accountId, + maxAgeMs: nextMaxAgeMs, + }).map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs })), touch: (bindingId, at) => { const threadId = resolveThreadBindingConversationIdFromBindingId({ accountId, diff --git a/extensions/matrix-js/src/actions.account-propagation.test.ts b/extensions/matrix-js/src/actions.account-propagation.test.ts new file mode 100644 index 00000000000..4218088fbcd --- /dev/null +++ b/extensions/matrix-js/src/actions.account-propagation.test.ts @@ -0,0 +1,85 @@ +import type { ChannelMessageActionContext } from "openclaw/plugin-sdk/matrix-js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { CoreConfig } from "./types.js"; + +const mocks = vi.hoisted(() => ({ + handleMatrixAction: vi.fn(), +})); + +vi.mock("./tool-actions.js", () => ({ + handleMatrixAction: mocks.handleMatrixAction, +})); + +const { matrixMessageActions } = await import("./actions.js"); + +function createContext( + overrides: Partial, +): ChannelMessageActionContext { + return { + channel: "matrix-js", + action: "send", + cfg: { + channels: { + "matrix-js": { + enabled: true, + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + }, + }, + } as CoreConfig, + params: {}, + ...overrides, + }; +} + +describe("matrixMessageActions account propagation", () => { + beforeEach(() => { + mocks.handleMatrixAction.mockReset().mockResolvedValue({ + ok: true, + output: "", + details: { ok: true }, + }); + }); + + it("forwards accountId for send actions", async () => { + await matrixMessageActions.handleAction?.( + createContext({ + action: "send", + accountId: "ops", + params: { + to: "room:!room:example", + message: "hello", + }, + }), + ); + + expect(mocks.handleMatrixAction).toHaveBeenCalledWith( + expect.objectContaining({ + action: "sendMessage", + accountId: "ops", + }), + expect.any(Object), + ); + }); + + it("forwards accountId for permissions actions", async () => { + await matrixMessageActions.handleAction?.( + createContext({ + action: "permissions", + accountId: "ops", + params: { + operation: "verification-list", + }, + }), + ); + + expect(mocks.handleMatrixAction).toHaveBeenCalledWith( + expect.objectContaining({ + action: "verificationList", + accountId: "ops", + }), + expect.any(Object), + ); + }); +}); diff --git a/extensions/matrix-js/src/actions.ts b/extensions/matrix-js/src/actions.ts index 07bfcc4a93f..755aa7c834f 100644 --- a/extensions/matrix-js/src/actions.ts +++ b/extensions/matrix-js/src/actions.ts @@ -77,7 +77,15 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { return { to }; }, handleAction: async (ctx: ChannelMessageActionContext) => { - const { action, params, cfg } = ctx; + const { action, params, cfg, accountId } = ctx; + const dispatch = async (actionParams: Record) => + await handleMatrixAction( + { + ...actionParams, + ...(accountId ? { accountId } : {}), + }, + cfg as CoreConfig, + ); const resolveRoomId = () => readStringParam(params, "roomId") ?? readStringParam(params, "channelId") ?? @@ -92,97 +100,76 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { const mediaUrl = readStringParam(params, "media", { trim: false }); const replyTo = readStringParam(params, "replyTo"); const threadId = readStringParam(params, "threadId"); - return await handleMatrixAction( - { - action: "sendMessage", - to, - content, - mediaUrl: mediaUrl ?? undefined, - replyToId: replyTo ?? undefined, - threadId: threadId ?? undefined, - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "sendMessage", + to, + content, + mediaUrl: mediaUrl ?? undefined, + replyToId: replyTo ?? undefined, + threadId: threadId ?? undefined, + }); } if (action === "poll-vote") { - return await handleMatrixAction( - { - ...params, - action: "pollVote", - }, - cfg as CoreConfig, - ); + return await dispatch({ + ...params, + action: "pollVote", + }); } if (action === "react") { const messageId = readStringParam(params, "messageId", { required: true }); const emoji = readStringParam(params, "emoji", { allowEmpty: true }); const remove = typeof params.remove === "boolean" ? params.remove : undefined; - return await handleMatrixAction( - { - action: "react", - roomId: resolveRoomId(), - messageId, - emoji, - remove, - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "react", + roomId: resolveRoomId(), + messageId, + emoji, + remove, + }); } if (action === "reactions") { const messageId = readStringParam(params, "messageId", { required: true }); const limit = readNumberParam(params, "limit", { integer: true }); - return await handleMatrixAction( - { - action: "reactions", - roomId: resolveRoomId(), - messageId, - limit, - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "reactions", + roomId: resolveRoomId(), + messageId, + limit, + }); } if (action === "read") { const limit = readNumberParam(params, "limit", { integer: true }); - return await handleMatrixAction( - { - action: "readMessages", - roomId: resolveRoomId(), - limit, - before: readStringParam(params, "before"), - after: readStringParam(params, "after"), - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "readMessages", + roomId: resolveRoomId(), + limit, + before: readStringParam(params, "before"), + after: readStringParam(params, "after"), + }); } if (action === "edit") { const messageId = readStringParam(params, "messageId", { required: true }); const content = readStringParam(params, "message", { required: true }); - return await handleMatrixAction( - { - action: "editMessage", - roomId: resolveRoomId(), - messageId, - content, - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "editMessage", + roomId: resolveRoomId(), + messageId, + content, + }); } if (action === "delete") { const messageId = readStringParam(params, "messageId", { required: true }); - return await handleMatrixAction( - { - action: "deleteMessage", - roomId: resolveRoomId(), - messageId, - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "deleteMessage", + roomId: resolveRoomId(), + messageId, + }); } if (action === "pin" || action === "unpin" || action === "list-pins") { @@ -190,37 +177,27 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { action === "list-pins" ? undefined : readStringParam(params, "messageId", { required: true }); - return await handleMatrixAction( - { - action: - action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins", - roomId: resolveRoomId(), - messageId, - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins", + roomId: resolveRoomId(), + messageId, + }); } if (action === "member-info") { const userId = readStringParam(params, "userId", { required: true }); - return await handleMatrixAction( - { - action: "memberInfo", - userId, - roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"), - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "memberInfo", + userId, + roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"), + }); } if (action === "channel-info") { - return await handleMatrixAction( - { - action: "channelInfo", - roomId: resolveRoomId(), - }, - cfg as CoreConfig, - ); + return await dispatch({ + action: "channelInfo", + roomId: resolveRoomId(), + }); } if (action === "permissions") { @@ -258,13 +235,10 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { ).join(", ")}`, ); } - return await handleMatrixAction( - { - ...params, - action: resolvedAction, - }, - cfg, - ); + return await dispatch({ + ...params, + action: resolvedAction, + }); } throw new Error(`Action ${action} is not supported for provider matrix-js.`); diff --git a/extensions/matrix-js/src/config-migration.ts b/extensions/matrix-js/src/config-migration.ts index ae1a358cdc7..9ff1f1001f5 100644 --- a/extensions/matrix-js/src/config-migration.ts +++ b/extensions/matrix-js/src/config-migration.ts @@ -19,6 +19,7 @@ type LegacyAccountField = | "textChunkLimit" | "chunkMode" | "responsePrefix" + | "threadBindings" | "startupVerification" | "startupVerificationCooldownHours" | "mediaMaxMb" @@ -47,6 +48,7 @@ const LEGACY_ACCOUNT_FIELDS: ReadonlyArray = [ "textChunkLimit", "chunkMode", "responsePrefix", + "threadBindings", "startupVerification", "startupVerificationCooldownHours", "mediaMaxMb", diff --git a/extensions/matrix-js/src/config-schema.ts b/extensions/matrix-js/src/config-schema.ts index 3080c99e14a..b4823848368 100644 --- a/extensions/matrix-js/src/config-schema.ts +++ b/extensions/matrix-js/src/config-schema.ts @@ -14,6 +14,16 @@ const matrixActionSchema = z }) .optional(); +const matrixThreadBindingsSchema = z + .object({ + enabled: z.boolean().optional(), + idleHours: z.number().nonnegative().optional(), + maxAgeHours: z.number().nonnegative().optional(), + spawnSubagentSessions: z.boolean().optional(), + spawnAcpSessions: z.boolean().optional(), + }) + .optional(); + const matrixDmSchema = z .object({ enabled: z.boolean().optional(), @@ -60,6 +70,7 @@ export const MatrixConfigSchema = z.object({ .enum(["group-mentions", "group-all", "direct", "all", "none", "off"]) .optional(), reactionNotifications: z.enum(["off", "own"]).optional(), + threadBindings: matrixThreadBindingsSchema, startupVerification: z.enum(["off", "if-unverified"]).optional(), startupVerificationCooldownHours: z.number().optional(), mediaMaxMb: z.number().optional(), diff --git a/extensions/matrix-js/src/matrix/actions/messages.ts b/extensions/matrix-js/src/matrix/actions/messages.ts index 3fcf1cd43d4..5b0d4516ed7 100644 --- a/extensions/matrix-js/src/matrix/actions/messages.ts +++ b/extensions/matrix-js/src/matrix/actions/messages.ts @@ -25,6 +25,7 @@ export async function sendMatrixMessage( mediaUrl: opts.mediaUrl, replyToId: opts.replyToId, threadId: opts.threadId, + accountId: opts.accountId, client: opts.client, timeoutMs: opts.timeoutMs, }); diff --git a/extensions/matrix-js/src/matrix/monitor/handler.test.ts b/extensions/matrix-js/src/matrix/monitor/handler.test.ts index ed070b28a5d..69825a005df 100644 --- a/extensions/matrix-js/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix-js/src/matrix/monitor/handler.test.ts @@ -1,4 +1,8 @@ -import { describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + __testing as sessionBindingTesting, + registerSessionBindingAdapter, +} from "../../../../../src/infra/outbound/session-binding-service.js"; import { createMatrixRoomMessageHandler } from "./handler.js"; import { EventType, type MatrixRawEvent } from "./types.js"; @@ -13,6 +17,10 @@ vi.mock("../send.js", () => ({ sendTypingMatrix: vi.fn(async () => {}), })); +beforeEach(() => { + sessionBindingTesting.resetSessionBindingAdaptersForTests(); +}); + function createReactionHarness(params?: { cfg?: unknown; dmPolicy?: "pairing" | "allowlist" | "open" | "disabled"; @@ -515,6 +523,148 @@ describe("matrix monitor handler pairing account scope", () => { ); }); + it("routes bound Matrix threads to the target session key", async () => { + registerSessionBindingAdapter({ + channel: "matrix-js", + accountId: "ops", + listBySession: () => [], + resolveByConversation: (ref) => + ref.conversationId === "$root" + ? { + bindingId: "ops:!room:example:$root", + targetSessionKey: "agent:bound:session-1", + targetKind: "session", + conversation: { + channel: "matrix-js", + accountId: "ops", + conversationId: "$root", + parentConversationId: "!room:example", + }, + status: "active", + boundAt: Date.now(), + metadata: { + boundBy: "user-1", + }, + } + : null, + touch: vi.fn(), + }); + const recordInboundSession = vi.fn(async () => {}); + + const handler = createMatrixRoomMessageHandler({ + client: { + getUserId: async () => "@bot:example.org", + getEvent: async () => ({ + event_id: "$root", + sender: "@alice:example.org", + type: EventType.RoomMessage, + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "Root topic", + }, + }), + } as never, + core: { + channel: { + pairing: { + readAllowFromStore: async () => [] as string[], + upsertPairingRequest: async () => ({ code: "ABCDEFGH", created: false }), + }, + commands: { + shouldHandleTextCommands: () => false, + }, + text: { + hasControlCommand: () => false, + resolveMarkdownTableMode: () => "preserve", + }, + routing: { + resolveAgentRoute: () => ({ + agentId: "ops", + channel: "matrix-js", + accountId: "ops", + sessionKey: "agent:ops:main", + mainSessionKey: "agent:ops:main", + matchedBy: "binding.account", + }), + }, + session: { + resolveStorePath: () => "/tmp/session-store", + readSessionUpdatedAt: () => undefined, + recordInboundSession, + }, + reply: { + resolveEnvelopeFormatOptions: () => ({}), + formatAgentEnvelope: ({ body }: { body: string }) => body, + finalizeInboundContext: (ctx: unknown) => ctx, + createReplyDispatcherWithTyping: () => ({ + dispatcher: {}, + replyOptions: {}, + markDispatchIdle: () => {}, + }), + resolveHumanDelayConfig: () => undefined, + dispatchReplyFromConfig: async () => ({ + queuedFinal: false, + counts: { final: 0, block: 0, tool: 0 }, + }), + }, + reactions: { + shouldAckReaction: () => false, + }, + }, + } as never, + cfg: {} as never, + accountId: "ops", + runtime: { + error: () => {}, + } as never, + logger: { + info: () => {}, + warn: () => {}, + } as never, + logVerboseMessage: () => {}, + allowFrom: [], + mentionRegexes: [], + groupPolicy: "open", + replyToMode: "off", + threadReplies: "inbound", + dmEnabled: true, + dmPolicy: "open", + textLimit: 8_000, + mediaMaxBytes: 10_000_000, + startupMs: 0, + startupGraceMs: 0, + directTracker: { + isDirectMessage: async () => false, + }, + getRoomInfo: async () => ({ altAliases: [] }), + getMemberDisplayName: async () => "sender", + }); + + await handler("!room:example", { + type: EventType.RoomMessage, + sender: "@user:example.org", + event_id: "$reply1", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "follow up", + "m.relates_to": { + rel_type: "m.thread", + event_id: "$root", + "m.in_reply_to": { event_id: "$root" }, + }, + "m.mentions": { room: true }, + }, + } as MatrixRawEvent); + + expect(recordInboundSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:bound:session-1", + }), + ); + }); + it("enqueues system events for reactions on bot-authored messages", async () => { const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness(); diff --git a/extensions/matrix-js/src/matrix/monitor/handler.ts b/extensions/matrix-js/src/matrix/monitor/handler.ts index 9fbe8f7a83e..57aa35f089d 100644 --- a/extensions/matrix-js/src/matrix/monitor/handler.ts +++ b/extensions/matrix-js/src/matrix/monitor/handler.ts @@ -1,9 +1,13 @@ import { createReplyPrefixOptions, createTypingCallbacks, + ensureConfiguredAcpRouteReady, formatAllowlistMatchMeta, + getSessionBindingService, logInboundDrop, logTypingFailure, + resolveAgentIdFromSessionKey, + resolveConfiguredAcpRoute, resolveControlCommandGate, type PluginRuntime, type ReplyPayload, @@ -533,7 +537,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ? await resolveThreadContext({ roomId, threadRootId }) : undefined; - const route = core.channel.routing.resolveAgentRoute({ + const baseRoute = core.channel.routing.resolveAgentRoute({ cfg, channel: "matrix-js", accountId, @@ -542,6 +546,56 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam id: isDirectMessage ? senderId : roomId, }, }); + const bindingConversationId = + threadRootId && threadRootId !== messageId ? threadRootId : roomId; + const bindingParentConversationId = bindingConversationId === roomId ? undefined : roomId; + const sessionBindingService = getSessionBindingService(); + const runtimeBinding = sessionBindingService.resolveByConversation({ + channel: "matrix-js", + accountId, + conversationId: bindingConversationId, + parentConversationId: bindingParentConversationId, + }); + const configuredRoute = + runtimeBinding == null + ? resolveConfiguredAcpRoute({ + cfg, + route: baseRoute, + channel: "matrix-js", + accountId, + conversationId: bindingConversationId, + parentConversationId: bindingParentConversationId, + }) + : null; + const configuredBinding = configuredRoute?.configuredBinding ?? null; + if (!runtimeBinding && configuredBinding) { + const ensured = await ensureConfiguredAcpRouteReady({ + cfg, + configuredBinding, + }); + if (!ensured.ok) { + logInboundDrop({ + log: logVerboseMessage, + channel: "matrix-js", + reason: "configured ACP binding unavailable", + target: configuredBinding.spec.conversationId, + }); + return; + } + } + const boundSessionKey = runtimeBinding?.targetSessionKey?.trim(); + const route = + runtimeBinding && boundSessionKey + ? { + ...baseRoute, + sessionKey: boundSessionKey, + agentId: resolveAgentIdFromSessionKey(boundSessionKey) || baseRoute.agentId, + matchedBy: "binding.channel" as const, + } + : (configuredRoute?.route ?? baseRoute); + if (runtimeBinding) { + sessionBindingService.touch(runtimeBinding.bindingId, eventTs); + } const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId); const textWithId = `${bodyText}\n[matrix event id: ${messageId} room: ${roomId}]`; const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { diff --git a/extensions/matrix-js/src/matrix/monitor/index.ts b/extensions/matrix-js/src/matrix/monitor/index.ts index 81ab6865e03..e691dab93d7 100644 --- a/extensions/matrix-js/src/matrix/monitor/index.ts +++ b/extensions/matrix-js/src/matrix/monitor/index.ts @@ -2,6 +2,8 @@ import { format } from "node:util"; import { GROUP_POLICY_BLOCKED_LABEL, mergeAllowlist, + resolveThreadBindingIdleTimeoutMsForChannel, + resolveThreadBindingMaxAgeMsForChannel, resolveAllowlistProviderRuntimeGroupPolicy, resolveDefaultGroupPolicy, summarizeMapping, @@ -21,6 +23,7 @@ import { } from "../client.js"; import { updateMatrixAccountConfig } from "../config-update.js"; import { syncMatrixOwnProfile } from "../profile.js"; +import { createMatrixThreadBindingManager } from "../thread-bindings.js"; import { normalizeMatrixUserId } from "./allowlist.js"; import { registerMatrixAutoJoin } from "./auto-join.js"; import { createDirectRoomTracker } from "./direct.js"; @@ -270,6 +273,16 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const groupPolicy = allowlistOnly && groupPolicyRaw === "open" ? "allowlist" : groupPolicyRaw; const replyToMode = opts.replyToMode ?? accountConfig.replyToMode ?? "off"; const threadReplies = accountConfig.threadReplies ?? "inbound"; + const threadBindingIdleTimeoutMs = resolveThreadBindingIdleTimeoutMsForChannel({ + cfg, + channel: "matrix-js", + accountId: account.accountId, + }); + const threadBindingMaxAgeMs = resolveThreadBindingMaxAgeMsForChannel({ + cfg, + channel: "matrix-js", + accountId: account.accountId, + }); const dmConfig = accountConfig.dm; const dmEnabled = dmConfig?.enabled ?? true; const dmPolicyRaw = dmConfig?.policy ?? "pairing"; @@ -328,6 +341,18 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi accountId: opts.accountId, }); logVerboseMessage("matrix: client started"); + const threadBindingManager = await createMatrixThreadBindingManager({ + accountId: account.accountId, + auth, + client, + env: process.env, + idleTimeoutMs: threadBindingIdleTimeoutMs, + maxAgeMs: threadBindingMaxAgeMs, + logVerboseMessage, + }); + logVerboseMessage( + `matrix: thread bindings ready account=${threadBindingManager.accountId} idleMs=${threadBindingIdleTimeoutMs} maxAgeMs=${threadBindingMaxAgeMs}`, + ); // Shared client is already started via resolveSharedMatrixClient. logger.info(`matrix: logged in as ${auth.userId}`); @@ -414,6 +439,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi await new Promise((resolve) => { const onAbort = () => { try { + threadBindingManager.stop(); logVerboseMessage("matrix: stopping client"); stopSharedClientForAccount(auth, opts.accountId); } finally { diff --git a/extensions/matrix-js/src/matrix/thread-bindings.test.ts b/extensions/matrix-js/src/matrix/thread-bindings.test.ts new file mode 100644 index 00000000000..7e680ac6342 --- /dev/null +++ b/extensions/matrix-js/src/matrix/thread-bindings.test.ts @@ -0,0 +1,225 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import type { PluginRuntime } from "openclaw/plugin-sdk/matrix-js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + getSessionBindingService, + __testing, +} from "../../../../src/infra/outbound/session-binding-service.js"; +import { setMatrixRuntime } from "../runtime.js"; +import { + createMatrixThreadBindingManager, + resetMatrixThreadBindingsForTests, +} from "./thread-bindings.js"; + +const sendMessageMatrixMock = vi.hoisted(() => + vi.fn(async (_to: string, _message: string, opts?: { threadId?: string }) => ({ + messageId: opts?.threadId ? "$reply" : "$root", + roomId: "!room:example", + })), +); + +vi.mock("./send.js", async () => { + const actual = await vi.importActual("./send.js"); + return { + ...actual, + sendMessageMatrix: sendMessageMatrixMock, + }; +}); + +describe("matrix thread bindings", () => { + let stateDir: string; + + beforeEach(async () => { + stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "matrix-thread-bindings-")); + __testing.resetSessionBindingAdaptersForTests(); + resetMatrixThreadBindingsForTests(); + sendMessageMatrixMock.mockClear(); + setMatrixRuntime({ + state: { + resolveStateDir: () => stateDir, + }, + } as PluginRuntime); + }); + + it("creates child Matrix thread bindings from a top-level room context", async () => { + await createMatrixThreadBindingManager({ + accountId: "ops", + auth: { + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + }, + client: {} as never, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + enableSweeper: false, + }); + + const binding = await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix-js", + accountId: "ops", + conversationId: "!room:example", + }, + placement: "child", + metadata: { + introText: "intro root", + }, + }); + + expect(sendMessageMatrixMock).toHaveBeenCalledWith("room:!room:example", "intro root", { + client: {}, + accountId: "ops", + }); + expect(binding.conversation).toEqual({ + channel: "matrix-js", + accountId: "ops", + conversationId: "$root", + parentConversationId: "!room:example", + }); + }); + + it("posts intro messages inside existing Matrix threads for current placement", async () => { + await createMatrixThreadBindingManager({ + accountId: "ops", + auth: { + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + }, + client: {} as never, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + enableSweeper: false, + }); + + const binding = await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix-js", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }, + placement: "current", + metadata: { + introText: "intro thread", + }, + }); + + expect(sendMessageMatrixMock).toHaveBeenCalledWith("room:!room:example", "intro thread", { + client: {}, + accountId: "ops", + threadId: "$thread", + }); + expect( + getSessionBindingService().resolveByConversation({ + channel: "matrix-js", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }), + ).toMatchObject({ + bindingId: binding.bindingId, + targetSessionKey: "agent:ops:subagent:child", + }); + }); + + it("expires idle bindings via the sweeper", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-08T12:00:00.000Z")); + try { + await createMatrixThreadBindingManager({ + accountId: "ops", + auth: { + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + }, + client: {} as never, + idleTimeoutMs: 1_000, + maxAgeMs: 0, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix-js", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }, + placement: "current", + metadata: { + introText: "intro thread", + }, + }); + + sendMessageMatrixMock.mockClear(); + await vi.advanceTimersByTimeAsync(61_000); + await Promise.resolve(); + + expect( + getSessionBindingService().resolveByConversation({ + channel: "matrix-js", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }), + ).toBeNull(); + } finally { + vi.useRealTimers(); + } + }); + + it("sends threaded farewell messages when bindings are unbound", async () => { + await createMatrixThreadBindingManager({ + accountId: "ops", + auth: { + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + }, + client: {} as never, + idleTimeoutMs: 1_000, + maxAgeMs: 0, + enableSweeper: false, + }); + + const binding = await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix-js", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }, + placement: "current", + metadata: { + introText: "intro thread", + }, + }); + + sendMessageMatrixMock.mockClear(); + await getSessionBindingService().unbind({ + bindingId: binding.bindingId, + reason: "idle-expired", + }); + + expect(sendMessageMatrixMock).toHaveBeenCalledWith( + "room:!room:example", + expect.stringContaining("Session ended automatically"), + expect.objectContaining({ + accountId: "ops", + threadId: "$thread", + }), + ); + }); +}); diff --git a/extensions/matrix-js/src/matrix/thread-bindings.ts b/extensions/matrix-js/src/matrix/thread-bindings.ts new file mode 100644 index 00000000000..0004b026c77 --- /dev/null +++ b/extensions/matrix-js/src/matrix/thread-bindings.ts @@ -0,0 +1,681 @@ +import path from "node:path"; +import { + readJsonFileWithFallback, + registerSessionBindingAdapter, + resolveAgentIdFromSessionKey, + resolveThreadBindingFarewellText, + unregisterSessionBindingAdapter, + writeJsonFileAtomically, + type BindingTargetKind, + type SessionBindingRecord, +} from "openclaw/plugin-sdk/matrix-js"; +import { resolveMatrixStoragePaths } from "./client/storage.js"; +import type { MatrixAuth } from "./client/types.js"; +import type { MatrixClient } from "./sdk.js"; +import { sendMessageMatrix } from "./send.js"; + +const STORE_VERSION = 1; +const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 60_000; +const TOUCH_PERSIST_DELAY_MS = 30_000; + +type MatrixThreadBindingTargetKind = "subagent" | "acp"; + +type MatrixThreadBindingRecord = { + accountId: string; + conversationId: string; + parentConversationId?: string; + targetKind: MatrixThreadBindingTargetKind; + targetSessionKey: string; + agentId?: string; + label?: string; + boundBy?: string; + boundAt: number; + lastActivityAt: number; + idleTimeoutMs?: number; + maxAgeMs?: number; +}; + +type StoredMatrixThreadBindingState = { + version: number; + bindings: MatrixThreadBindingRecord[]; +}; + +export type MatrixThreadBindingManager = { + accountId: string; + getIdleTimeoutMs: () => number; + getMaxAgeMs: () => number; + getByConversation: (params: { + conversationId: string; + parentConversationId?: string; + }) => MatrixThreadBindingRecord | undefined; + listBySessionKey: (targetSessionKey: string) => MatrixThreadBindingRecord[]; + listBindings: () => MatrixThreadBindingRecord[]; + touchBinding: (bindingId: string, at?: number) => MatrixThreadBindingRecord | null; + setIdleTimeoutBySessionKey: (params: { + targetSessionKey: string; + idleTimeoutMs: number; + }) => MatrixThreadBindingRecord[]; + setMaxAgeBySessionKey: (params: { + targetSessionKey: string; + maxAgeMs: number; + }) => MatrixThreadBindingRecord[]; + stop: () => void; +}; + +const MANAGERS_BY_ACCOUNT_ID = new Map(); +const BINDINGS_BY_ACCOUNT_CONVERSATION = new Map(); + +function normalizeDurationMs(raw: unknown, fallback: number): number { + if (typeof raw !== "number" || !Number.isFinite(raw)) { + return fallback; + } + return Math.max(0, Math.floor(raw)); +} + +function normalizeText(raw: unknown): string { + return typeof raw === "string" ? raw.trim() : ""; +} + +function normalizeConversationId(raw: unknown): string | undefined { + const trimmed = normalizeText(raw); + return trimmed || undefined; +} + +function resolveBindingKey(params: { + accountId: string; + conversationId: string; + parentConversationId?: string; +}): string { + return `${params.accountId}:${params.parentConversationId?.trim() || "-"}:${params.conversationId}`; +} + +function toSessionBindingTargetKind(raw: MatrixThreadBindingTargetKind): BindingTargetKind { + return raw === "subagent" ? "subagent" : "session"; +} + +function toMatrixBindingTargetKind(raw: BindingTargetKind): MatrixThreadBindingTargetKind { + return raw === "subagent" ? "subagent" : "acp"; +} + +function resolveEffectiveBindingExpiry(params: { + record: MatrixThreadBindingRecord; + defaultIdleTimeoutMs: number; + defaultMaxAgeMs: number; +}): { + expiresAt?: number; + reason?: "idle-expired" | "max-age-expired"; +} { + const idleTimeoutMs = + typeof params.record.idleTimeoutMs === "number" + ? Math.max(0, Math.floor(params.record.idleTimeoutMs)) + : params.defaultIdleTimeoutMs; + const maxAgeMs = + typeof params.record.maxAgeMs === "number" + ? Math.max(0, Math.floor(params.record.maxAgeMs)) + : params.defaultMaxAgeMs; + const inactivityExpiresAt = + idleTimeoutMs > 0 + ? Math.max(params.record.lastActivityAt, params.record.boundAt) + idleTimeoutMs + : undefined; + const maxAgeExpiresAt = maxAgeMs > 0 ? params.record.boundAt + maxAgeMs : undefined; + + if (inactivityExpiresAt != null && maxAgeExpiresAt != null) { + return inactivityExpiresAt <= maxAgeExpiresAt + ? { expiresAt: inactivityExpiresAt, reason: "idle-expired" } + : { expiresAt: maxAgeExpiresAt, reason: "max-age-expired" }; + } + if (inactivityExpiresAt != null) { + return { expiresAt: inactivityExpiresAt, reason: "idle-expired" }; + } + if (maxAgeExpiresAt != null) { + return { expiresAt: maxAgeExpiresAt, reason: "max-age-expired" }; + } + return {}; +} + +function toSessionBindingRecord( + record: MatrixThreadBindingRecord, + defaults: { idleTimeoutMs: number; maxAgeMs: number }, +): SessionBindingRecord { + const lifecycle = resolveEffectiveBindingExpiry({ + record, + defaultIdleTimeoutMs: defaults.idleTimeoutMs, + defaultMaxAgeMs: defaults.maxAgeMs, + }); + const idleTimeoutMs = + typeof record.idleTimeoutMs === "number" ? record.idleTimeoutMs : defaults.idleTimeoutMs; + const maxAgeMs = typeof record.maxAgeMs === "number" ? record.maxAgeMs : defaults.maxAgeMs; + return { + bindingId: resolveBindingKey(record), + targetSessionKey: record.targetSessionKey, + targetKind: toSessionBindingTargetKind(record.targetKind), + conversation: { + channel: "matrix-js", + accountId: record.accountId, + conversationId: record.conversationId, + parentConversationId: record.parentConversationId, + }, + status: "active", + boundAt: record.boundAt, + expiresAt: lifecycle.expiresAt, + metadata: { + agentId: record.agentId, + label: record.label, + boundBy: record.boundBy, + lastActivityAt: record.lastActivityAt, + idleTimeoutMs, + maxAgeMs, + }, + }; +} + +function resolveBindingsPath(params: { + auth: MatrixAuth; + accountId: string; + env?: NodeJS.ProcessEnv; +}): string { + const storagePaths = resolveMatrixStoragePaths({ + homeserver: params.auth.homeserver, + userId: params.auth.userId, + accessToken: params.auth.accessToken, + accountId: params.accountId, + env: params.env, + }); + return path.join(storagePaths.rootDir, "thread-bindings.json"); +} + +async function loadBindingsFromDisk(filePath: string, accountId: string) { + const { value } = await readJsonFileWithFallback( + filePath, + null, + ); + if (value?.version !== STORE_VERSION || !Array.isArray(value.bindings)) { + return []; + } + const loaded: MatrixThreadBindingRecord[] = []; + for (const entry of value.bindings) { + const conversationId = normalizeConversationId(entry?.conversationId); + const parentConversationId = normalizeConversationId(entry?.parentConversationId); + const targetSessionKey = normalizeText(entry?.targetSessionKey); + if (!conversationId || !targetSessionKey) { + continue; + } + const boundAt = + typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt) + ? Math.floor(entry.boundAt) + : Date.now(); + const lastActivityAt = + typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt) + ? Math.floor(entry.lastActivityAt) + : boundAt; + loaded.push({ + accountId, + conversationId, + ...(parentConversationId ? { parentConversationId } : {}), + targetKind: entry?.targetKind === "subagent" ? "subagent" : "acp", + targetSessionKey, + agentId: normalizeText(entry?.agentId) || undefined, + label: normalizeText(entry?.label) || undefined, + boundBy: normalizeText(entry?.boundBy) || undefined, + boundAt, + lastActivityAt: Math.max(lastActivityAt, boundAt), + idleTimeoutMs: + typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs) + ? Math.max(0, Math.floor(entry.idleTimeoutMs)) + : undefined, + maxAgeMs: + typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs) + ? Math.max(0, Math.floor(entry.maxAgeMs)) + : undefined, + }); + } + return loaded; +} + +async function persistBindings(filePath: string, accountId: string): Promise { + const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()] + .filter((entry) => entry.accountId === accountId) + .sort((a, b) => a.boundAt - b.boundAt); + await writeJsonFileAtomically(filePath, { + version: STORE_VERSION, + bindings, + } satisfies StoredMatrixThreadBindingState); +} + +function setBindingRecord(record: MatrixThreadBindingRecord): void { + BINDINGS_BY_ACCOUNT_CONVERSATION.set(resolveBindingKey(record), record); +} + +function removeBindingRecord(record: MatrixThreadBindingRecord): MatrixThreadBindingRecord | null { + const key = resolveBindingKey(record); + const removed = BINDINGS_BY_ACCOUNT_CONVERSATION.get(key) ?? null; + if (removed) { + BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key); + } + return removed; +} + +function listBindingsForAccount(accountId: string): MatrixThreadBindingRecord[] { + return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter( + (entry) => entry.accountId === accountId, + ); +} + +function buildMatrixBindingIntroText(params: { + metadata?: Record; + targetSessionKey: string; +}): string { + const introText = normalizeText(params.metadata?.introText); + if (introText) { + return introText; + } + const label = normalizeText(params.metadata?.label); + const agentId = + normalizeText(params.metadata?.agentId) || + resolveAgentIdFromSessionKey(params.targetSessionKey); + const base = label || agentId || "session"; + return `âš™ī¸ ${base} session active. Messages here go directly to this session.`; +} + +async function sendBindingMessage(params: { + client: MatrixClient; + accountId: string; + roomId: string; + threadId?: string; + text: string; +}): Promise { + const trimmed = params.text.trim(); + if (!trimmed) { + return null; + } + const result = await sendMessageMatrix(`room:${params.roomId}`, trimmed, { + client: params.client, + accountId: params.accountId, + ...(params.threadId ? { threadId: params.threadId } : {}), + }); + return result.messageId || null; +} + +async function sendFarewellMessage(params: { + client: MatrixClient; + accountId: string; + record: MatrixThreadBindingRecord; + defaultIdleTimeoutMs: number; + defaultMaxAgeMs: number; + reason?: string; +}): Promise { + const roomId = params.record.parentConversationId ?? params.record.conversationId; + const idleTimeoutMs = + typeof params.record.idleTimeoutMs === "number" + ? params.record.idleTimeoutMs + : params.defaultIdleTimeoutMs; + const maxAgeMs = + typeof params.record.maxAgeMs === "number" ? params.record.maxAgeMs : params.defaultMaxAgeMs; + const farewellText = resolveThreadBindingFarewellText({ + reason: params.reason, + idleTimeoutMs, + maxAgeMs, + }); + await sendBindingMessage({ + client: params.client, + accountId: params.accountId, + roomId, + threadId: + params.record.parentConversationId && + params.record.parentConversationId !== params.record.conversationId + ? params.record.conversationId + : undefined, + text: farewellText, + }).catch(() => {}); +} + +export async function createMatrixThreadBindingManager(params: { + accountId: string; + auth: MatrixAuth; + client: MatrixClient; + env?: NodeJS.ProcessEnv; + idleTimeoutMs: number; + maxAgeMs: number; + enableSweeper?: boolean; + logVerboseMessage?: (message: string) => void; +}): Promise { + const existing = MANAGERS_BY_ACCOUNT_ID.get(params.accountId); + if (existing) { + return existing; + } + + const filePath = resolveBindingsPath({ + auth: params.auth, + accountId: params.accountId, + env: params.env, + }); + const loaded = await loadBindingsFromDisk(filePath, params.accountId); + for (const record of loaded) { + setBindingRecord(record); + } + + const persist = async () => await persistBindings(filePath, params.accountId); + const defaults = { + idleTimeoutMs: params.idleTimeoutMs, + maxAgeMs: params.maxAgeMs, + }; + let persistTimer: NodeJS.Timeout | null = null; + const schedulePersist = (delayMs: number) => { + if (persistTimer) { + return; + } + persistTimer = setTimeout(() => { + persistTimer = null; + void persist(); + }, delayMs); + persistTimer.unref?.(); + }; + + const manager: MatrixThreadBindingManager = { + accountId: params.accountId, + getIdleTimeoutMs: () => defaults.idleTimeoutMs, + getMaxAgeMs: () => defaults.maxAgeMs, + getByConversation: ({ conversationId, parentConversationId }) => + listBindingsForAccount(params.accountId).find((entry) => { + if (entry.conversationId !== conversationId.trim()) { + return false; + } + if (!parentConversationId) { + return true; + } + return (entry.parentConversationId ?? "") === parentConversationId.trim(); + }), + listBySessionKey: (targetSessionKey) => + listBindingsForAccount(params.accountId).filter( + (entry) => entry.targetSessionKey === targetSessionKey.trim(), + ), + listBindings: () => listBindingsForAccount(params.accountId), + touchBinding: (bindingId, at) => { + const record = listBindingsForAccount(params.accountId).find( + (entry) => resolveBindingKey(entry) === bindingId.trim(), + ); + if (!record) { + return null; + } + const nextRecord = { + ...record, + lastActivityAt: + typeof at === "number" && Number.isFinite(at) + ? Math.max(record.lastActivityAt, Math.floor(at)) + : Date.now(), + }; + setBindingRecord(nextRecord); + schedulePersist(TOUCH_PERSIST_DELAY_MS); + return nextRecord; + }, + setIdleTimeoutBySessionKey: ({ targetSessionKey, idleTimeoutMs }) => { + const nextBindings = listBindingsForAccount(params.accountId) + .filter((entry) => entry.targetSessionKey === targetSessionKey.trim()) + .map((entry) => ({ + ...entry, + idleTimeoutMs: Math.max(0, Math.floor(idleTimeoutMs)), + })); + for (const entry of nextBindings) { + setBindingRecord(entry); + } + void persist(); + return nextBindings; + }, + setMaxAgeBySessionKey: ({ targetSessionKey, maxAgeMs }) => { + const nextBindings = listBindingsForAccount(params.accountId) + .filter((entry) => entry.targetSessionKey === targetSessionKey.trim()) + .map((entry) => ({ + ...entry, + maxAgeMs: Math.max(0, Math.floor(maxAgeMs)), + })); + for (const entry of nextBindings) { + setBindingRecord(entry); + } + void persist(); + return nextBindings; + }, + stop: () => { + if (sweepTimer) { + clearInterval(sweepTimer); + } + if (persistTimer) { + clearTimeout(persistTimer); + persistTimer = null; + } + unregisterSessionBindingAdapter({ + channel: "matrix-js", + accountId: params.accountId, + }); + if (MANAGERS_BY_ACCOUNT_ID.get(params.accountId) === manager) { + MANAGERS_BY_ACCOUNT_ID.delete(params.accountId); + } + for (const record of listBindingsForAccount(params.accountId)) { + BINDINGS_BY_ACCOUNT_CONVERSATION.delete(resolveBindingKey(record)); + } + }, + }; + + let sweepTimer: NodeJS.Timeout | null = null; + const unbindRecords = async (records: MatrixThreadBindingRecord[], reason: string) => { + if (records.length === 0) { + return []; + } + const removed = records + .map((record) => removeBindingRecord(record)) + .filter((record): record is MatrixThreadBindingRecord => Boolean(record)); + if (removed.length === 0) { + return []; + } + await persist(); + await Promise.all( + removed.map(async (record) => { + await sendFarewellMessage({ + client: params.client, + accountId: params.accountId, + record, + defaultIdleTimeoutMs: defaults.idleTimeoutMs, + defaultMaxAgeMs: defaults.maxAgeMs, + reason, + }); + }), + ); + return removed.map((record) => toSessionBindingRecord(record, defaults)); + }; + + registerSessionBindingAdapter({ + channel: "matrix-js", + accountId: params.accountId, + capabilities: { placements: ["current", "child"], bindSupported: true, unbindSupported: true }, + bind: async (input) => { + const conversationId = input.conversation.conversationId.trim(); + const parentConversationId = input.conversation.parentConversationId?.trim() || undefined; + const targetSessionKey = input.targetSessionKey.trim(); + if (!conversationId || !targetSessionKey) { + return null; + } + + let boundConversationId = conversationId; + let boundParentConversationId = parentConversationId; + const introText = buildMatrixBindingIntroText({ + metadata: input.metadata, + targetSessionKey, + }); + + if (input.placement === "child") { + const roomId = parentConversationId || conversationId; + const rootEventId = await sendBindingMessage({ + client: params.client, + accountId: params.accountId, + roomId, + text: introText, + }); + if (!rootEventId) { + return null; + } + boundConversationId = rootEventId; + boundParentConversationId = roomId; + } + + const now = Date.now(); + const record: MatrixThreadBindingRecord = { + accountId: params.accountId, + conversationId: boundConversationId, + ...(boundParentConversationId ? { parentConversationId: boundParentConversationId } : {}), + targetKind: toMatrixBindingTargetKind(input.targetKind), + targetSessionKey, + agentId: + normalizeText(input.metadata?.agentId) || resolveAgentIdFromSessionKey(targetSessionKey), + label: normalizeText(input.metadata?.label) || undefined, + boundBy: normalizeText(input.metadata?.boundBy) || "system", + boundAt: now, + lastActivityAt: now, + idleTimeoutMs: defaults.idleTimeoutMs, + maxAgeMs: defaults.maxAgeMs, + }; + setBindingRecord(record); + await persist(); + + if (input.placement === "current" && introText) { + const roomId = boundParentConversationId || boundConversationId; + const threadId = + boundParentConversationId && boundParentConversationId !== boundConversationId + ? boundConversationId + : undefined; + await sendBindingMessage({ + client: params.client, + accountId: params.accountId, + roomId, + threadId, + text: introText, + }).catch(() => {}); + } + + return toSessionBindingRecord(record, defaults); + }, + listBySession: (targetSessionKey) => + manager + .listBySessionKey(targetSessionKey) + .map((record) => toSessionBindingRecord(record, defaults)), + resolveByConversation: (ref) => { + const record = manager.getByConversation({ + conversationId: ref.conversationId, + parentConversationId: ref.parentConversationId, + }); + return record ? toSessionBindingRecord(record, defaults) : null; + }, + setIdleTimeoutBySession: ({ targetSessionKey, idleTimeoutMs }) => + manager + .setIdleTimeoutBySessionKey({ targetSessionKey, idleTimeoutMs }) + .map((record) => toSessionBindingRecord(record, defaults)), + setMaxAgeBySession: ({ targetSessionKey, maxAgeMs }) => + manager + .setMaxAgeBySessionKey({ targetSessionKey, maxAgeMs }) + .map((record) => toSessionBindingRecord(record, defaults)), + touch: (bindingId, at) => { + manager.touchBinding(bindingId, at); + }, + unbind: async (input) => { + const removed = await unbindRecords( + listBindingsForAccount(params.accountId).filter((record) => { + if (input.bindingId?.trim()) { + return resolveBindingKey(record) === input.bindingId.trim(); + } + if (input.targetSessionKey?.trim()) { + return record.targetSessionKey === input.targetSessionKey.trim(); + } + return false; + }), + input.reason, + ); + return removed; + }, + }); + + if (params.enableSweeper !== false) { + sweepTimer = setInterval(() => { + const now = Date.now(); + const expired = listBindingsForAccount(params.accountId) + .map((record) => ({ + record, + lifecycle: resolveEffectiveBindingExpiry({ + record, + defaultIdleTimeoutMs: defaults.idleTimeoutMs, + defaultMaxAgeMs: defaults.maxAgeMs, + }), + })) + .filter( + ( + entry, + ): entry is { + record: MatrixThreadBindingRecord; + lifecycle: { expiresAt: number; reason: "idle-expired" | "max-age-expired" }; + } => + typeof entry.lifecycle.expiresAt === "number" && + entry.lifecycle.expiresAt <= now && + Boolean(entry.lifecycle.reason), + ); + if (expired.length === 0) { + return; + } + void Promise.all( + expired.map(async ({ record, lifecycle }) => { + params.logVerboseMessage?.( + `matrix: auto-unbinding ${record.conversationId} due to ${lifecycle.reason}`, + ); + await unbindRecords([record], lifecycle.reason); + }), + ); + }, THREAD_BINDINGS_SWEEP_INTERVAL_MS); + } + + MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager); + return manager; +} + +export function getMatrixThreadBindingManager( + accountId: string, +): MatrixThreadBindingManager | null { + return MANAGERS_BY_ACCOUNT_ID.get(accountId) ?? null; +} + +export function setMatrixThreadBindingIdleTimeoutBySessionKey(params: { + accountId: string; + targetSessionKey: string; + idleTimeoutMs: number; +}): SessionBindingRecord[] { + const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId); + if (!manager) { + return []; + } + return manager.setIdleTimeoutBySessionKey(params).map((record) => + toSessionBindingRecord(record, { + idleTimeoutMs: manager.getIdleTimeoutMs(), + maxAgeMs: manager.getMaxAgeMs(), + }), + ); +} + +export function setMatrixThreadBindingMaxAgeBySessionKey(params: { + accountId: string; + targetSessionKey: string; + maxAgeMs: number; +}): SessionBindingRecord[] { + const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId); + if (!manager) { + return []; + } + return manager.setMaxAgeBySessionKey(params).map((record) => + toSessionBindingRecord(record, { + idleTimeoutMs: manager.getIdleTimeoutMs(), + maxAgeMs: manager.getMaxAgeMs(), + }), + ); +} + +export function resetMatrixThreadBindingsForTests(): void { + for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) { + manager.stop(); + } + MANAGERS_BY_ACCOUNT_ID.clear(); + BINDINGS_BY_ACCOUNT_CONVERSATION.clear(); +} diff --git a/extensions/matrix-js/src/tool-actions.test.ts b/extensions/matrix-js/src/tool-actions.test.ts index 3099b476232..dc520503822 100644 --- a/extensions/matrix-js/src/tool-actions.test.ts +++ b/extensions/matrix-js/src/tool-actions.test.ts @@ -7,14 +7,22 @@ const mocks = vi.hoisted(() => ({ reactMatrixMessage: vi.fn(), listMatrixReactions: vi.fn(), removeMatrixReactions: vi.fn(), + sendMatrixMessage: vi.fn(), + listMatrixPins: vi.fn(), + getMatrixMemberInfo: vi.fn(), + getMatrixRoomInfo: vi.fn(), })); vi.mock("./matrix/actions.js", async () => { const actual = await vi.importActual("./matrix/actions.js"); return { ...actual, + getMatrixMemberInfo: mocks.getMatrixMemberInfo, + getMatrixRoomInfo: mocks.getMatrixRoomInfo, listMatrixReactions: mocks.listMatrixReactions, + listMatrixPins: mocks.listMatrixPins, removeMatrixReactions: mocks.removeMatrixReactions, + sendMatrixMessage: mocks.sendMatrixMessage, voteMatrixPoll: mocks.voteMatrixPoll, }; }); @@ -39,7 +47,14 @@ describe("handleMatrixAction pollVote", () => { maxSelections: 2, }); mocks.listMatrixReactions.mockResolvedValue([{ key: "👍", count: 1, users: ["@u:example"] }]); + mocks.listMatrixPins.mockResolvedValue({ pinned: ["$pin"], events: [] }); mocks.removeMatrixReactions.mockResolvedValue({ removed: 1 }); + mocks.sendMatrixMessage.mockResolvedValue({ + messageId: "$sent", + roomId: "!room:example", + }); + mocks.getMatrixMemberInfo.mockResolvedValue({ userId: "@u:example" }); + mocks.getMatrixRoomInfo.mockResolvedValue({ roomId: "!room:example" }); }); it("parses snake_case vote params and forwards normalized selectors", async () => { @@ -141,4 +156,67 @@ describe("handleMatrixAction pollVote", () => { reactions: [{ key: "👍", count: 1 }], }); }); + + it("passes account-scoped opts to message sends", async () => { + await handleMatrixAction( + { + action: "sendMessage", + accountId: "ops", + to: "room:!room:example", + content: "hello", + threadId: "$thread", + }, + { channels: { "matrix-js": { actions: { messages: true } } } } as CoreConfig, + ); + + expect(mocks.sendMatrixMessage).toHaveBeenCalledWith("room:!room:example", "hello", { + accountId: "ops", + mediaUrl: undefined, + replyToId: undefined, + threadId: "$thread", + }); + }); + + it("passes account-scoped opts to pin listing", async () => { + await handleMatrixAction( + { + action: "listPins", + accountId: "ops", + roomId: "!room:example", + }, + { channels: { "matrix-js": { actions: { pins: true } } } } as CoreConfig, + ); + + expect(mocks.listMatrixPins).toHaveBeenCalledWith("!room:example", { + accountId: "ops", + }); + }); + + it("passes account-scoped opts to member and room info actions", async () => { + await handleMatrixAction( + { + action: "memberInfo", + accountId: "ops", + userId: "@u:example", + roomId: "!room:example", + }, + { channels: { "matrix-js": { actions: { memberInfo: true } } } } as CoreConfig, + ); + await handleMatrixAction( + { + action: "channelInfo", + accountId: "ops", + roomId: "!room:example", + }, + { channels: { "matrix-js": { actions: { channelInfo: true } } } } as CoreConfig, + ); + + expect(mocks.getMatrixMemberInfo).toHaveBeenCalledWith("@u:example", { + accountId: "ops", + roomId: "!room:example", + }); + expect(mocks.getMatrixRoomInfo).toHaveBeenCalledWith("!room:example", { + accountId: "ops", + }); + }); }); diff --git a/extensions/matrix-js/src/tool-actions.ts b/extensions/matrix-js/src/tool-actions.ts index bf6f1bd42bb..d15dc24a953 100644 --- a/extensions/matrix-js/src/tool-actions.ts +++ b/extensions/matrix-js/src/tool-actions.ts @@ -130,6 +130,7 @@ export async function handleMatrixAction( const action = readStringParam(params, "action", { required: true }); const accountId = readStringParam(params, "accountId") ?? undefined; const isActionEnabled = createActionGate(cfg.channels?.["matrix-js"]?.actions); + const clientOpts = accountId ? { accountId } : {}; if (reactionActions.has(action)) { if (!isActionEnabled("reactions")) { @@ -199,6 +200,7 @@ export async function handleMatrixAction( mediaUrl: mediaUrl ?? undefined, replyToId: replyToId ?? undefined, threadId: threadId ?? undefined, + ...clientOpts, }); return jsonResult({ ok: true, result }); } @@ -206,14 +208,17 @@ export async function handleMatrixAction( const roomId = readRoomId(params); const messageId = readStringParam(params, "messageId", { required: true }); const content = readStringParam(params, "content", { required: true }); - const result = await editMatrixMessage(roomId, messageId, content); + const result = await editMatrixMessage(roomId, messageId, content, clientOpts); return jsonResult({ ok: true, result }); } case "deleteMessage": { const roomId = readRoomId(params); const messageId = readStringParam(params, "messageId", { required: true }); const reason = readStringParam(params, "reason"); - await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined }); + await deleteMatrixMessage(roomId, messageId, { + reason: reason ?? undefined, + ...clientOpts, + }); return jsonResult({ ok: true, deleted: true }); } case "readMessages": { @@ -225,6 +230,7 @@ export async function handleMatrixAction( limit: limit ?? undefined, before: before ?? undefined, after: after ?? undefined, + ...clientOpts, }); return jsonResult({ ok: true, ...result }); } @@ -240,15 +246,15 @@ export async function handleMatrixAction( const roomId = readRoomId(params); if (action === "pinMessage") { const messageId = readStringParam(params, "messageId", { required: true }); - const result = await pinMatrixMessage(roomId, messageId); + const result = await pinMatrixMessage(roomId, messageId, clientOpts); return jsonResult({ ok: true, pinned: result.pinned }); } if (action === "unpinMessage") { const messageId = readStringParam(params, "messageId", { required: true }); - const result = await unpinMatrixMessage(roomId, messageId); + const result = await unpinMatrixMessage(roomId, messageId, clientOpts); return jsonResult({ ok: true, pinned: result.pinned }); } - const result = await listMatrixPins(roomId); + const result = await listMatrixPins(roomId, clientOpts); return jsonResult({ ok: true, pinned: result.pinned, events: result.events }); } @@ -260,6 +266,7 @@ export async function handleMatrixAction( const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); const result = await getMatrixMemberInfo(userId, { roomId: roomId ?? undefined, + ...clientOpts, }); return jsonResult({ ok: true, member: result }); } @@ -269,7 +276,7 @@ export async function handleMatrixAction( throw new Error("Matrix room info is disabled."); } const roomId = readRoomId(params); - const result = await getMatrixRoomInfo(roomId); + const result = await getMatrixRoomInfo(roomId, clientOpts); return jsonResult({ ok: true, room: result }); } diff --git a/extensions/matrix-js/src/types.ts b/extensions/matrix-js/src/types.ts index 93507570608..91f7ee896e5 100644 --- a/extensions/matrix-js/src/types.ts +++ b/extensions/matrix-js/src/types.ts @@ -40,6 +40,14 @@ export type MatrixActionConfig = { verification?: boolean; }; +export type MatrixThreadBindingsConfig = { + enabled?: boolean; + idleHours?: number; + maxAgeHours?: number; + spawnSubagentSessions?: boolean; + spawnAcpSessions?: boolean; +}; + /** Per-account Matrix config (excludes the accounts field to prevent recursion). */ export type MatrixAccountConfig = Omit; @@ -90,6 +98,8 @@ export type MatrixConfig = { ackReactionScope?: "group-mentions" | "group-all" | "direct" | "all" | "none" | "off"; /** Inbound reaction notifications for bot-authored Matrix messages. */ reactionNotifications?: "off" | "own"; + /** Thread/session binding behavior for Matrix room threads. */ + threadBindings?: MatrixThreadBindingsConfig; /** Whether Matrix-js should auto-request self verification on startup when unverified. */ startupVerification?: "off" | "if-unverified"; /** Cooldown window for automatic startup verification requests. Default: 24 hours. */ diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index aaf13e15561..4b6b688b42d 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -587,6 +587,28 @@ export function createTelegramThreadBindingManager( }) : null; }, + setIdleTimeoutBySession: ({ targetSessionKey, idleTimeoutMs: nextIdleTimeoutMs }) => + setTelegramThreadBindingIdleTimeoutBySessionKey({ + targetSessionKey, + accountId, + idleTimeoutMs: nextIdleTimeoutMs, + }).map((entry) => + toSessionBindingRecord(entry, { + idleTimeoutMs, + maxAgeMs, + }), + ), + setMaxAgeBySession: ({ targetSessionKey, maxAgeMs: nextMaxAgeMs }) => + setTelegramThreadBindingMaxAgeBySessionKey({ + targetSessionKey, + accountId, + maxAgeMs: nextMaxAgeMs, + }).map((entry) => + toSessionBindingRecord(entry, { + idleTimeoutMs, + maxAgeMs, + }), + ), touch: (bindingId, at) => { const conversationId = resolveThreadBindingConversationIdFromBindingId({ accountId, diff --git a/src/auto-reply/reply/channel-context.ts b/src/auto-reply/reply/channel-context.ts index d8ffb261eb8..fab3625d51c 100644 --- a/src/auto-reply/reply/channel-context.ts +++ b/src/auto-reply/reply/channel-context.ts @@ -24,6 +24,10 @@ export function isTelegramSurface(params: DiscordSurfaceParams): boolean { return resolveCommandSurfaceChannel(params) === "telegram"; } +export function isMatrixSurface(params: DiscordSurfaceParams): boolean { + return resolveCommandSurfaceChannel(params) === "matrix-js"; +} + export function resolveCommandSurfaceChannel(params: DiscordSurfaceParams): string { const channel = params.ctx.OriginatingChannel ?? diff --git a/src/auto-reply/reply/commands-acp/context.test.ts b/src/auto-reply/reply/commands-acp/context.test.ts index 5b1e60ad1fc..b33612bc233 100644 --- a/src/auto-reply/reply/commands-acp/context.test.ts +++ b/src/auto-reply/reply/commands-acp/context.test.ts @@ -141,6 +141,45 @@ describe("commands-acp context", () => { expect(resolveAcpCommandConversationId(params)).toBe("123456789"); }); + it("resolves Matrix thread conversation ids from room targets", () => { + const params = buildCommandTestParams("/acp status", baseCfg, { + Provider: "matrix-js", + Surface: "matrix-js", + OriginatingChannel: "matrix-js", + OriginatingTo: "room:!room:example", + MessageThreadId: "$thread-42", + AccountId: "work", + }); + + expect(resolveAcpCommandBindingContext(params)).toEqual({ + channel: "matrix-js", + accountId: "work", + threadId: "$thread-42", + conversationId: "$thread-42", + parentConversationId: "!room:example", + }); + expect(resolveAcpCommandConversationId(params)).toBe("$thread-42"); + expect(resolveAcpCommandParentConversationId(params)).toBe("!room:example"); + }); + + it("resolves Matrix room conversation ids outside thread context", () => { + const params = buildCommandTestParams("/acp status", baseCfg, { + Provider: "matrix-js", + Surface: "matrix-js", + OriginatingChannel: "matrix-js", + OriginatingTo: "room:!room:example", + AccountId: "work", + }); + + expect(resolveAcpCommandBindingContext(params)).toEqual({ + channel: "matrix-js", + accountId: "work", + threadId: undefined, + conversationId: "!room:example", + parentConversationId: "!room:example", + }); + }); + it("builds Feishu topic conversation ids from chat target + root message id", () => { const params = buildCommandTestParams("/acp status", baseCfg, { Provider: "feishu", diff --git a/src/auto-reply/reply/commands-acp/context.ts b/src/auto-reply/reply/commands-acp/context.ts index 59db08384af..6ed0da06b0b 100644 --- a/src/auto-reply/reply/commands-acp/context.ts +++ b/src/auto-reply/reply/commands-acp/context.ts @@ -10,6 +10,10 @@ import { buildFeishuConversationId } from "../../../plugin-sdk/feishu.js"; import { parseAgentSessionKey } from "../../../routing/session-key.js"; import type { HandleCommandsParams } from "../commands-types.js"; import { parseDiscordParentChannelFromSessionKey } from "../discord-parent-channel.js"; +import { + resolveMatrixConversationId, + resolveMatrixParentConversationId, +} from "../matrix-context.js"; import { resolveTelegramConversationId } from "../telegram-context.js"; function parseFeishuTargetId(raw: unknown): string | undefined { @@ -131,6 +135,18 @@ export function resolveAcpCommandThreadId(params: HandleCommandsParams): string export function resolveAcpCommandConversationId(params: HandleCommandsParams): string | undefined { const channel = resolveAcpCommandChannel(params); + if (channel === "matrix-js") { + return resolveMatrixConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }); + } if (channel === "telegram") { const telegramConversationId = resolveTelegramConversationId({ ctx: { @@ -201,6 +217,18 @@ export function resolveAcpCommandParentConversationId( params: HandleCommandsParams, ): string | undefined { const channel = resolveAcpCommandChannel(params); + if (channel === "matrix-js") { + return resolveMatrixParentConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }); + } if (channel === "telegram") { return ( parseTelegramChatIdFromTarget(params.ctx.OriginatingTo) ?? diff --git a/src/auto-reply/reply/commands-session-lifecycle.test.ts b/src/auto-reply/reply/commands-session-lifecycle.test.ts index bb56ef82bd9..4d6040373f6 100644 --- a/src/auto-reply/reply/commands-session-lifecycle.test.ts +++ b/src/auto-reply/reply/commands-session-lifecycle.test.ts @@ -1,9 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { telegramPlugin } from "../../../extensions/telegram/src/channel.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; -import { setActivePluginRegistry } from "../../plugins/runtime.js"; -import { createTestRegistry } from "../../test-utils/channel-plugins.js"; const hoisted = vi.hoisted(() => { const getThreadBindingManagerMock = vi.fn(); @@ -22,34 +19,24 @@ const hoisted = vi.hoisted(() => { }; }); -vi.mock("../../plugins/runtime/index.js", async () => { - const discordThreadBindings = await vi.importActual< - typeof import("../../../extensions/discord/src/monitor/thread-bindings.js") - >("../../../extensions/discord/src/monitor/thread-bindings.js"); +vi.mock("../../discord/monitor/thread-bindings.js", async (importOriginal) => { + const actual = await importOriginal(); return { - createPluginRuntime: () => ({ - channel: { - discord: { - threadBindings: { - getManager: hoisted.getThreadBindingManagerMock, - resolveIdleTimeoutMs: discordThreadBindings.resolveThreadBindingIdleTimeoutMs, - resolveInactivityExpiresAt: - discordThreadBindings.resolveThreadBindingInactivityExpiresAt, - resolveMaxAgeMs: discordThreadBindings.resolveThreadBindingMaxAgeMs, - resolveMaxAgeExpiresAt: discordThreadBindings.resolveThreadBindingMaxAgeExpiresAt, - setIdleTimeoutBySessionKey: hoisted.setThreadBindingIdleTimeoutBySessionKeyMock, - setMaxAgeBySessionKey: hoisted.setThreadBindingMaxAgeBySessionKeyMock, - unbindBySessionKey: vi.fn(), - }, - }, - telegram: { - threadBindings: { - setIdleTimeoutBySessionKey: hoisted.setTelegramThreadBindingIdleTimeoutBySessionKeyMock, - setMaxAgeBySessionKey: hoisted.setTelegramThreadBindingMaxAgeBySessionKeyMock, - }, - }, - }, - }), + ...actual, + getThreadBindingManager: hoisted.getThreadBindingManagerMock, + setThreadBindingIdleTimeoutBySessionKey: hoisted.setThreadBindingIdleTimeoutBySessionKeyMock, + setThreadBindingMaxAgeBySessionKey: hoisted.setThreadBindingMaxAgeBySessionKeyMock, + }; +}); + +vi.mock("../../telegram/thread-bindings.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + setTelegramThreadBindingIdleTimeoutBySessionKey: + hoisted.setTelegramThreadBindingIdleTimeoutBySessionKeyMock, + setTelegramThreadBindingMaxAgeBySessionKey: + hoisted.setTelegramThreadBindingMaxAgeBySessionKeyMock, }; }); @@ -60,9 +47,62 @@ vi.mock("../../infra/outbound/session-binding-service.js", async (importOriginal ...actual, getSessionBindingService: () => ({ bind: vi.fn(), - getCapabilities: vi.fn(), + getCapabilities: vi.fn(() => ({ + adapterAvailable: true, + bindSupported: true, + unbindSupported: true, + placements: ["current", "child"], + })), listBySession: vi.fn(), resolveByConversation: (ref: unknown) => hoisted.sessionBindingResolveByConversationMock(ref), + setIdleTimeoutBySession: ({ + channel, + targetSessionKey, + accountId, + idleTimeoutMs, + }: { + channel: string; + targetSessionKey: string; + accountId: string; + idleTimeoutMs: number; + }) => + Promise.resolve( + channel === "telegram" + ? hoisted.setTelegramThreadBindingIdleTimeoutBySessionKeyMock({ + targetSessionKey, + accountId, + idleTimeoutMs, + }) + : hoisted.setThreadBindingIdleTimeoutBySessionKeyMock({ + targetSessionKey, + accountId, + idleTimeoutMs, + }), + ), + setMaxAgeBySession: ({ + channel, + targetSessionKey, + accountId, + maxAgeMs, + }: { + channel: string; + targetSessionKey: string; + accountId: string; + maxAgeMs: number; + }) => + Promise.resolve( + channel === "telegram" + ? hoisted.setTelegramThreadBindingMaxAgeBySessionKeyMock({ + targetSessionKey, + accountId, + maxAgeMs, + }) + : hoisted.setThreadBindingMaxAgeBySessionKeyMock({ + targetSessionKey, + accountId, + maxAgeMs, + }), + ), touch: vi.fn(), unbind: vi.fn(), }), @@ -76,20 +116,6 @@ const baseCfg = { session: { mainKey: "main", scope: "per-sender" }, } satisfies OpenClawConfig; -type FakeBinding = { - accountId: string; - channelId: string; - threadId: string; - targetKind: "subagent" | "acp"; - targetSessionKey: string; - agentId: string; - boundBy: string; - boundAt: number; - lastActivityAt: number; - idleTimeoutMs?: number; - maxAgeMs?: number; -}; - function createDiscordCommandParams(commandBody: string, overrides?: Record) { return buildCommandTestParams(commandBody, baseCfg, { Provider: "discord", @@ -114,18 +140,37 @@ function createTelegramCommandParams(commandBody: string, overrides?: Record = {}): FakeBinding { - const now = Date.now(); +function createMatrixCommandParams(commandBody: string, overrides?: Record) { + return buildCommandTestParams(commandBody, baseCfg, { + Provider: "matrix-js", + Surface: "matrix-js", + OriginatingChannel: "matrix-js", + OriginatingTo: "room:!room:example", + To: "room:!room:example", + AccountId: "default", + MessageThreadId: "$thread-1", + ...overrides, + }); +} + +function createDiscordBinding(overrides?: Partial): SessionBindingRecord { return { - accountId: "default", - channelId: "parent-1", - threadId: "thread-1", - targetKind: "subagent", + bindingId: "default:thread-1", targetSessionKey: "agent:main:subagent:child", - agentId: "main", - boundBy: "user-1", - boundAt: now, - lastActivityAt: now, + targetKind: "subagent", + conversation: { + channel: "discord", + accountId: "default", + conversationId: "thread-1", + }, + status: "active", + boundAt: Date.now(), + metadata: { + boundBy: "user-1", + lastActivityAt: Date.now(), + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }, ...overrides, }; } @@ -152,34 +197,31 @@ function createTelegramBinding(overrides?: Partial): Sessi }; } -function expectIdleTimeoutSetReply( - mock: ReturnType, - text: string, - idleTimeoutMs: number, - idleTimeoutLabel: string, -) { - expect(mock).toHaveBeenCalledWith({ - targetSessionKey: "agent:main:subagent:child", - accountId: "default", - idleTimeoutMs, - }); - expect(text).toContain(`Idle timeout set to ${idleTimeoutLabel}`); - expect(text).toContain("2026-02-20T02:00:00.000Z"); -} - -function createFakeThreadBindingManager(binding: FakeBinding | null) { +function createMatrixBinding(overrides?: Partial): SessionBindingRecord { return { - getByThreadId: vi.fn((_threadId: string) => binding), - getIdleTimeoutMs: vi.fn(() => 24 * 60 * 60 * 1000), - getMaxAgeMs: vi.fn(() => 0), + bindingId: "default:!room:example:$thread-1", + targetSessionKey: "agent:main:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix-js", + accountId: "default", + conversationId: "$thread-1", + parentConversationId: "!room:example", + }, + status: "active", + boundAt: Date.now(), + metadata: { + boundBy: "user-1", + lastActivityAt: Date.now(), + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }, + ...overrides, }; } describe("/session idle and /session max-age", () => { beforeEach(() => { - setActivePluginRegistry( - createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]), - ); hoisted.getThreadBindingManagerMock.mockReset(); hoisted.setThreadBindingIdleTimeoutBySessionKeyMock.mockReset(); hoisted.setThreadBindingMaxAgeBySessionKeyMock.mockReset(); @@ -193,11 +235,12 @@ describe("/session idle and /session max-age", () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z")); - const binding = createFakeBinding(); - hoisted.getThreadBindingManagerMock.mockReturnValue(createFakeThreadBindingManager(binding)); + const binding = createDiscordBinding(); + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(binding); hoisted.setThreadBindingIdleTimeoutBySessionKeyMock.mockReturnValue([ { - ...binding, + targetSessionKey: binding.targetSessionKey, + boundAt: Date.now(), lastActivityAt: Date.now(), idleTimeoutMs: 2 * 60 * 60 * 1000, }, @@ -206,23 +249,28 @@ describe("/session idle and /session max-age", () => { const result = await handleSessionCommand(createDiscordCommandParams("/session idle 2h"), true); const text = result?.reply?.text ?? ""; - expectIdleTimeoutSetReply( - hoisted.setThreadBindingIdleTimeoutBySessionKeyMock, - text, - 2 * 60 * 60 * 1000, - "2h", - ); + expect(hoisted.setThreadBindingIdleTimeoutBySessionKeyMock).toHaveBeenCalledWith({ + targetSessionKey: "agent:main:subagent:child", + accountId: "default", + idleTimeoutMs: 2 * 60 * 60 * 1000, + }); + expect(text).toContain("Idle timeout set to 2h"); + expect(text).toContain("2026-02-20T02:00:00.000Z"); }); it("shows active idle timeout when no value is provided", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z")); - const binding = createFakeBinding({ - idleTimeoutMs: 2 * 60 * 60 * 1000, - lastActivityAt: Date.now(), + const binding = createDiscordBinding({ + metadata: { + boundBy: "user-1", + lastActivityAt: Date.now(), + idleTimeoutMs: 2 * 60 * 60 * 1000, + maxAgeMs: 0, + }, }); - hoisted.getThreadBindingManagerMock.mockReturnValue(createFakeThreadBindingManager(binding)); + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(binding); const result = await handleSessionCommand(createDiscordCommandParams("/session idle"), true); expect(result?.reply?.text).toContain("Idle timeout active (2h"); @@ -233,11 +281,11 @@ describe("/session idle and /session max-age", () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z")); - const binding = createFakeBinding(); - hoisted.getThreadBindingManagerMock.mockReturnValue(createFakeThreadBindingManager(binding)); + const binding = createDiscordBinding(); + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(binding); hoisted.setThreadBindingMaxAgeBySessionKeyMock.mockReturnValue([ { - ...binding, + targetSessionKey: binding.targetSessionKey, boundAt: Date.now(), maxAgeMs: 3 * 60 * 60 * 1000, }, @@ -278,12 +326,13 @@ describe("/session idle and /session max-age", () => { ); const text = result?.reply?.text ?? ""; - expectIdleTimeoutSetReply( - hoisted.setTelegramThreadBindingIdleTimeoutBySessionKeyMock, - text, - 2 * 60 * 60 * 1000, - "2h", - ); + expect(hoisted.setTelegramThreadBindingIdleTimeoutBySessionKeyMock).toHaveBeenCalledWith({ + targetSessionKey: "agent:main:subagent:child", + accountId: "default", + idleTimeoutMs: 2 * 60 * 60 * 1000, + }); + expect(text).toContain("Idle timeout set to 2h"); + expect(text).toContain("2026-02-20T02:00:00.000Z"); }); it("reports Telegram max-age expiry from the original bind time", async () => { @@ -318,10 +367,49 @@ describe("/session idle and /session max-age", () => { expect(text).toContain("2026-02-20T01:00:00.000Z"); }); + it("sets idle timeout for focused Matrix threads", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z")); + + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(createMatrixBinding()); + hoisted.setThreadBindingIdleTimeoutBySessionKeyMock.mockReturnValue([ + { + targetSessionKey: "agent:main:subagent:child", + boundAt: Date.now(), + lastActivityAt: Date.now(), + idleTimeoutMs: 2 * 60 * 60 * 1000, + }, + ]); + + const result = await handleSessionCommand(createMatrixCommandParams("/session idle 2h"), true); + const text = result?.reply?.text ?? ""; + + expect(hoisted.setThreadBindingIdleTimeoutBySessionKeyMock).toHaveBeenCalledWith({ + targetSessionKey: "agent:main:subagent:child", + accountId: "default", + idleTimeoutMs: 2 * 60 * 60 * 1000, + }); + expect(text).toContain("Idle timeout set to 2h"); + expect(text).toContain("2026-02-20T02:00:00.000Z"); + }); + it("disables max age when set to off", async () => { - const binding = createFakeBinding({ maxAgeMs: 2 * 60 * 60 * 1000 }); - hoisted.getThreadBindingManagerMock.mockReturnValue(createFakeThreadBindingManager(binding)); - hoisted.setThreadBindingMaxAgeBySessionKeyMock.mockReturnValue([{ ...binding, maxAgeMs: 0 }]); + const binding = createDiscordBinding({ + metadata: { + boundBy: "user-1", + lastActivityAt: Date.now(), + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 2 * 60 * 60 * 1000, + }, + }); + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(binding); + hoisted.setThreadBindingMaxAgeBySessionKeyMock.mockReturnValue([ + { + targetSessionKey: binding.targetSessionKey, + boundAt: binding.boundAt, + maxAgeMs: 0, + }, + ]); const result = await handleSessionCommand( createDiscordCommandParams("/session max-age off"), @@ -340,13 +428,20 @@ describe("/session idle and /session max-age", () => { const params = buildCommandTestParams("/session idle 2h", baseCfg); const result = await handleSessionCommand(params, true); expect(result?.reply?.text).toContain( - "currently available for Discord and Telegram bound sessions", + "currently available for Discord, Matrix, and Telegram bound sessions", ); }); it("requires binding owner for lifecycle updates", async () => { - const binding = createFakeBinding({ boundBy: "owner-1" }); - hoisted.getThreadBindingManagerMock.mockReturnValue(createFakeThreadBindingManager(binding)); + const binding = createDiscordBinding({ + metadata: { + boundBy: "owner-1", + lastActivityAt: Date.now(), + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }, + }); + hoisted.sessionBindingResolveByConversationMock.mockReturnValue(binding); const result = await handleSessionCommand( createDiscordCommandParams("/session idle 2h", { diff --git a/src/auto-reply/reply/commands-session.ts b/src/auto-reply/reply/commands-session.ts index 0359c77331b..cebc533daaa 100644 --- a/src/auto-reply/reply/commands-session.ts +++ b/src/auto-reply/reply/commands-session.ts @@ -7,27 +7,29 @@ import { getSessionBindingService } from "../../infra/outbound/session-binding-s import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; import { scheduleGatewaySigusr1Restart, triggerOpenClawRestart } from "../../infra/restart.js"; import { loadCostUsageSummary, loadSessionCostSummary } from "../../infra/session-cost-usage.js"; -import { createPluginRuntime } from "../../plugins/runtime/index.js"; import { formatTokenCount, formatUsd } from "../../utils/usage-format.js"; import { parseActivationCommand } from "../group-activation.js"; import { parseSendPolicyCommand } from "../send-policy.js"; import { normalizeFastMode, normalizeUsageDisplay, resolveResponseUsageMode } from "../thinking.js"; -import { isDiscordSurface, isTelegramSurface, resolveChannelAccountId } from "./channel-context.js"; +import { + isDiscordSurface, + isMatrixSurface, + isTelegramSurface, + resolveChannelAccountId, +} from "./channel-context.js"; import { handleAbortTrigger, handleStopCommand } from "./commands-session-abort.js"; import { persistSessionEntry } from "./commands-session-store.js"; import type { CommandHandler } from "./commands-types.js"; +import { + resolveMatrixConversationId, + resolveMatrixParentConversationId, +} from "./matrix-context.js"; import { resolveTelegramConversationId } from "./telegram-context.js"; const SESSION_COMMAND_PREFIX = "/session"; const SESSION_DURATION_OFF_VALUES = new Set(["off", "disable", "disabled", "none", "0"]); const SESSION_ACTION_IDLE = "idle"; const SESSION_ACTION_MAX_AGE = "max-age"; -let cachedChannelRuntime: ReturnType["channel"] | undefined; - -function getChannelRuntime() { - cachedChannelRuntime ??= createPluginRuntime().channel; - return cachedChannelRuntime; -} function resolveSessionCommandUsage() { return "Usage: /session idle | /session max-age (example: /session idle 24h)"; @@ -55,7 +57,7 @@ function formatSessionExpiry(expiresAt: number) { return new Date(expiresAt).toISOString(); } -function resolveTelegramBindingDurationMs( +function resolveBindingDurationMs( binding: SessionBindingRecord, key: "idleTimeoutMs" | "maxAgeMs", fallbackMs: number, @@ -67,7 +69,7 @@ function resolveTelegramBindingDurationMs( return Math.max(0, Math.floor(raw)); } -function resolveTelegramBindingLastActivityAt(binding: SessionBindingRecord): number { +function resolveBindingLastActivityAt(binding: SessionBindingRecord): number { const raw = binding.metadata?.lastActivityAt; if (typeof raw !== "number" || !Number.isFinite(raw)) { return binding.boundAt; @@ -75,11 +77,37 @@ function resolveTelegramBindingLastActivityAt(binding: SessionBindingRecord): nu return Math.max(Math.floor(raw), binding.boundAt); } -function resolveTelegramBindingBoundBy(binding: SessionBindingRecord): string { +function resolveBindingBoundBy(binding: SessionBindingRecord): string { const raw = binding.metadata?.boundBy; return typeof raw === "string" ? raw.trim() : ""; } +function resolveBindingConversationLabel(channel: string): "thread" | "conversation" { + return channel === "telegram" ? "conversation" : "thread"; +} + +function resolveIdleExpiresAt( + binding: SessionBindingRecord, + fallbackIdleTimeoutMs: number, +): number | undefined { + const idleTimeoutMs = resolveBindingDurationMs(binding, "idleTimeoutMs", fallbackIdleTimeoutMs); + if (idleTimeoutMs <= 0) { + return undefined; + } + return resolveBindingLastActivityAt(binding) + idleTimeoutMs; +} + +function resolveMaxAgeExpiresAt( + binding: SessionBindingRecord, + fallbackMaxAgeMs: number, +): number | undefined { + const maxAgeMs = resolveBindingDurationMs(binding, "maxAgeMs", fallbackMaxAgeMs); + if (maxAgeMs <= 0) { + return undefined; + } + return binding.boundAt + maxAgeMs; +} + type UpdatedLifecycleBinding = { boundAt: number; lastActivityAt: number; @@ -200,6 +228,57 @@ export const handleSendPolicyCommand: CommandHandler = async (params, allowTextC }; }; +export const handleFastCommand: CommandHandler = async (params, allowTextCommands) => { + if (!allowTextCommands) { + return null; + } + const normalized = params.command.commandBodyNormalized; + if (normalized !== "/fast" && !normalized.startsWith("/fast ")) { + return null; + } + if (!params.command.isAuthorizedSender) { + logVerbose( + `Ignoring /fast from unauthorized sender: ${params.command.senderId || ""}`, + ); + return { shouldContinue: false }; + } + + const rawArgs = normalized === "/fast" ? "" : normalized.slice("/fast".length).trim(); + const rawMode = rawArgs.toLowerCase(); + if (!rawMode || rawMode === "status") { + const state = resolveFastModeState({ + cfg: params.cfg, + provider: params.provider, + model: params.model, + sessionEntry: params.sessionEntry, + }); + const suffix = + state.source === "config" ? " (config)" : state.source === "default" ? " (default)" : ""; + return { + shouldContinue: false, + reply: { text: `âš™ī¸ Current fast mode: ${state.enabled ? "on" : "off"}${suffix}.` }, + }; + } + + const nextMode = normalizeFastMode(rawMode); + if (nextMode === undefined) { + return { + shouldContinue: false, + reply: { text: "âš™ī¸ Usage: /fast status|on|off" }, + }; + } + + if (params.sessionEntry && params.sessionStore && params.sessionKey) { + params.sessionEntry.fastMode = nextMode; + await persistSessionEntry(params); + } + + return { + shouldContinue: false, + reply: { text: `âš™ī¸ Fast mode ${nextMode ? "enabled" : "disabled"}.` }, + }; +}; + export const handleUsageCommand: CommandHandler = async (params, allowTextCommands) => { if (!allowTextCommands) { return null; @@ -286,57 +365,6 @@ export const handleUsageCommand: CommandHandler = async (params, allowTextComman }; }; -export const handleFastCommand: CommandHandler = async (params, allowTextCommands) => { - if (!allowTextCommands) { - return null; - } - const normalized = params.command.commandBodyNormalized; - if (normalized !== "/fast" && !normalized.startsWith("/fast ")) { - return null; - } - if (!params.command.isAuthorizedSender) { - logVerbose( - `Ignoring /fast from unauthorized sender: ${params.command.senderId || ""}`, - ); - return { shouldContinue: false }; - } - - const rawArgs = normalized === "/fast" ? "" : normalized.slice("/fast".length).trim(); - const rawMode = rawArgs.toLowerCase(); - if (!rawMode || rawMode === "status") { - const state = resolveFastModeState({ - cfg: params.cfg, - provider: params.provider, - model: params.model, - sessionEntry: params.sessionEntry, - }); - const suffix = - state.source === "config" ? " (config)" : state.source === "default" ? " (default)" : ""; - return { - shouldContinue: false, - reply: { text: `âš™ī¸ Current fast mode: ${state.enabled ? "on" : "off"}${suffix}.` }, - }; - } - - const nextMode = normalizeFastMode(rawMode); - if (nextMode === undefined) { - return { - shouldContinue: false, - reply: { text: "âš™ī¸ Usage: /fast status|on|off" }, - }; - } - - if (params.sessionEntry && params.sessionStore && params.sessionKey) { - params.sessionEntry.fastMode = nextMode; - await persistSessionEntry(params); - } - - return { - shouldContinue: false, - reply: { text: `âš™ī¸ Fast mode ${nextMode ? "enabled" : "disabled"}.` }, - }; -}; - export const handleSessionCommand: CommandHandler = async (params, allowTextCommands) => { if (!allowTextCommands) { return null; @@ -363,44 +391,71 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm } const onDiscord = isDiscordSurface(params); + const onMatrix = isMatrixSurface(params); const onTelegram = isTelegramSurface(params); - if (!onDiscord && !onTelegram) { + if (!onDiscord && !onTelegram && !onMatrix) { return { shouldContinue: false, reply: { - text: "âš ī¸ /session idle and /session max-age are currently available for Discord and Telegram bound sessions.", + text: "âš ī¸ /session idle and /session max-age are currently available for Discord, Matrix, and Telegram bound sessions.", }, }; } const accountId = resolveChannelAccountId(params); const sessionBindingService = getSessionBindingService(); + const channel = onDiscord ? "discord" : onTelegram ? "telegram" : "matrix-js"; const threadId = params.ctx.MessageThreadId != null ? String(params.ctx.MessageThreadId).trim() : ""; - const telegramConversationId = onTelegram ? resolveTelegramConversationId(params) : undefined; - const channelRuntime = getChannelRuntime(); - - const discordManager = onDiscord - ? channelRuntime.discord.threadBindings.getManager(accountId) - : null; - if (onDiscord && !discordManager) { + const conversationId = onTelegram + ? resolveTelegramConversationId(params) + : onMatrix + ? resolveMatrixConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }) + : threadId || undefined; + const parentConversationId = onMatrix + ? resolveMatrixParentConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }) + : undefined; + const capabilities = sessionBindingService.getCapabilities({ channel, accountId }); + if (!capabilities.adapterAvailable) { + const label = + channel === "discord" + ? "Discord thread" + : channel === "telegram" + ? "Telegram conversation" + : "Matrix thread"; return { shouldContinue: false, - reply: { text: "âš ī¸ Discord thread bindings are unavailable for this account." }, + reply: { text: `âš ī¸ ${label} bindings are unavailable for this account.` }, }; } - - const discordBinding = - onDiscord && threadId ? discordManager?.getByThreadId(threadId) : undefined; - const telegramBinding = - onTelegram && telegramConversationId + const binding = + conversationId != null ? sessionBindingService.resolveByConversation({ - channel: "telegram", + channel, accountId, - conversationId: telegramConversationId, + conversationId, + ...(parentConversationId ? { parentConversationId } : {}), }) : null; - if (onDiscord && !discordBinding) { + if (!binding) { if (onDiscord && !threadId) { return { shouldContinue: false, @@ -409,13 +464,15 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm }, }; } - return { - shouldContinue: false, - reply: { text: "â„šī¸ This thread is not currently focused." }, - }; - } - if (onTelegram && !telegramBinding) { - if (!telegramConversationId) { + if (onMatrix && !threadId) { + return { + shouldContinue: false, + reply: { + text: "âš ī¸ /session idle and /session max-age must be run inside a focused Matrix thread.", + }, + }; + } + if (onTelegram && !conversationId) { return { shouldContinue: false, reply: { @@ -425,38 +482,19 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm } return { shouldContinue: false, - reply: { text: "â„šī¸ This conversation is not currently focused." }, + reply: { + text: + channel === "telegram" + ? "â„šī¸ This conversation is not currently focused." + : "â„šī¸ This thread is not currently focused.", + }, }; } - const idleTimeoutMs = onDiscord - ? channelRuntime.discord.threadBindings.resolveIdleTimeoutMs({ - record: discordBinding!, - defaultIdleTimeoutMs: discordManager!.getIdleTimeoutMs(), - }) - : resolveTelegramBindingDurationMs(telegramBinding!, "idleTimeoutMs", 24 * 60 * 60 * 1000); - const idleExpiresAt = onDiscord - ? channelRuntime.discord.threadBindings.resolveInactivityExpiresAt({ - record: discordBinding!, - defaultIdleTimeoutMs: discordManager!.getIdleTimeoutMs(), - }) - : idleTimeoutMs > 0 - ? resolveTelegramBindingLastActivityAt(telegramBinding!) + idleTimeoutMs - : undefined; - const maxAgeMs = onDiscord - ? channelRuntime.discord.threadBindings.resolveMaxAgeMs({ - record: discordBinding!, - defaultMaxAgeMs: discordManager!.getMaxAgeMs(), - }) - : resolveTelegramBindingDurationMs(telegramBinding!, "maxAgeMs", 0); - const maxAgeExpiresAt = onDiscord - ? channelRuntime.discord.threadBindings.resolveMaxAgeExpiresAt({ - record: discordBinding!, - defaultMaxAgeMs: discordManager!.getMaxAgeMs(), - }) - : maxAgeMs > 0 - ? telegramBinding!.boundAt + maxAgeMs - : undefined; + const idleTimeoutMs = resolveBindingDurationMs(binding, "idleTimeoutMs", 24 * 60 * 60 * 1000); + const idleExpiresAt = resolveIdleExpiresAt(binding, 24 * 60 * 60 * 1000); + const maxAgeMs = resolveBindingDurationMs(binding, "maxAgeMs", 0); + const maxAgeExpiresAt = resolveMaxAgeExpiresAt(binding, 0); const durationArgRaw = tokens.slice(1).join(""); if (!durationArgRaw) { @@ -498,16 +536,13 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm } const senderId = params.command.senderId?.trim() || ""; - const boundBy = onDiscord - ? discordBinding!.boundBy - : resolveTelegramBindingBoundBy(telegramBinding!); + const boundBy = resolveBindingBoundBy(binding); if (boundBy && boundBy !== "system" && senderId && senderId !== boundBy) { + const noun = resolveBindingConversationLabel(channel); return { shouldContinue: false, reply: { - text: onDiscord - ? `âš ī¸ Only ${boundBy} can update session lifecycle settings for this thread.` - : `âš ī¸ Only ${boundBy} can update session lifecycle settings for this conversation.`, + text: `âš ī¸ Only ${boundBy} can update session lifecycle settings for this ${noun}.`, }, }; } @@ -522,32 +557,20 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm }; } - const updatedBindings = (() => { - if (onDiscord) { - return action === SESSION_ACTION_IDLE - ? channelRuntime.discord.threadBindings.setIdleTimeoutBySessionKey({ - targetSessionKey: discordBinding!.targetSessionKey, - accountId, - idleTimeoutMs: durationMs, - }) - : channelRuntime.discord.threadBindings.setMaxAgeBySessionKey({ - targetSessionKey: discordBinding!.targetSessionKey, - accountId, - maxAgeMs: durationMs, - }); - } - return action === SESSION_ACTION_IDLE - ? channelRuntime.telegram.threadBindings.setIdleTimeoutBySessionKey({ - targetSessionKey: telegramBinding!.targetSessionKey, + const updatedBindings = + action === SESSION_ACTION_IDLE + ? await sessionBindingService.setIdleTimeoutBySession({ + channel, accountId, + targetSessionKey: binding.targetSessionKey, idleTimeoutMs: durationMs, }) - : channelRuntime.telegram.threadBindings.setMaxAgeBySessionKey({ - targetSessionKey: telegramBinding!.targetSessionKey, + : await sessionBindingService.setMaxAgeBySession({ + channel, accountId, + targetSessionKey: binding.targetSessionKey, maxAgeMs: durationMs, }); - })(); if (updatedBindings.length === 0) { return { shouldContinue: false, @@ -574,7 +597,12 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm const nextExpiry = resolveUpdatedBindingExpiry({ action, - bindings: updatedBindings, + bindings: updatedBindings.map((binding) => ({ + boundAt: binding.boundAt, + lastActivityAt: resolveBindingLastActivityAt(binding), + idleTimeoutMs: resolveBindingDurationMs(binding, "idleTimeoutMs", 0), + maxAgeMs: resolveBindingDurationMs(binding, "maxAgeMs", 0), + })), }); const expiryLabel = typeof nextExpiry === "number" && Number.isFinite(nextExpiry) diff --git a/src/auto-reply/reply/commands-subagents-focus.test.ts b/src/auto-reply/reply/commands-subagents-focus.test.ts index 651d8088486..b118bf99793 100644 --- a/src/auto-reply/reply/commands-subagents-focus.test.ts +++ b/src/auto-reply/reply/commands-subagents-focus.test.ts @@ -29,6 +29,8 @@ const hoisted = vi.hoisted(() => { function buildFocusSessionBindingService() { return { touch: vi.fn(), + setIdleTimeoutBySession: vi.fn(async () => []), + setMaxAgeBySession: vi.fn(async () => []), listBySession(targetSessionKey: string) { return hoisted.sessionBindingListBySessionMock(targetSessionKey); }, @@ -103,6 +105,19 @@ function createTelegramTopicCommandParams(commandBody: string) { return params; } +function createMatrixCommandParams(commandBody: string) { + const params = buildCommandTestParams(commandBody, baseCfg, { + Provider: "matrix-js", + Surface: "matrix-js", + OriginatingChannel: "matrix-js", + OriginatingTo: "room:!room:example", + To: "room:!room:example", + AccountId: "default", + }); + params.command.senderId = "user-1"; + return params; +} + function createSessionBindingRecord( overrides?: Partial, ): SessionBindingRecord { @@ -220,6 +235,22 @@ describe("/focus, /unfocus, /agents", () => { ); }); + it("/focus creates Matrix child thread bindings from top-level rooms", async () => { + const result = await focusCodexAcp(createMatrixCommandParams("/focus codex-acp")); + + expect(result?.reply?.text).toContain("created thread"); + expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith( + expect.objectContaining({ + placement: "child", + conversation: expect.objectContaining({ + channel: "matrix-js", + conversationId: "!room:example", + parentConversationId: "!room:example", + }), + }), + ); + }); + it("/focus includes ACP session identifiers in intro text when available", async () => { hoisted.readAcpSessionEntryMock.mockReturnValue({ sessionKey: "agent:codex-acp:session-1", @@ -401,6 +432,6 @@ describe("/focus, /unfocus, /agents", () => { it("/focus rejects unsupported channels", async () => { const params = buildCommandTestParams("/focus codex-acp", baseCfg); const result = await handleSubagentsCommand(params, true); - expect(result?.reply?.text).toContain("only available on Discord and Telegram"); + expect(result?.reply?.text).toContain("only available on Discord, Matrix, and Telegram"); }); }); diff --git a/src/auto-reply/reply/commands-subagents/action-agents.ts b/src/auto-reply/reply/commands-subagents/action-agents.ts index 69c9ac05a79..99be8abdd08 100644 --- a/src/auto-reply/reply/commands-subagents/action-agents.ts +++ b/src/auto-reply/reply/commands-subagents/action-agents.ts @@ -15,6 +15,9 @@ function formatConversationBindingText(params: { if (params.channel === "discord") { return `thread:${params.conversationId}`; } + if (params.channel === "matrix-js") { + return `thread:${params.conversationId}`; + } if (params.channel === "telegram") { return `conversation:${params.conversationId}`; } @@ -64,9 +67,9 @@ export function handleSubagentsAgentsAction(ctx: SubagentsCommandContext): Comma channel, conversationId: binding.conversation.conversationId, }) - : channel === "discord" || channel === "telegram" + : channel === "discord" || channel === "telegram" || channel === "matrix-js" ? "unbound" - : "bindings available on discord/telegram"; + : "bindings available on discord/matrix-js/telegram"; lines.push(`${index}. ${formatRunLabel(entry)} (${bindingText})`); index += 1; } diff --git a/src/auto-reply/reply/commands-subagents/action-focus.ts b/src/auto-reply/reply/commands-subagents/action-focus.ts index df7a268b3b0..79da5ca5dc9 100644 --- a/src/auto-reply/reply/commands-subagents/action-focus.ts +++ b/src/auto-reply/reply/commands-subagents/action-focus.ts @@ -16,19 +16,23 @@ import type { CommandHandlerResult } from "../commands-types.js"; import { type SubagentsCommandContext, isDiscordSurface, + isMatrixSurface, isTelegramSurface, resolveChannelAccountId, resolveCommandSurfaceChannel, resolveDiscordChannelIdForFocus, resolveFocusTargetSession, + resolveMatrixConversationId, + resolveMatrixParentConversationId, resolveTelegramConversationId, stopWithText, } from "./shared.js"; type FocusBindingContext = { - channel: "discord" | "telegram"; + channel: "discord" | "telegram" | "matrix-js"; accountId: string; conversationId: string; + parentConversationId?: string; placement: "current" | "child"; labelNoun: "thread" | "conversation"; }; @@ -65,6 +69,41 @@ function resolveFocusBindingContext( labelNoun: "conversation", }; } + if (isMatrixSurface(params)) { + const currentThreadId = + params.ctx.MessageThreadId != null ? String(params.ctx.MessageThreadId).trim() : ""; + const conversationId = resolveMatrixConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }); + if (!conversationId) { + return null; + } + const parentConversationId = resolveMatrixParentConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }); + return { + channel: "matrix-js", + accountId: resolveChannelAccountId(params), + conversationId, + ...(parentConversationId ? { parentConversationId } : {}), + placement: currentThreadId ? "current" : "child", + labelNoun: "thread", + }; + } return null; } @@ -73,8 +112,8 @@ export async function handleSubagentsFocusAction( ): Promise { const { params, runs, restTokens } = ctx; const channel = resolveCommandSurfaceChannel(params); - if (channel !== "discord" && channel !== "telegram") { - return stopWithText("âš ī¸ /focus is only available on Discord and Telegram."); + if (channel !== "discord" && channel !== "telegram" && channel !== "matrix-js") { + return stopWithText("âš ī¸ /focus is only available on Discord, Matrix, and Telegram."); } const token = restTokens.join(" ").trim(); @@ -89,7 +128,12 @@ export async function handleSubagentsFocusAction( accountId, }); if (!capabilities.adapterAvailable || !capabilities.bindSupported) { - const label = channel === "discord" ? "Discord thread" : "Telegram conversation"; + const label = + channel === "discord" + ? "Discord thread" + : channel === "telegram" + ? "Telegram conversation" + : "Matrix thread"; return stopWithText(`âš ī¸ ${label} bindings are unavailable for this account.`); } @@ -105,6 +149,9 @@ export async function handleSubagentsFocusAction( "âš ī¸ /focus on Telegram requires a topic context in groups, or a direct-message conversation.", ); } + if (channel === "matrix-js") { + return stopWithText("âš ī¸ Could not resolve a Matrix conversation for /focus."); + } return stopWithText("âš ī¸ Could not resolve a Discord channel for /focus."); } @@ -113,6 +160,9 @@ export async function handleSubagentsFocusAction( channel: bindingContext.channel, accountId: bindingContext.accountId, conversationId: bindingContext.conversationId, + ...(bindingContext.parentConversationId + ? { parentConversationId: bindingContext.parentConversationId } + : {}), }); const boundBy = typeof existingBinding?.metadata?.boundBy === "string" @@ -143,6 +193,9 @@ export async function handleSubagentsFocusAction( channel: bindingContext.channel, accountId: bindingContext.accountId, conversationId: bindingContext.conversationId, + ...(bindingContext.parentConversationId + ? { parentConversationId: bindingContext.parentConversationId } + : {}), }, placement: bindingContext.placement, metadata: { diff --git a/src/auto-reply/reply/commands-subagents/action-unfocus.ts b/src/auto-reply/reply/commands-subagents/action-unfocus.ts index 78bb02b2427..07fe5a7e45b 100644 --- a/src/auto-reply/reply/commands-subagents/action-unfocus.ts +++ b/src/auto-reply/reply/commands-subagents/action-unfocus.ts @@ -3,9 +3,12 @@ import type { CommandHandlerResult } from "../commands-types.js"; import { type SubagentsCommandContext, isDiscordSurface, + isMatrixSurface, isTelegramSurface, resolveChannelAccountId, resolveCommandSurfaceChannel, + resolveMatrixConversationId, + resolveMatrixParentConversationId, resolveTelegramConversationId, stopWithText, } from "./shared.js"; @@ -15,8 +18,8 @@ export async function handleSubagentsUnfocusAction( ): Promise { const { params } = ctx; const channel = resolveCommandSurfaceChannel(params); - if (channel !== "discord" && channel !== "telegram") { - return stopWithText("âš ī¸ /unfocus is only available on Discord and Telegram."); + if (channel !== "discord" && channel !== "telegram" && channel !== "matrix-js") { + return stopWithText("âš ī¸ /unfocus is only available on Discord, Matrix, and Telegram."); } const accountId = resolveChannelAccountId(params); @@ -27,16 +30,43 @@ export async function handleSubagentsUnfocusAction( const threadId = params.ctx.MessageThreadId != null ? String(params.ctx.MessageThreadId) : ""; return threadId.trim() || undefined; } + if (isMatrixSurface(params)) { + return resolveMatrixConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }); + } if (isTelegramSurface(params)) { return resolveTelegramConversationId(params); } return undefined; })(); + const parentConversationId = isMatrixSurface(params) + ? resolveMatrixParentConversationId({ + ctx: { + MessageThreadId: params.ctx.MessageThreadId, + OriginatingTo: params.ctx.OriginatingTo, + To: params.ctx.To, + }, + command: { + to: params.command.to, + }, + }) + : undefined; if (!conversationId) { if (channel === "discord") { return stopWithText("âš ī¸ /unfocus must be run inside a Discord thread."); } + if (channel === "matrix-js") { + return stopWithText("âš ī¸ /unfocus must be run inside a focused Matrix thread."); + } return stopWithText( "âš ī¸ /unfocus on Telegram requires a topic context in groups, or a direct-message conversation.", ); @@ -46,12 +76,15 @@ export async function handleSubagentsUnfocusAction( channel, accountId, conversationId, + ...(parentConversationId ? { parentConversationId } : {}), }); if (!binding) { return stopWithText( channel === "discord" ? "â„šī¸ This thread is not currently focused." - : "â„šī¸ This conversation is not currently focused.", + : channel === "matrix-js" + ? "â„šī¸ This thread is not currently focused." + : "â„šī¸ This conversation is not currently focused.", ); } @@ -62,7 +95,9 @@ export async function handleSubagentsUnfocusAction( return stopWithText( channel === "discord" ? `âš ī¸ Only ${boundBy} can unfocus this thread.` - : `âš ī¸ Only ${boundBy} can unfocus this conversation.`, + : channel === "matrix-js" + ? `âš ī¸ Only ${boundBy} can unfocus this thread.` + : `âš ī¸ Only ${boundBy} can unfocus this conversation.`, ); } @@ -71,6 +106,8 @@ export async function handleSubagentsUnfocusAction( reason: "manual", }); return stopWithText( - channel === "discord" ? "✅ Thread unfocused." : "✅ Conversation unfocused.", + channel === "discord" || channel === "matrix-js" + ? "✅ Thread unfocused." + : "✅ Conversation unfocused.", ); } diff --git a/src/auto-reply/reply/commands-subagents/shared.ts b/src/auto-reply/reply/commands-subagents/shared.ts index 9781683267e..337ea7f5d57 100644 --- a/src/auto-reply/reply/commands-subagents/shared.ts +++ b/src/auto-reply/reply/commands-subagents/shared.ts @@ -30,12 +30,17 @@ import { } from "../../../shared/subagents-format.js"; import { isDiscordSurface, + isMatrixSurface, isTelegramSurface, resolveCommandSurfaceChannel, resolveDiscordAccountId, resolveChannelAccountId, } from "../channel-context.js"; import type { CommandHandler, CommandHandlerResult } from "../commands-types.js"; +import { + resolveMatrixConversationId, + resolveMatrixParentConversationId, +} from "../matrix-context.ts"; import { formatRunLabel, formatRunStatus, @@ -47,10 +52,13 @@ import { resolveTelegramConversationId } from "../telegram-context.js"; export { extractAssistantText, stripToolMessages }; export { isDiscordSurface, + isMatrixSurface, isTelegramSurface, resolveCommandSurfaceChannel, resolveDiscordAccountId, resolveChannelAccountId, + resolveMatrixConversationId, + resolveMatrixParentConversationId, resolveTelegramConversationId, }; diff --git a/src/auto-reply/reply/matrix-context.ts b/src/auto-reply/reply/matrix-context.ts new file mode 100644 index 00000000000..8689cc79d57 --- /dev/null +++ b/src/auto-reply/reply/matrix-context.ts @@ -0,0 +1,54 @@ +type MatrixConversationParams = { + ctx: { + MessageThreadId?: string | number | null; + OriginatingTo?: string; + To?: string; + }; + command: { + to?: string; + }; +}; + +function normalizeMatrixTarget(value: unknown): string { + return typeof value === "string" ? value.trim() : ""; +} + +function resolveMatrixRoomIdFromTarget(raw: string): string | undefined { + let target = normalizeMatrixTarget(raw); + if (!target) { + return undefined; + } + if (target.toLowerCase().startsWith("matrix:")) { + target = target.slice("matrix:".length).trim(); + } + if (/^(room|channel):/i.test(target)) { + const roomId = target.replace(/^(room|channel):/i, "").trim(); + return roomId || undefined; + } + if (target.startsWith("!") || target.startsWith("#")) { + return target; + } + return undefined; +} + +export function resolveMatrixParentConversationId( + params: MatrixConversationParams, +): string | undefined { + const targets = [params.ctx.OriginatingTo, params.command.to, params.ctx.To]; + for (const candidate of targets) { + const roomId = resolveMatrixRoomIdFromTarget(candidate ?? ""); + if (roomId) { + return roomId; + } + } + return undefined; +} + +export function resolveMatrixConversationId(params: MatrixConversationParams): string | undefined { + const threadId = + params.ctx.MessageThreadId != null ? String(params.ctx.MessageThreadId).trim() : ""; + if (threadId) { + return threadId; + } + return resolveMatrixParentConversationId(params); +} diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 4518d393ed2..aeb2ebe35cc 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -467,7 +467,7 @@ export const FIELD_HELP: Record = { "bindings[].match": "Match rule object for deciding when a binding applies, including channel and optional account/peer constraints. Keep rules narrow to avoid accidental agent takeover across contexts.", "bindings[].match.channel": - "Channel/provider identifier this binding applies to, such as `telegram`, `discord`, or a plugin channel ID. Use the configured channel key exactly so binding evaluation works reliably.", + "Channel/provider identifier this binding applies to, such as `telegram`, `discord`, `matrix-js`, or another plugin channel ID. Use the configured channel key exactly so binding evaluation works reliably.", "bindings[].match.accountId": "Optional account selector for multi-account channel setups so the binding applies only to one identity. Use this when account scoping is required for the route and leave unset otherwise.", "bindings[].match.peer": @@ -1598,6 +1598,16 @@ export const FIELD_HELP: Record = { "Allow subagent spawns with thread=true to auto-create and bind Discord threads (default: false; opt-in). Set true to enable thread-bound subagent spawns for this account/channel.", "channels.discord.threadBindings.spawnAcpSessions": "Allow /acp spawn to auto-create and bind Discord threads for ACP sessions (default: false; opt-in). Set true to enable thread-bound ACP spawns for this account/channel.", + "channels.matrix-js.threadBindings.enabled": + "Enable Matrix-js thread binding features (/focus, /unfocus, /agents, /session idle|max-age, and thread-bound routing). Overrides session.threadBindings.enabled when set.", + "channels.matrix-js.threadBindings.idleHours": + "Inactivity window in hours for Matrix-js thread-bound sessions. Set 0 to disable idle auto-unfocus (default: 24). Overrides session.threadBindings.idleHours when set.", + "channels.matrix-js.threadBindings.maxAgeHours": + "Optional hard max age in hours for Matrix-js thread-bound sessions. Set 0 to disable hard cap (default: 0). Overrides session.threadBindings.maxAgeHours when set.", + "channels.matrix-js.threadBindings.spawnSubagentSessions": + "Allow subagent spawns/focus flows to auto-create and bind Matrix threads when starting from a top-level Matrix room or DM.", + "channels.matrix-js.threadBindings.spawnAcpSessions": + "Allow /acp spawn to auto-create and bind Matrix threads for ACP sessions when starting from a top-level Matrix room or DM.", "channels.discord.ui.components.accentColor": "Accent color for Discord component containers (hex). Set per account via channels.discord.accounts..ui.components.accentColor.", "channels.discord.voice.enabled": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index ae1c8d2829d..3c05875113b 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -793,6 +793,12 @@ export const FIELD_LABELS: Record = { "channels.discord.threadBindings.maxAgeHours": "Discord Thread Binding Max Age (hours)", "channels.discord.threadBindings.spawnSubagentSessions": "Discord Thread-Bound Subagent Spawn", "channels.discord.threadBindings.spawnAcpSessions": "Discord Thread-Bound ACP Spawn", + "channels.matrix-js.threadBindings.enabled": "Matrix-js Thread Binding Enabled", + "channels.matrix-js.threadBindings.idleHours": "Matrix-js Thread Binding Idle Timeout (hours)", + "channels.matrix-js.threadBindings.maxAgeHours": "Matrix-js Thread Binding Max Age (hours)", + "channels.matrix-js.threadBindings.spawnSubagentSessions": + "Matrix-js Thread-Bound Subagent Spawn", + "channels.matrix-js.threadBindings.spawnAcpSessions": "Matrix-js Thread-Bound ACP Spawn", "channels.discord.ui.components.accentColor": "Discord Component Accent Color", "channels.discord.intents.presence": "Discord Presence Intent", "channels.discord.intents.guildMembers": "Discord Guild Members Intent", diff --git a/src/config/zod-schema.agents.ts b/src/config/zod-schema.agents.ts index 5dddfc9813a..4b676422cd6 100644 --- a/src/config/zod-schema.agents.ts +++ b/src/config/zod-schema.agents.ts @@ -71,12 +71,17 @@ const AcpBindingSchema = z return; } const channel = value.match.channel.trim().toLowerCase(); - if (channel !== "discord" && channel !== "telegram" && channel !== "feishu") { + if ( + channel !== "discord" && + channel !== "matrix-js" && + channel !== "telegram" && + channel !== "feishu" + ) { ctx.addIssue({ code: z.ZodIssueCode.custom, path: ["match", "channel"], message: - 'ACP bindings currently support only "discord", "telegram", and "feishu" channels.', + 'ACP bindings currently support only "discord", "matrix-js", "telegram", and "feishu" channels.', }); return; } diff --git a/src/infra/outbound/session-binding-service.ts b/src/infra/outbound/session-binding-service.ts index c155d3792b8..a8a6b74a221 100644 --- a/src/infra/outbound/session-binding-service.ts +++ b/src/infra/outbound/session-binding-service.ts @@ -72,6 +72,18 @@ export type SessionBindingService = { getCapabilities: (params: { channel: string; accountId: string }) => SessionBindingCapabilities; listBySession: (targetSessionKey: string) => SessionBindingRecord[]; resolveByConversation: (ref: ConversationRef) => SessionBindingRecord | null; + setIdleTimeoutBySession: (params: { + channel: string; + accountId: string; + targetSessionKey: string; + idleTimeoutMs: number; + }) => Promise; + setMaxAgeBySession: (params: { + channel: string; + accountId: string; + targetSessionKey: string; + maxAgeMs: number; + }) => Promise; touch: (bindingId: string, at?: number) => void; unbind: (input: SessionBindingUnbindInput) => Promise; }; @@ -89,6 +101,14 @@ export type SessionBindingAdapter = { bind?: (input: SessionBindingBindInput) => Promise; listBySession: (targetSessionKey: string) => SessionBindingRecord[]; resolveByConversation: (ref: ConversationRef) => SessionBindingRecord | null; + setIdleTimeoutBySession?: (params: { + targetSessionKey: string; + idleTimeoutMs: number; + }) => Promise | SessionBindingRecord[]; + setMaxAgeBySession?: (params: { + targetSessionKey: string; + maxAgeMs: number; + }) => Promise | SessionBindingRecord[]; touch?: (bindingId: string, at?: number) => void; unbind?: (input: SessionBindingUnbindInput) => Promise; }; @@ -291,6 +311,36 @@ function createDefaultSessionBindingService(): SessionBindingService { } return adapter.resolveByConversation(normalized); }, + setIdleTimeoutBySession: async (params) => { + const adapter = resolveAdapterForChannelAccount({ + channel: params.channel, + accountId: params.accountId, + }); + if (!adapter?.setIdleTimeoutBySession) { + return []; + } + return dedupeBindings( + await adapter.setIdleTimeoutBySession({ + targetSessionKey: params.targetSessionKey.trim(), + idleTimeoutMs: Math.max(0, Math.floor(params.idleTimeoutMs)), + }), + ); + }, + setMaxAgeBySession: async (params) => { + const adapter = resolveAdapterForChannelAccount({ + channel: params.channel, + accountId: params.accountId, + }); + if (!adapter?.setMaxAgeBySession) { + return []; + } + return dedupeBindings( + await adapter.setMaxAgeBySession({ + targetSessionKey: params.targetSessionKey.trim(), + maxAgeMs: Math.max(0, Math.floor(params.maxAgeMs)), + }), + ); + }, touch: (bindingId, at) => { const normalizedBindingId = bindingId.trim(); if (!normalizedBindingId) { diff --git a/src/plugin-sdk/matrix-js.ts b/src/plugin-sdk/matrix-js.ts index ecc9e3951c9..76aadf2c99c 100644 --- a/src/plugin-sdk/matrix-js.ts +++ b/src/plugin-sdk/matrix-js.ts @@ -56,6 +56,11 @@ export type { export type { ChannelPlugin } from "../channels/plugins/types.plugin.js"; export type { ChannelSetupInput } from "../channels/plugins/types.js"; export { createReplyPrefixOptions } from "../channels/reply-prefix.js"; +export { resolveThreadBindingFarewellText } from "../channels/thread-bindings-messages.js"; +export { + resolveThreadBindingIdleTimeoutMsForChannel, + resolveThreadBindingMaxAgeMsForChannel, +} from "../channels/thread-bindings-policy.js"; export { createTypingCallbacks } from "../channels/typing.js"; export { resolveAckReaction } from "../agents/identity.js"; export type { OpenClawConfig } from "../config/config.js"; @@ -81,6 +86,16 @@ export { ToolPolicySchema } from "../config/zod-schema.agent-runtime.js"; export { MarkdownConfigSchema } from "../config/zod-schema.core.js"; export { formatZonedTimestamp } from "../infra/format-time/format-datetime.js"; export { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js"; +export { + getSessionBindingService, + registerSessionBindingAdapter, + unregisterSessionBindingAdapter, +} from "../infra/outbound/session-binding-service.js"; +export type { + BindingTargetKind, + SessionBindingRecord, + SessionBindingAdapter, +} from "../infra/outbound/session-binding-service.js"; export { issuePairingChallenge } from "../pairing/pairing-challenge.js"; export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; export type { PluginRuntime, RuntimeLogger } from "../plugins/runtime/types.js"; @@ -88,6 +103,9 @@ export type { OpenClawPluginApi } from "../plugins/types.js"; export type { PollInput } from "../polls.js"; export { normalizePollInput } from "../polls.js"; export { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "../routing/session-key.js"; +export { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; +export { resolveConfiguredAcpRoute } from "../acp/persistent-bindings.route.js"; +export { ensureConfiguredAcpRouteReady } from "../acp/persistent-bindings.route.js"; export type { RuntimeEnv } from "../runtime.js"; export { readStoreAllowFromForDmPolicy,