matrix-js: add account-aware bindings and ACP routing

This commit is contained in:
Gustavo Madeira Santana
2026-03-08 18:43:15 -04:00
parent 565ff5f913
commit 96d7e4552d
32 changed files with 2194 additions and 367 deletions

View File

@@ -20,6 +20,10 @@ import {
resolveChannelIdForBinding,
summarizeDiscordError,
} from "./thread-bindings.discord-api.js";
import {
setThreadBindingIdleTimeoutBySessionKey,
setThreadBindingMaxAgeBySessionKey,
} from "./thread-bindings.lifecycle.js";
import {
resolveThreadBindingFarewellText,
resolveThreadBindingThreadName,
@@ -651,6 +655,18 @@ export function createThreadBindingManager(
const binding = manager.getByThreadId(ref.conversationId);
return binding ? toSessionBindingRecord(binding, { idleTimeoutMs, maxAgeMs }) : null;
},
setIdleTimeoutBySession: ({ targetSessionKey, idleTimeoutMs: nextIdleTimeoutMs }) =>
setThreadBindingIdleTimeoutBySessionKey({
targetSessionKey,
accountId,
idleTimeoutMs: nextIdleTimeoutMs,
}).map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs })),
setMaxAgeBySession: ({ targetSessionKey, maxAgeMs: nextMaxAgeMs }) =>
setThreadBindingMaxAgeBySessionKey({
targetSessionKey,
accountId,
maxAgeMs: nextMaxAgeMs,
}).map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs })),
touch: (bindingId, at) => {
const threadId = resolveThreadBindingConversationIdFromBindingId({
accountId,

View File

@@ -0,0 +1,85 @@
import type { ChannelMessageActionContext } from "openclaw/plugin-sdk/matrix-js";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CoreConfig } from "./types.js";
const mocks = vi.hoisted(() => ({
handleMatrixAction: vi.fn(),
}));
vi.mock("./tool-actions.js", () => ({
handleMatrixAction: mocks.handleMatrixAction,
}));
const { matrixMessageActions } = await import("./actions.js");
function createContext(
overrides: Partial<ChannelMessageActionContext>,
): ChannelMessageActionContext {
return {
channel: "matrix-js",
action: "send",
cfg: {
channels: {
"matrix-js": {
enabled: true,
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
},
} as CoreConfig,
params: {},
...overrides,
};
}
describe("matrixMessageActions account propagation", () => {
beforeEach(() => {
mocks.handleMatrixAction.mockReset().mockResolvedValue({
ok: true,
output: "",
details: { ok: true },
});
});
it("forwards accountId for send actions", async () => {
await matrixMessageActions.handleAction?.(
createContext({
action: "send",
accountId: "ops",
params: {
to: "room:!room:example",
message: "hello",
},
}),
);
expect(mocks.handleMatrixAction).toHaveBeenCalledWith(
expect.objectContaining({
action: "sendMessage",
accountId: "ops",
}),
expect.any(Object),
);
});
it("forwards accountId for permissions actions", async () => {
await matrixMessageActions.handleAction?.(
createContext({
action: "permissions",
accountId: "ops",
params: {
operation: "verification-list",
},
}),
);
expect(mocks.handleMatrixAction).toHaveBeenCalledWith(
expect.objectContaining({
action: "verificationList",
accountId: "ops",
}),
expect.any(Object),
);
});
});

View File

@@ -77,7 +77,15 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
return { to };
},
handleAction: async (ctx: ChannelMessageActionContext) => {
const { action, params, cfg } = ctx;
const { action, params, cfg, accountId } = ctx;
const dispatch = async (actionParams: Record<string, unknown>) =>
await handleMatrixAction(
{
...actionParams,
...(accountId ? { accountId } : {}),
},
cfg as CoreConfig,
);
const resolveRoomId = () =>
readStringParam(params, "roomId") ??
readStringParam(params, "channelId") ??
@@ -92,97 +100,76 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
const mediaUrl = readStringParam(params, "media", { trim: false });
const replyTo = readStringParam(params, "replyTo");
const threadId = readStringParam(params, "threadId");
return await handleMatrixAction(
{
action: "sendMessage",
to,
content,
mediaUrl: mediaUrl ?? undefined,
replyToId: replyTo ?? undefined,
threadId: threadId ?? undefined,
},
cfg as CoreConfig,
);
return await dispatch({
action: "sendMessage",
to,
content,
mediaUrl: mediaUrl ?? undefined,
replyToId: replyTo ?? undefined,
threadId: threadId ?? undefined,
});
}
if (action === "poll-vote") {
return await handleMatrixAction(
{
...params,
action: "pollVote",
},
cfg as CoreConfig,
);
return await dispatch({
...params,
action: "pollVote",
});
}
if (action === "react") {
const messageId = readStringParam(params, "messageId", { required: true });
const emoji = readStringParam(params, "emoji", { allowEmpty: true });
const remove = typeof params.remove === "boolean" ? params.remove : undefined;
return await handleMatrixAction(
{
action: "react",
roomId: resolveRoomId(),
messageId,
emoji,
remove,
},
cfg as CoreConfig,
);
return await dispatch({
action: "react",
roomId: resolveRoomId(),
messageId,
emoji,
remove,
});
}
if (action === "reactions") {
const messageId = readStringParam(params, "messageId", { required: true });
const limit = readNumberParam(params, "limit", { integer: true });
return await handleMatrixAction(
{
action: "reactions",
roomId: resolveRoomId(),
messageId,
limit,
},
cfg as CoreConfig,
);
return await dispatch({
action: "reactions",
roomId: resolveRoomId(),
messageId,
limit,
});
}
if (action === "read") {
const limit = readNumberParam(params, "limit", { integer: true });
return await handleMatrixAction(
{
action: "readMessages",
roomId: resolveRoomId(),
limit,
before: readStringParam(params, "before"),
after: readStringParam(params, "after"),
},
cfg as CoreConfig,
);
return await dispatch({
action: "readMessages",
roomId: resolveRoomId(),
limit,
before: readStringParam(params, "before"),
after: readStringParam(params, "after"),
});
}
if (action === "edit") {
const messageId = readStringParam(params, "messageId", { required: true });
const content = readStringParam(params, "message", { required: true });
return await handleMatrixAction(
{
action: "editMessage",
roomId: resolveRoomId(),
messageId,
content,
},
cfg as CoreConfig,
);
return await dispatch({
action: "editMessage",
roomId: resolveRoomId(),
messageId,
content,
});
}
if (action === "delete") {
const messageId = readStringParam(params, "messageId", { required: true });
return await handleMatrixAction(
{
action: "deleteMessage",
roomId: resolveRoomId(),
messageId,
},
cfg as CoreConfig,
);
return await dispatch({
action: "deleteMessage",
roomId: resolveRoomId(),
messageId,
});
}
if (action === "pin" || action === "unpin" || action === "list-pins") {
@@ -190,37 +177,27 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
action === "list-pins"
? undefined
: readStringParam(params, "messageId", { required: true });
return await handleMatrixAction(
{
action:
action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins",
roomId: resolveRoomId(),
messageId,
},
cfg as CoreConfig,
);
return await dispatch({
action: action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins",
roomId: resolveRoomId(),
messageId,
});
}
if (action === "member-info") {
const userId = readStringParam(params, "userId", { required: true });
return await handleMatrixAction(
{
action: "memberInfo",
userId,
roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"),
},
cfg as CoreConfig,
);
return await dispatch({
action: "memberInfo",
userId,
roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"),
});
}
if (action === "channel-info") {
return await handleMatrixAction(
{
action: "channelInfo",
roomId: resolveRoomId(),
},
cfg as CoreConfig,
);
return await dispatch({
action: "channelInfo",
roomId: resolveRoomId(),
});
}
if (action === "permissions") {
@@ -258,13 +235,10 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
).join(", ")}`,
);
}
return await handleMatrixAction(
{
...params,
action: resolvedAction,
},
cfg,
);
return await dispatch({
...params,
action: resolvedAction,
});
}
throw new Error(`Action ${action} is not supported for provider matrix-js.`);

View File

@@ -19,6 +19,7 @@ type LegacyAccountField =
| "textChunkLimit"
| "chunkMode"
| "responsePrefix"
| "threadBindings"
| "startupVerification"
| "startupVerificationCooldownHours"
| "mediaMaxMb"
@@ -47,6 +48,7 @@ const LEGACY_ACCOUNT_FIELDS: ReadonlyArray<LegacyAccountField> = [
"textChunkLimit",
"chunkMode",
"responsePrefix",
"threadBindings",
"startupVerification",
"startupVerificationCooldownHours",
"mediaMaxMb",

View File

@@ -14,6 +14,16 @@ const matrixActionSchema = z
})
.optional();
const matrixThreadBindingsSchema = z
.object({
enabled: z.boolean().optional(),
idleHours: z.number().nonnegative().optional(),
maxAgeHours: z.number().nonnegative().optional(),
spawnSubagentSessions: z.boolean().optional(),
spawnAcpSessions: z.boolean().optional(),
})
.optional();
const matrixDmSchema = z
.object({
enabled: z.boolean().optional(),
@@ -60,6 +70,7 @@ export const MatrixConfigSchema = z.object({
.enum(["group-mentions", "group-all", "direct", "all", "none", "off"])
.optional(),
reactionNotifications: z.enum(["off", "own"]).optional(),
threadBindings: matrixThreadBindingsSchema,
startupVerification: z.enum(["off", "if-unverified"]).optional(),
startupVerificationCooldownHours: z.number().optional(),
mediaMaxMb: z.number().optional(),

View File

@@ -25,6 +25,7 @@ export async function sendMatrixMessage(
mediaUrl: opts.mediaUrl,
replyToId: opts.replyToId,
threadId: opts.threadId,
accountId: opts.accountId,
client: opts.client,
timeoutMs: opts.timeoutMs,
});

View File

@@ -1,4 +1,8 @@
import { describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
__testing as sessionBindingTesting,
registerSessionBindingAdapter,
} from "../../../../../src/infra/outbound/session-binding-service.js";
import { createMatrixRoomMessageHandler } from "./handler.js";
import { EventType, type MatrixRawEvent } from "./types.js";
@@ -13,6 +17,10 @@ vi.mock("../send.js", () => ({
sendTypingMatrix: vi.fn(async () => {}),
}));
beforeEach(() => {
sessionBindingTesting.resetSessionBindingAdaptersForTests();
});
function createReactionHarness(params?: {
cfg?: unknown;
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
@@ -515,6 +523,148 @@ describe("matrix monitor handler pairing account scope", () => {
);
});
it("routes bound Matrix threads to the target session key", async () => {
registerSessionBindingAdapter({
channel: "matrix-js",
accountId: "ops",
listBySession: () => [],
resolveByConversation: (ref) =>
ref.conversationId === "$root"
? {
bindingId: "ops:!room:example:$root",
targetSessionKey: "agent:bound:session-1",
targetKind: "session",
conversation: {
channel: "matrix-js",
accountId: "ops",
conversationId: "$root",
parentConversationId: "!room:example",
},
status: "active",
boundAt: Date.now(),
metadata: {
boundBy: "user-1",
},
}
: null,
touch: vi.fn(),
});
const recordInboundSession = vi.fn(async () => {});
const handler = createMatrixRoomMessageHandler({
client: {
getUserId: async () => "@bot:example.org",
getEvent: async () => ({
event_id: "$root",
sender: "@alice:example.org",
type: EventType.RoomMessage,
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "Root topic",
},
}),
} as never,
core: {
channel: {
pairing: {
readAllowFromStore: async () => [] as string[],
upsertPairingRequest: async () => ({ code: "ABCDEFGH", created: false }),
},
commands: {
shouldHandleTextCommands: () => false,
},
text: {
hasControlCommand: () => false,
resolveMarkdownTableMode: () => "preserve",
},
routing: {
resolveAgentRoute: () => ({
agentId: "ops",
channel: "matrix-js",
accountId: "ops",
sessionKey: "agent:ops:main",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account",
}),
},
session: {
resolveStorePath: () => "/tmp/session-store",
readSessionUpdatedAt: () => undefined,
recordInboundSession,
},
reply: {
resolveEnvelopeFormatOptions: () => ({}),
formatAgentEnvelope: ({ body }: { body: string }) => body,
finalizeInboundContext: (ctx: unknown) => ctx,
createReplyDispatcherWithTyping: () => ({
dispatcher: {},
replyOptions: {},
markDispatchIdle: () => {},
}),
resolveHumanDelayConfig: () => undefined,
dispatchReplyFromConfig: async () => ({
queuedFinal: false,
counts: { final: 0, block: 0, tool: 0 },
}),
},
reactions: {
shouldAckReaction: () => false,
},
},
} as never,
cfg: {} as never,
accountId: "ops",
runtime: {
error: () => {},
} as never,
logger: {
info: () => {},
warn: () => {},
} as never,
logVerboseMessage: () => {},
allowFrom: [],
mentionRegexes: [],
groupPolicy: "open",
replyToMode: "off",
threadReplies: "inbound",
dmEnabled: true,
dmPolicy: "open",
textLimit: 8_000,
mediaMaxBytes: 10_000_000,
startupMs: 0,
startupGraceMs: 0,
directTracker: {
isDirectMessage: async () => false,
},
getRoomInfo: async () => ({ altAliases: [] }),
getMemberDisplayName: async () => "sender",
});
await handler("!room:example", {
type: EventType.RoomMessage,
sender: "@user:example.org",
event_id: "$reply1",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "follow up",
"m.relates_to": {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
"m.mentions": { room: true },
},
} as MatrixRawEvent);
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:bound:session-1",
}),
);
});
it("enqueues system events for reactions on bot-authored messages", async () => {
const { handler, enqueueSystemEvent, resolveAgentRoute } = createReactionHarness();

View File

@@ -1,9 +1,13 @@
import {
createReplyPrefixOptions,
createTypingCallbacks,
ensureConfiguredAcpRouteReady,
formatAllowlistMatchMeta,
getSessionBindingService,
logInboundDrop,
logTypingFailure,
resolveAgentIdFromSessionKey,
resolveConfiguredAcpRoute,
resolveControlCommandGate,
type PluginRuntime,
type ReplyPayload,
@@ -533,7 +537,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? await resolveThreadContext({ roomId, threadRootId })
: undefined;
const route = core.channel.routing.resolveAgentRoute({
const baseRoute = core.channel.routing.resolveAgentRoute({
cfg,
channel: "matrix-js",
accountId,
@@ -542,6 +546,56 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
id: isDirectMessage ? senderId : roomId,
},
});
const bindingConversationId =
threadRootId && threadRootId !== messageId ? threadRootId : roomId;
const bindingParentConversationId = bindingConversationId === roomId ? undefined : roomId;
const sessionBindingService = getSessionBindingService();
const runtimeBinding = sessionBindingService.resolveByConversation({
channel: "matrix-js",
accountId,
conversationId: bindingConversationId,
parentConversationId: bindingParentConversationId,
});
const configuredRoute =
runtimeBinding == null
? resolveConfiguredAcpRoute({
cfg,
route: baseRoute,
channel: "matrix-js",
accountId,
conversationId: bindingConversationId,
parentConversationId: bindingParentConversationId,
})
: null;
const configuredBinding = configuredRoute?.configuredBinding ?? null;
if (!runtimeBinding && configuredBinding) {
const ensured = await ensureConfiguredAcpRouteReady({
cfg,
configuredBinding,
});
if (!ensured.ok) {
logInboundDrop({
log: logVerboseMessage,
channel: "matrix-js",
reason: "configured ACP binding unavailable",
target: configuredBinding.spec.conversationId,
});
return;
}
}
const boundSessionKey = runtimeBinding?.targetSessionKey?.trim();
const route =
runtimeBinding && boundSessionKey
? {
...baseRoute,
sessionKey: boundSessionKey,
agentId: resolveAgentIdFromSessionKey(boundSessionKey) || baseRoute.agentId,
matchedBy: "binding.channel" as const,
}
: (configuredRoute?.route ?? baseRoute);
if (runtimeBinding) {
sessionBindingService.touch(runtimeBinding.bindingId, eventTs);
}
const envelopeFrom = isDirectMessage ? senderName : (roomName ?? roomId);
const textWithId = `${bodyText}\n[matrix event id: ${messageId} room: ${roomId}]`;
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {

View File

@@ -2,6 +2,8 @@ import { format } from "node:util";
import {
GROUP_POLICY_BLOCKED_LABEL,
mergeAllowlist,
resolveThreadBindingIdleTimeoutMsForChannel,
resolveThreadBindingMaxAgeMsForChannel,
resolveAllowlistProviderRuntimeGroupPolicy,
resolveDefaultGroupPolicy,
summarizeMapping,
@@ -21,6 +23,7 @@ import {
} from "../client.js";
import { updateMatrixAccountConfig } from "../config-update.js";
import { syncMatrixOwnProfile } from "../profile.js";
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
import { normalizeMatrixUserId } from "./allowlist.js";
import { registerMatrixAutoJoin } from "./auto-join.js";
import { createDirectRoomTracker } from "./direct.js";
@@ -270,6 +273,16 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const groupPolicy = allowlistOnly && groupPolicyRaw === "open" ? "allowlist" : groupPolicyRaw;
const replyToMode = opts.replyToMode ?? accountConfig.replyToMode ?? "off";
const threadReplies = accountConfig.threadReplies ?? "inbound";
const threadBindingIdleTimeoutMs = resolveThreadBindingIdleTimeoutMsForChannel({
cfg,
channel: "matrix-js",
accountId: account.accountId,
});
const threadBindingMaxAgeMs = resolveThreadBindingMaxAgeMsForChannel({
cfg,
channel: "matrix-js",
accountId: account.accountId,
});
const dmConfig = accountConfig.dm;
const dmEnabled = dmConfig?.enabled ?? true;
const dmPolicyRaw = dmConfig?.policy ?? "pairing";
@@ -328,6 +341,18 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
accountId: opts.accountId,
});
logVerboseMessage("matrix: client started");
const threadBindingManager = await createMatrixThreadBindingManager({
accountId: account.accountId,
auth,
client,
env: process.env,
idleTimeoutMs: threadBindingIdleTimeoutMs,
maxAgeMs: threadBindingMaxAgeMs,
logVerboseMessage,
});
logVerboseMessage(
`matrix: thread bindings ready account=${threadBindingManager.accountId} idleMs=${threadBindingIdleTimeoutMs} maxAgeMs=${threadBindingMaxAgeMs}`,
);
// Shared client is already started via resolveSharedMatrixClient.
logger.info(`matrix: logged in as ${auth.userId}`);
@@ -414,6 +439,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
await new Promise<void>((resolve) => {
const onAbort = () => {
try {
threadBindingManager.stop();
logVerboseMessage("matrix: stopping client");
stopSharedClientForAccount(auth, opts.accountId);
} finally {

View File

@@ -0,0 +1,225 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { PluginRuntime } from "openclaw/plugin-sdk/matrix-js";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
getSessionBindingService,
__testing,
} from "../../../../src/infra/outbound/session-binding-service.js";
import { setMatrixRuntime } from "../runtime.js";
import {
createMatrixThreadBindingManager,
resetMatrixThreadBindingsForTests,
} from "./thread-bindings.js";
const sendMessageMatrixMock = vi.hoisted(() =>
vi.fn(async (_to: string, _message: string, opts?: { threadId?: string }) => ({
messageId: opts?.threadId ? "$reply" : "$root",
roomId: "!room:example",
})),
);
vi.mock("./send.js", async () => {
const actual = await vi.importActual<typeof import("./send.js")>("./send.js");
return {
...actual,
sendMessageMatrix: sendMessageMatrixMock,
};
});
describe("matrix thread bindings", () => {
let stateDir: string;
beforeEach(async () => {
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "matrix-thread-bindings-"));
__testing.resetSessionBindingAdaptersForTests();
resetMatrixThreadBindingsForTests();
sendMessageMatrixMock.mockClear();
setMatrixRuntime({
state: {
resolveStateDir: () => stateDir,
},
} as PluginRuntime);
});
it("creates child Matrix thread bindings from a top-level room context", async () => {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
enableSweeper: false,
});
const binding = await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix-js",
accountId: "ops",
conversationId: "!room:example",
},
placement: "child",
metadata: {
introText: "intro root",
},
});
expect(sendMessageMatrixMock).toHaveBeenCalledWith("room:!room:example", "intro root", {
client: {},
accountId: "ops",
});
expect(binding.conversation).toEqual({
channel: "matrix-js",
accountId: "ops",
conversationId: "$root",
parentConversationId: "!room:example",
});
});
it("posts intro messages inside existing Matrix threads for current placement", async () => {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
enableSweeper: false,
});
const binding = await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix-js",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
metadata: {
introText: "intro thread",
},
});
expect(sendMessageMatrixMock).toHaveBeenCalledWith("room:!room:example", "intro thread", {
client: {},
accountId: "ops",
threadId: "$thread",
});
expect(
getSessionBindingService().resolveByConversation({
channel: "matrix-js",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
}),
).toMatchObject({
bindingId: binding.bindingId,
targetSessionKey: "agent:ops:subagent:child",
});
});
it("expires idle bindings via the sweeper", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-08T12:00:00.000Z"));
try {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never,
idleTimeoutMs: 1_000,
maxAgeMs: 0,
});
await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix-js",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
metadata: {
introText: "intro thread",
},
});
sendMessageMatrixMock.mockClear();
await vi.advanceTimersByTimeAsync(61_000);
await Promise.resolve();
expect(
getSessionBindingService().resolveByConversation({
channel: "matrix-js",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
}),
).toBeNull();
} finally {
vi.useRealTimers();
}
});
it("sends threaded farewell messages when bindings are unbound", async () => {
await createMatrixThreadBindingManager({
accountId: "ops",
auth: {
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never,
idleTimeoutMs: 1_000,
maxAgeMs: 0,
enableSweeper: false,
});
const binding = await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix-js",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
metadata: {
introText: "intro thread",
},
});
sendMessageMatrixMock.mockClear();
await getSessionBindingService().unbind({
bindingId: binding.bindingId,
reason: "idle-expired",
});
expect(sendMessageMatrixMock).toHaveBeenCalledWith(
"room:!room:example",
expect.stringContaining("Session ended automatically"),
expect.objectContaining({
accountId: "ops",
threadId: "$thread",
}),
);
});
});

View File

@@ -0,0 +1,681 @@
import path from "node:path";
import {
readJsonFileWithFallback,
registerSessionBindingAdapter,
resolveAgentIdFromSessionKey,
resolveThreadBindingFarewellText,
unregisterSessionBindingAdapter,
writeJsonFileAtomically,
type BindingTargetKind,
type SessionBindingRecord,
} from "openclaw/plugin-sdk/matrix-js";
import { resolveMatrixStoragePaths } from "./client/storage.js";
import type { MatrixAuth } from "./client/types.js";
import type { MatrixClient } from "./sdk.js";
import { sendMessageMatrix } from "./send.js";
const STORE_VERSION = 1;
const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 60_000;
const TOUCH_PERSIST_DELAY_MS = 30_000;
type MatrixThreadBindingTargetKind = "subagent" | "acp";
type MatrixThreadBindingRecord = {
accountId: string;
conversationId: string;
parentConversationId?: string;
targetKind: MatrixThreadBindingTargetKind;
targetSessionKey: string;
agentId?: string;
label?: string;
boundBy?: string;
boundAt: number;
lastActivityAt: number;
idleTimeoutMs?: number;
maxAgeMs?: number;
};
type StoredMatrixThreadBindingState = {
version: number;
bindings: MatrixThreadBindingRecord[];
};
export type MatrixThreadBindingManager = {
accountId: string;
getIdleTimeoutMs: () => number;
getMaxAgeMs: () => number;
getByConversation: (params: {
conversationId: string;
parentConversationId?: string;
}) => MatrixThreadBindingRecord | undefined;
listBySessionKey: (targetSessionKey: string) => MatrixThreadBindingRecord[];
listBindings: () => MatrixThreadBindingRecord[];
touchBinding: (bindingId: string, at?: number) => MatrixThreadBindingRecord | null;
setIdleTimeoutBySessionKey: (params: {
targetSessionKey: string;
idleTimeoutMs: number;
}) => MatrixThreadBindingRecord[];
setMaxAgeBySessionKey: (params: {
targetSessionKey: string;
maxAgeMs: number;
}) => MatrixThreadBindingRecord[];
stop: () => void;
};
const MANAGERS_BY_ACCOUNT_ID = new Map<string, MatrixThreadBindingManager>();
const BINDINGS_BY_ACCOUNT_CONVERSATION = new Map<string, MatrixThreadBindingRecord>();
function normalizeDurationMs(raw: unknown, fallback: number): number {
if (typeof raw !== "number" || !Number.isFinite(raw)) {
return fallback;
}
return Math.max(0, Math.floor(raw));
}
function normalizeText(raw: unknown): string {
return typeof raw === "string" ? raw.trim() : "";
}
function normalizeConversationId(raw: unknown): string | undefined {
const trimmed = normalizeText(raw);
return trimmed || undefined;
}
function resolveBindingKey(params: {
accountId: string;
conversationId: string;
parentConversationId?: string;
}): string {
return `${params.accountId}:${params.parentConversationId?.trim() || "-"}:${params.conversationId}`;
}
function toSessionBindingTargetKind(raw: MatrixThreadBindingTargetKind): BindingTargetKind {
return raw === "subagent" ? "subagent" : "session";
}
function toMatrixBindingTargetKind(raw: BindingTargetKind): MatrixThreadBindingTargetKind {
return raw === "subagent" ? "subagent" : "acp";
}
function resolveEffectiveBindingExpiry(params: {
record: MatrixThreadBindingRecord;
defaultIdleTimeoutMs: number;
defaultMaxAgeMs: number;
}): {
expiresAt?: number;
reason?: "idle-expired" | "max-age-expired";
} {
const idleTimeoutMs =
typeof params.record.idleTimeoutMs === "number"
? Math.max(0, Math.floor(params.record.idleTimeoutMs))
: params.defaultIdleTimeoutMs;
const maxAgeMs =
typeof params.record.maxAgeMs === "number"
? Math.max(0, Math.floor(params.record.maxAgeMs))
: params.defaultMaxAgeMs;
const inactivityExpiresAt =
idleTimeoutMs > 0
? Math.max(params.record.lastActivityAt, params.record.boundAt) + idleTimeoutMs
: undefined;
const maxAgeExpiresAt = maxAgeMs > 0 ? params.record.boundAt + maxAgeMs : undefined;
if (inactivityExpiresAt != null && maxAgeExpiresAt != null) {
return inactivityExpiresAt <= maxAgeExpiresAt
? { expiresAt: inactivityExpiresAt, reason: "idle-expired" }
: { expiresAt: maxAgeExpiresAt, reason: "max-age-expired" };
}
if (inactivityExpiresAt != null) {
return { expiresAt: inactivityExpiresAt, reason: "idle-expired" };
}
if (maxAgeExpiresAt != null) {
return { expiresAt: maxAgeExpiresAt, reason: "max-age-expired" };
}
return {};
}
function toSessionBindingRecord(
record: MatrixThreadBindingRecord,
defaults: { idleTimeoutMs: number; maxAgeMs: number },
): SessionBindingRecord {
const lifecycle = resolveEffectiveBindingExpiry({
record,
defaultIdleTimeoutMs: defaults.idleTimeoutMs,
defaultMaxAgeMs: defaults.maxAgeMs,
});
const idleTimeoutMs =
typeof record.idleTimeoutMs === "number" ? record.idleTimeoutMs : defaults.idleTimeoutMs;
const maxAgeMs = typeof record.maxAgeMs === "number" ? record.maxAgeMs : defaults.maxAgeMs;
return {
bindingId: resolveBindingKey(record),
targetSessionKey: record.targetSessionKey,
targetKind: toSessionBindingTargetKind(record.targetKind),
conversation: {
channel: "matrix-js",
accountId: record.accountId,
conversationId: record.conversationId,
parentConversationId: record.parentConversationId,
},
status: "active",
boundAt: record.boundAt,
expiresAt: lifecycle.expiresAt,
metadata: {
agentId: record.agentId,
label: record.label,
boundBy: record.boundBy,
lastActivityAt: record.lastActivityAt,
idleTimeoutMs,
maxAgeMs,
},
};
}
function resolveBindingsPath(params: {
auth: MatrixAuth;
accountId: string;
env?: NodeJS.ProcessEnv;
}): string {
const storagePaths = resolveMatrixStoragePaths({
homeserver: params.auth.homeserver,
userId: params.auth.userId,
accessToken: params.auth.accessToken,
accountId: params.accountId,
env: params.env,
});
return path.join(storagePaths.rootDir, "thread-bindings.json");
}
async function loadBindingsFromDisk(filePath: string, accountId: string) {
const { value } = await readJsonFileWithFallback<StoredMatrixThreadBindingState | null>(
filePath,
null,
);
if (value?.version !== STORE_VERSION || !Array.isArray(value.bindings)) {
return [];
}
const loaded: MatrixThreadBindingRecord[] = [];
for (const entry of value.bindings) {
const conversationId = normalizeConversationId(entry?.conversationId);
const parentConversationId = normalizeConversationId(entry?.parentConversationId);
const targetSessionKey = normalizeText(entry?.targetSessionKey);
if (!conversationId || !targetSessionKey) {
continue;
}
const boundAt =
typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt)
? Math.floor(entry.boundAt)
: Date.now();
const lastActivityAt =
typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt)
? Math.floor(entry.lastActivityAt)
: boundAt;
loaded.push({
accountId,
conversationId,
...(parentConversationId ? { parentConversationId } : {}),
targetKind: entry?.targetKind === "subagent" ? "subagent" : "acp",
targetSessionKey,
agentId: normalizeText(entry?.agentId) || undefined,
label: normalizeText(entry?.label) || undefined,
boundBy: normalizeText(entry?.boundBy) || undefined,
boundAt,
lastActivityAt: Math.max(lastActivityAt, boundAt),
idleTimeoutMs:
typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)
? Math.max(0, Math.floor(entry.idleTimeoutMs))
: undefined,
maxAgeMs:
typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)
? Math.max(0, Math.floor(entry.maxAgeMs))
: undefined,
});
}
return loaded;
}
async function persistBindings(filePath: string, accountId: string): Promise<void> {
const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()]
.filter((entry) => entry.accountId === accountId)
.sort((a, b) => a.boundAt - b.boundAt);
await writeJsonFileAtomically(filePath, {
version: STORE_VERSION,
bindings,
} satisfies StoredMatrixThreadBindingState);
}
function setBindingRecord(record: MatrixThreadBindingRecord): void {
BINDINGS_BY_ACCOUNT_CONVERSATION.set(resolveBindingKey(record), record);
}
function removeBindingRecord(record: MatrixThreadBindingRecord): MatrixThreadBindingRecord | null {
const key = resolveBindingKey(record);
const removed = BINDINGS_BY_ACCOUNT_CONVERSATION.get(key) ?? null;
if (removed) {
BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key);
}
return removed;
}
function listBindingsForAccount(accountId: string): MatrixThreadBindingRecord[] {
return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter(
(entry) => entry.accountId === accountId,
);
}
function buildMatrixBindingIntroText(params: {
metadata?: Record<string, unknown>;
targetSessionKey: string;
}): string {
const introText = normalizeText(params.metadata?.introText);
if (introText) {
return introText;
}
const label = normalizeText(params.metadata?.label);
const agentId =
normalizeText(params.metadata?.agentId) ||
resolveAgentIdFromSessionKey(params.targetSessionKey);
const base = label || agentId || "session";
return `⚙️ ${base} session active. Messages here go directly to this session.`;
}
async function sendBindingMessage(params: {
client: MatrixClient;
accountId: string;
roomId: string;
threadId?: string;
text: string;
}): Promise<string | null> {
const trimmed = params.text.trim();
if (!trimmed) {
return null;
}
const result = await sendMessageMatrix(`room:${params.roomId}`, trimmed, {
client: params.client,
accountId: params.accountId,
...(params.threadId ? { threadId: params.threadId } : {}),
});
return result.messageId || null;
}
async function sendFarewellMessage(params: {
client: MatrixClient;
accountId: string;
record: MatrixThreadBindingRecord;
defaultIdleTimeoutMs: number;
defaultMaxAgeMs: number;
reason?: string;
}): Promise<void> {
const roomId = params.record.parentConversationId ?? params.record.conversationId;
const idleTimeoutMs =
typeof params.record.idleTimeoutMs === "number"
? params.record.idleTimeoutMs
: params.defaultIdleTimeoutMs;
const maxAgeMs =
typeof params.record.maxAgeMs === "number" ? params.record.maxAgeMs : params.defaultMaxAgeMs;
const farewellText = resolveThreadBindingFarewellText({
reason: params.reason,
idleTimeoutMs,
maxAgeMs,
});
await sendBindingMessage({
client: params.client,
accountId: params.accountId,
roomId,
threadId:
params.record.parentConversationId &&
params.record.parentConversationId !== params.record.conversationId
? params.record.conversationId
: undefined,
text: farewellText,
}).catch(() => {});
}
export async function createMatrixThreadBindingManager(params: {
accountId: string;
auth: MatrixAuth;
client: MatrixClient;
env?: NodeJS.ProcessEnv;
idleTimeoutMs: number;
maxAgeMs: number;
enableSweeper?: boolean;
logVerboseMessage?: (message: string) => void;
}): Promise<MatrixThreadBindingManager> {
const existing = MANAGERS_BY_ACCOUNT_ID.get(params.accountId);
if (existing) {
return existing;
}
const filePath = resolveBindingsPath({
auth: params.auth,
accountId: params.accountId,
env: params.env,
});
const loaded = await loadBindingsFromDisk(filePath, params.accountId);
for (const record of loaded) {
setBindingRecord(record);
}
const persist = async () => await persistBindings(filePath, params.accountId);
const defaults = {
idleTimeoutMs: params.idleTimeoutMs,
maxAgeMs: params.maxAgeMs,
};
let persistTimer: NodeJS.Timeout | null = null;
const schedulePersist = (delayMs: number) => {
if (persistTimer) {
return;
}
persistTimer = setTimeout(() => {
persistTimer = null;
void persist();
}, delayMs);
persistTimer.unref?.();
};
const manager: MatrixThreadBindingManager = {
accountId: params.accountId,
getIdleTimeoutMs: () => defaults.idleTimeoutMs,
getMaxAgeMs: () => defaults.maxAgeMs,
getByConversation: ({ conversationId, parentConversationId }) =>
listBindingsForAccount(params.accountId).find((entry) => {
if (entry.conversationId !== conversationId.trim()) {
return false;
}
if (!parentConversationId) {
return true;
}
return (entry.parentConversationId ?? "") === parentConversationId.trim();
}),
listBySessionKey: (targetSessionKey) =>
listBindingsForAccount(params.accountId).filter(
(entry) => entry.targetSessionKey === targetSessionKey.trim(),
),
listBindings: () => listBindingsForAccount(params.accountId),
touchBinding: (bindingId, at) => {
const record = listBindingsForAccount(params.accountId).find(
(entry) => resolveBindingKey(entry) === bindingId.trim(),
);
if (!record) {
return null;
}
const nextRecord = {
...record,
lastActivityAt:
typeof at === "number" && Number.isFinite(at)
? Math.max(record.lastActivityAt, Math.floor(at))
: Date.now(),
};
setBindingRecord(nextRecord);
schedulePersist(TOUCH_PERSIST_DELAY_MS);
return nextRecord;
},
setIdleTimeoutBySessionKey: ({ targetSessionKey, idleTimeoutMs }) => {
const nextBindings = listBindingsForAccount(params.accountId)
.filter((entry) => entry.targetSessionKey === targetSessionKey.trim())
.map((entry) => ({
...entry,
idleTimeoutMs: Math.max(0, Math.floor(idleTimeoutMs)),
}));
for (const entry of nextBindings) {
setBindingRecord(entry);
}
void persist();
return nextBindings;
},
setMaxAgeBySessionKey: ({ targetSessionKey, maxAgeMs }) => {
const nextBindings = listBindingsForAccount(params.accountId)
.filter((entry) => entry.targetSessionKey === targetSessionKey.trim())
.map((entry) => ({
...entry,
maxAgeMs: Math.max(0, Math.floor(maxAgeMs)),
}));
for (const entry of nextBindings) {
setBindingRecord(entry);
}
void persist();
return nextBindings;
},
stop: () => {
if (sweepTimer) {
clearInterval(sweepTimer);
}
if (persistTimer) {
clearTimeout(persistTimer);
persistTimer = null;
}
unregisterSessionBindingAdapter({
channel: "matrix-js",
accountId: params.accountId,
});
if (MANAGERS_BY_ACCOUNT_ID.get(params.accountId) === manager) {
MANAGERS_BY_ACCOUNT_ID.delete(params.accountId);
}
for (const record of listBindingsForAccount(params.accountId)) {
BINDINGS_BY_ACCOUNT_CONVERSATION.delete(resolveBindingKey(record));
}
},
};
let sweepTimer: NodeJS.Timeout | null = null;
const unbindRecords = async (records: MatrixThreadBindingRecord[], reason: string) => {
if (records.length === 0) {
return [];
}
const removed = records
.map((record) => removeBindingRecord(record))
.filter((record): record is MatrixThreadBindingRecord => Boolean(record));
if (removed.length === 0) {
return [];
}
await persist();
await Promise.all(
removed.map(async (record) => {
await sendFarewellMessage({
client: params.client,
accountId: params.accountId,
record,
defaultIdleTimeoutMs: defaults.idleTimeoutMs,
defaultMaxAgeMs: defaults.maxAgeMs,
reason,
});
}),
);
return removed.map((record) => toSessionBindingRecord(record, defaults));
};
registerSessionBindingAdapter({
channel: "matrix-js",
accountId: params.accountId,
capabilities: { placements: ["current", "child"], bindSupported: true, unbindSupported: true },
bind: async (input) => {
const conversationId = input.conversation.conversationId.trim();
const parentConversationId = input.conversation.parentConversationId?.trim() || undefined;
const targetSessionKey = input.targetSessionKey.trim();
if (!conversationId || !targetSessionKey) {
return null;
}
let boundConversationId = conversationId;
let boundParentConversationId = parentConversationId;
const introText = buildMatrixBindingIntroText({
metadata: input.metadata,
targetSessionKey,
});
if (input.placement === "child") {
const roomId = parentConversationId || conversationId;
const rootEventId = await sendBindingMessage({
client: params.client,
accountId: params.accountId,
roomId,
text: introText,
});
if (!rootEventId) {
return null;
}
boundConversationId = rootEventId;
boundParentConversationId = roomId;
}
const now = Date.now();
const record: MatrixThreadBindingRecord = {
accountId: params.accountId,
conversationId: boundConversationId,
...(boundParentConversationId ? { parentConversationId: boundParentConversationId } : {}),
targetKind: toMatrixBindingTargetKind(input.targetKind),
targetSessionKey,
agentId:
normalizeText(input.metadata?.agentId) || resolveAgentIdFromSessionKey(targetSessionKey),
label: normalizeText(input.metadata?.label) || undefined,
boundBy: normalizeText(input.metadata?.boundBy) || "system",
boundAt: now,
lastActivityAt: now,
idleTimeoutMs: defaults.idleTimeoutMs,
maxAgeMs: defaults.maxAgeMs,
};
setBindingRecord(record);
await persist();
if (input.placement === "current" && introText) {
const roomId = boundParentConversationId || boundConversationId;
const threadId =
boundParentConversationId && boundParentConversationId !== boundConversationId
? boundConversationId
: undefined;
await sendBindingMessage({
client: params.client,
accountId: params.accountId,
roomId,
threadId,
text: introText,
}).catch(() => {});
}
return toSessionBindingRecord(record, defaults);
},
listBySession: (targetSessionKey) =>
manager
.listBySessionKey(targetSessionKey)
.map((record) => toSessionBindingRecord(record, defaults)),
resolveByConversation: (ref) => {
const record = manager.getByConversation({
conversationId: ref.conversationId,
parentConversationId: ref.parentConversationId,
});
return record ? toSessionBindingRecord(record, defaults) : null;
},
setIdleTimeoutBySession: ({ targetSessionKey, idleTimeoutMs }) =>
manager
.setIdleTimeoutBySessionKey({ targetSessionKey, idleTimeoutMs })
.map((record) => toSessionBindingRecord(record, defaults)),
setMaxAgeBySession: ({ targetSessionKey, maxAgeMs }) =>
manager
.setMaxAgeBySessionKey({ targetSessionKey, maxAgeMs })
.map((record) => toSessionBindingRecord(record, defaults)),
touch: (bindingId, at) => {
manager.touchBinding(bindingId, at);
},
unbind: async (input) => {
const removed = await unbindRecords(
listBindingsForAccount(params.accountId).filter((record) => {
if (input.bindingId?.trim()) {
return resolveBindingKey(record) === input.bindingId.trim();
}
if (input.targetSessionKey?.trim()) {
return record.targetSessionKey === input.targetSessionKey.trim();
}
return false;
}),
input.reason,
);
return removed;
},
});
if (params.enableSweeper !== false) {
sweepTimer = setInterval(() => {
const now = Date.now();
const expired = listBindingsForAccount(params.accountId)
.map((record) => ({
record,
lifecycle: resolveEffectiveBindingExpiry({
record,
defaultIdleTimeoutMs: defaults.idleTimeoutMs,
defaultMaxAgeMs: defaults.maxAgeMs,
}),
}))
.filter(
(
entry,
): entry is {
record: MatrixThreadBindingRecord;
lifecycle: { expiresAt: number; reason: "idle-expired" | "max-age-expired" };
} =>
typeof entry.lifecycle.expiresAt === "number" &&
entry.lifecycle.expiresAt <= now &&
Boolean(entry.lifecycle.reason),
);
if (expired.length === 0) {
return;
}
void Promise.all(
expired.map(async ({ record, lifecycle }) => {
params.logVerboseMessage?.(
`matrix: auto-unbinding ${record.conversationId} due to ${lifecycle.reason}`,
);
await unbindRecords([record], lifecycle.reason);
}),
);
}, THREAD_BINDINGS_SWEEP_INTERVAL_MS);
}
MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager);
return manager;
}
export function getMatrixThreadBindingManager(
accountId: string,
): MatrixThreadBindingManager | null {
return MANAGERS_BY_ACCOUNT_ID.get(accountId) ?? null;
}
export function setMatrixThreadBindingIdleTimeoutBySessionKey(params: {
accountId: string;
targetSessionKey: string;
idleTimeoutMs: number;
}): SessionBindingRecord[] {
const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId);
if (!manager) {
return [];
}
return manager.setIdleTimeoutBySessionKey(params).map((record) =>
toSessionBindingRecord(record, {
idleTimeoutMs: manager.getIdleTimeoutMs(),
maxAgeMs: manager.getMaxAgeMs(),
}),
);
}
export function setMatrixThreadBindingMaxAgeBySessionKey(params: {
accountId: string;
targetSessionKey: string;
maxAgeMs: number;
}): SessionBindingRecord[] {
const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId);
if (!manager) {
return [];
}
return manager.setMaxAgeBySessionKey(params).map((record) =>
toSessionBindingRecord(record, {
idleTimeoutMs: manager.getIdleTimeoutMs(),
maxAgeMs: manager.getMaxAgeMs(),
}),
);
}
export function resetMatrixThreadBindingsForTests(): void {
for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) {
manager.stop();
}
MANAGERS_BY_ACCOUNT_ID.clear();
BINDINGS_BY_ACCOUNT_CONVERSATION.clear();
}

View File

@@ -7,14 +7,22 @@ const mocks = vi.hoisted(() => ({
reactMatrixMessage: vi.fn(),
listMatrixReactions: vi.fn(),
removeMatrixReactions: vi.fn(),
sendMatrixMessage: vi.fn(),
listMatrixPins: vi.fn(),
getMatrixMemberInfo: vi.fn(),
getMatrixRoomInfo: vi.fn(),
}));
vi.mock("./matrix/actions.js", async () => {
const actual = await vi.importActual<typeof import("./matrix/actions.js")>("./matrix/actions.js");
return {
...actual,
getMatrixMemberInfo: mocks.getMatrixMemberInfo,
getMatrixRoomInfo: mocks.getMatrixRoomInfo,
listMatrixReactions: mocks.listMatrixReactions,
listMatrixPins: mocks.listMatrixPins,
removeMatrixReactions: mocks.removeMatrixReactions,
sendMatrixMessage: mocks.sendMatrixMessage,
voteMatrixPoll: mocks.voteMatrixPoll,
};
});
@@ -39,7 +47,14 @@ describe("handleMatrixAction pollVote", () => {
maxSelections: 2,
});
mocks.listMatrixReactions.mockResolvedValue([{ key: "👍", count: 1, users: ["@u:example"] }]);
mocks.listMatrixPins.mockResolvedValue({ pinned: ["$pin"], events: [] });
mocks.removeMatrixReactions.mockResolvedValue({ removed: 1 });
mocks.sendMatrixMessage.mockResolvedValue({
messageId: "$sent",
roomId: "!room:example",
});
mocks.getMatrixMemberInfo.mockResolvedValue({ userId: "@u:example" });
mocks.getMatrixRoomInfo.mockResolvedValue({ roomId: "!room:example" });
});
it("parses snake_case vote params and forwards normalized selectors", async () => {
@@ -141,4 +156,67 @@ describe("handleMatrixAction pollVote", () => {
reactions: [{ key: "👍", count: 1 }],
});
});
it("passes account-scoped opts to message sends", async () => {
await handleMatrixAction(
{
action: "sendMessage",
accountId: "ops",
to: "room:!room:example",
content: "hello",
threadId: "$thread",
},
{ channels: { "matrix-js": { actions: { messages: true } } } } as CoreConfig,
);
expect(mocks.sendMatrixMessage).toHaveBeenCalledWith("room:!room:example", "hello", {
accountId: "ops",
mediaUrl: undefined,
replyToId: undefined,
threadId: "$thread",
});
});
it("passes account-scoped opts to pin listing", async () => {
await handleMatrixAction(
{
action: "listPins",
accountId: "ops",
roomId: "!room:example",
},
{ channels: { "matrix-js": { actions: { pins: true } } } } as CoreConfig,
);
expect(mocks.listMatrixPins).toHaveBeenCalledWith("!room:example", {
accountId: "ops",
});
});
it("passes account-scoped opts to member and room info actions", async () => {
await handleMatrixAction(
{
action: "memberInfo",
accountId: "ops",
userId: "@u:example",
roomId: "!room:example",
},
{ channels: { "matrix-js": { actions: { memberInfo: true } } } } as CoreConfig,
);
await handleMatrixAction(
{
action: "channelInfo",
accountId: "ops",
roomId: "!room:example",
},
{ channels: { "matrix-js": { actions: { channelInfo: true } } } } as CoreConfig,
);
expect(mocks.getMatrixMemberInfo).toHaveBeenCalledWith("@u:example", {
accountId: "ops",
roomId: "!room:example",
});
expect(mocks.getMatrixRoomInfo).toHaveBeenCalledWith("!room:example", {
accountId: "ops",
});
});
});

View File

@@ -130,6 +130,7 @@ export async function handleMatrixAction(
const action = readStringParam(params, "action", { required: true });
const accountId = readStringParam(params, "accountId") ?? undefined;
const isActionEnabled = createActionGate(cfg.channels?.["matrix-js"]?.actions);
const clientOpts = accountId ? { accountId } : {};
if (reactionActions.has(action)) {
if (!isActionEnabled("reactions")) {
@@ -199,6 +200,7 @@ export async function handleMatrixAction(
mediaUrl: mediaUrl ?? undefined,
replyToId: replyToId ?? undefined,
threadId: threadId ?? undefined,
...clientOpts,
});
return jsonResult({ ok: true, result });
}
@@ -206,14 +208,17 @@ export async function handleMatrixAction(
const roomId = readRoomId(params);
const messageId = readStringParam(params, "messageId", { required: true });
const content = readStringParam(params, "content", { required: true });
const result = await editMatrixMessage(roomId, messageId, content);
const result = await editMatrixMessage(roomId, messageId, content, clientOpts);
return jsonResult({ ok: true, result });
}
case "deleteMessage": {
const roomId = readRoomId(params);
const messageId = readStringParam(params, "messageId", { required: true });
const reason = readStringParam(params, "reason");
await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined });
await deleteMatrixMessage(roomId, messageId, {
reason: reason ?? undefined,
...clientOpts,
});
return jsonResult({ ok: true, deleted: true });
}
case "readMessages": {
@@ -225,6 +230,7 @@ export async function handleMatrixAction(
limit: limit ?? undefined,
before: before ?? undefined,
after: after ?? undefined,
...clientOpts,
});
return jsonResult({ ok: true, ...result });
}
@@ -240,15 +246,15 @@ export async function handleMatrixAction(
const roomId = readRoomId(params);
if (action === "pinMessage") {
const messageId = readStringParam(params, "messageId", { required: true });
const result = await pinMatrixMessage(roomId, messageId);
const result = await pinMatrixMessage(roomId, messageId, clientOpts);
return jsonResult({ ok: true, pinned: result.pinned });
}
if (action === "unpinMessage") {
const messageId = readStringParam(params, "messageId", { required: true });
const result = await unpinMatrixMessage(roomId, messageId);
const result = await unpinMatrixMessage(roomId, messageId, clientOpts);
return jsonResult({ ok: true, pinned: result.pinned });
}
const result = await listMatrixPins(roomId);
const result = await listMatrixPins(roomId, clientOpts);
return jsonResult({ ok: true, pinned: result.pinned, events: result.events });
}
@@ -260,6 +266,7 @@ export async function handleMatrixAction(
const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId");
const result = await getMatrixMemberInfo(userId, {
roomId: roomId ?? undefined,
...clientOpts,
});
return jsonResult({ ok: true, member: result });
}
@@ -269,7 +276,7 @@ export async function handleMatrixAction(
throw new Error("Matrix room info is disabled.");
}
const roomId = readRoomId(params);
const result = await getMatrixRoomInfo(roomId);
const result = await getMatrixRoomInfo(roomId, clientOpts);
return jsonResult({ ok: true, room: result });
}

View File

@@ -40,6 +40,14 @@ export type MatrixActionConfig = {
verification?: boolean;
};
export type MatrixThreadBindingsConfig = {
enabled?: boolean;
idleHours?: number;
maxAgeHours?: number;
spawnSubagentSessions?: boolean;
spawnAcpSessions?: boolean;
};
/** Per-account Matrix config (excludes the accounts field to prevent recursion). */
export type MatrixAccountConfig = Omit<MatrixConfig, "accounts">;
@@ -90,6 +98,8 @@ export type MatrixConfig = {
ackReactionScope?: "group-mentions" | "group-all" | "direct" | "all" | "none" | "off";
/** Inbound reaction notifications for bot-authored Matrix messages. */
reactionNotifications?: "off" | "own";
/** Thread/session binding behavior for Matrix room threads. */
threadBindings?: MatrixThreadBindingsConfig;
/** Whether Matrix-js should auto-request self verification on startup when unverified. */
startupVerification?: "off" | "if-unverified";
/** Cooldown window for automatic startup verification requests. Default: 24 hours. */

View File

@@ -587,6 +587,28 @@ export function createTelegramThreadBindingManager(
})
: null;
},
setIdleTimeoutBySession: ({ targetSessionKey, idleTimeoutMs: nextIdleTimeoutMs }) =>
setTelegramThreadBindingIdleTimeoutBySessionKey({
targetSessionKey,
accountId,
idleTimeoutMs: nextIdleTimeoutMs,
}).map((entry) =>
toSessionBindingRecord(entry, {
idleTimeoutMs,
maxAgeMs,
}),
),
setMaxAgeBySession: ({ targetSessionKey, maxAgeMs: nextMaxAgeMs }) =>
setTelegramThreadBindingMaxAgeBySessionKey({
targetSessionKey,
accountId,
maxAgeMs: nextMaxAgeMs,
}).map((entry) =>
toSessionBindingRecord(entry, {
idleTimeoutMs,
maxAgeMs,
}),
),
touch: (bindingId, at) => {
const conversationId = resolveThreadBindingConversationIdFromBindingId({
accountId,