mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 21:40:44 +00:00
matrix: add subagent spawn thread-binding hooks
This commit is contained in:
committed by
Gustavo Madeira Santana
parent
857b9cd326
commit
af0db3f310
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Memory/sqlite-vec: emit the degraded sqlite-vec warning once per degraded episode instead of repeating it for every file write, while preserving the latch across safe-reindex rollback and resetting it when vector state is genuinely rebuilt. (#67898) Thanks @rubencu.
|
||||
- Reply/block streaming: preserve post-stream incomplete-turn error payloads after block streaming already emitted content, so users get the warning instead of silence. (#67991) Thanks @obviyus.
|
||||
- Telegram/streaming: clear the compaction replay guard after visible non-final boundaries so a post-tool assistant reply rotates to a fresh preview instead of editing the pre-compaction message. (#67993) Thanks @obviyus.
|
||||
- Matrix: fix `sessions_spawn --thread` subagent session spawning — thread binding creation, cleanup on session end, and completion-message delivery target resolution now work end-to-end. (#67643) Thanks @eejohnso-ops.
|
||||
|
||||
## 2026.4.15
|
||||
|
||||
|
||||
@@ -2,6 +2,15 @@ import { defineBundledChannelEntry } from "openclaw/plugin-sdk/channel-entry-con
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { registerMatrixCliMetadata } from "./cli-metadata.js";
|
||||
|
||||
type MatrixSubagentHooksModule = typeof import("./src/matrix/subagent-hooks.js");
|
||||
|
||||
let matrixSubagentHooksPromise: Promise<MatrixSubagentHooksModule> | null = null;
|
||||
|
||||
function loadMatrixSubagentHooksModule() {
|
||||
matrixSubagentHooksPromise ??= import("./src/matrix/subagent-hooks.js");
|
||||
return matrixSubagentHooksPromise;
|
||||
}
|
||||
|
||||
export default defineBundledChannelEntry({
|
||||
id: "matrix",
|
||||
name: "Matrix",
|
||||
@@ -47,5 +56,18 @@ export default defineBundledChannelEntry({
|
||||
const { handleVerificationStatus } = await import("./plugin-entry.handlers.runtime.js");
|
||||
await handleVerificationStatus(ctx);
|
||||
});
|
||||
|
||||
api.on("subagent_spawning", async (event) => {
|
||||
const { handleMatrixSubagentSpawning } = await loadMatrixSubagentHooksModule();
|
||||
return await handleMatrixSubagentSpawning(api, event);
|
||||
});
|
||||
api.on("subagent_ended", async (event) => {
|
||||
const { handleMatrixSubagentEnded } = await loadMatrixSubagentHooksModule();
|
||||
await handleMatrixSubagentEnded(event);
|
||||
});
|
||||
api.on("subagent_delivery_target", async (event) => {
|
||||
const { handleMatrixSubagentDeliveryTarget } = await loadMatrixSubagentHooksModule();
|
||||
return handleMatrixSubagentDeliveryTarget(event);
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
568
extensions/matrix/src/matrix/subagent-hooks.test.ts
Normal file
568
extensions/matrix/src/matrix/subagent-hooks.test.ts
Normal file
@@ -0,0 +1,568 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// Hoisted stubs referenced in vi.mock factories below
|
||||
const bindMock = vi.hoisted(() => vi.fn());
|
||||
const getManagerMock = vi.hoisted(() => vi.fn());
|
||||
const listAllBindingsMock = vi.hoisted(() => vi.fn((): any[] => []));
|
||||
const listBindingsForAccountMock = vi.hoisted(() => vi.fn((): any[] => []));
|
||||
const removeBindingRecordMock = vi.hoisted(() => vi.fn(() => false));
|
||||
const resolveMatrixBaseConfigMock = vi.hoisted(() => vi.fn((): any => ({})));
|
||||
const findMatrixAccountConfigMock = vi.hoisted(() => vi.fn((): any => undefined));
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/conversation-binding-runtime", () => ({
|
||||
getSessionBindingService: () => ({ bind: bindMock }),
|
||||
}));
|
||||
|
||||
vi.mock("./account-config.js", () => ({
|
||||
resolveMatrixBaseConfig: resolveMatrixBaseConfigMock,
|
||||
findMatrixAccountConfig: findMatrixAccountConfigMock,
|
||||
}));
|
||||
|
||||
vi.mock("./thread-bindings-shared.js", () => ({
|
||||
getMatrixThreadBindingManager: getManagerMock,
|
||||
listAllBindings: listAllBindingsMock,
|
||||
listBindingsForAccount: listBindingsForAccountMock,
|
||||
removeBindingRecord: removeBindingRecordMock,
|
||||
}));
|
||||
|
||||
import {
|
||||
handleMatrixSubagentDeliveryTarget,
|
||||
handleMatrixSubagentEnded,
|
||||
handleMatrixSubagentSpawning,
|
||||
} from "./subagent-hooks.js";
|
||||
|
||||
// A minimal fake api — only config is used by these hooks
|
||||
const fakeApi = { config: {} } as never;
|
||||
|
||||
function makeSpawnEvent(
|
||||
overrides: Partial<{
|
||||
threadRequested: boolean;
|
||||
channel: string;
|
||||
accountId: string;
|
||||
to: string;
|
||||
childSessionKey: string;
|
||||
agentId: string;
|
||||
label: string;
|
||||
}> = {},
|
||||
) {
|
||||
return {
|
||||
threadRequested: overrides.threadRequested ?? true,
|
||||
requester: {
|
||||
channel: overrides.channel ?? "matrix",
|
||||
accountId: overrides.accountId ?? "default",
|
||||
to: overrides.to ?? "room:!room123:example.org",
|
||||
},
|
||||
childSessionKey: overrides.childSessionKey ?? "agent:default:subagent:child",
|
||||
agentId: overrides.agentId ?? "worker",
|
||||
label: overrides.label,
|
||||
};
|
||||
}
|
||||
|
||||
describe("handleMatrixSubagentSpawning", () => {
|
||||
beforeEach(() => {
|
||||
bindMock.mockReset();
|
||||
getManagerMock.mockReset();
|
||||
resolveMatrixBaseConfigMock.mockReset();
|
||||
findMatrixAccountConfigMock.mockReset();
|
||||
// Default: bindings enabled, spawn enabled
|
||||
resolveMatrixBaseConfigMock.mockReturnValue({
|
||||
threadBindings: { enabled: true, spawnSubagentSessions: true },
|
||||
});
|
||||
findMatrixAccountConfigMock.mockReturnValue(undefined);
|
||||
// Default: manager exists
|
||||
getManagerMock.mockReturnValue({ persist: vi.fn() });
|
||||
// Default: bind resolves ok
|
||||
bindMock.mockResolvedValue({ conversation: {} });
|
||||
});
|
||||
|
||||
it("returns undefined when threadRequested is false", async () => {
|
||||
const result = await handleMatrixSubagentSpawning(
|
||||
fakeApi,
|
||||
makeSpawnEvent({ threadRequested: false }),
|
||||
);
|
||||
expect(result).toBeUndefined();
|
||||
expect(bindMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns undefined when channel is not matrix", async () => {
|
||||
const result = await handleMatrixSubagentSpawning(
|
||||
fakeApi,
|
||||
makeSpawnEvent({ channel: "slack" }),
|
||||
);
|
||||
expect(result).toBeUndefined();
|
||||
expect(bindMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns undefined when channel has mixed casing but is still matrix", async () => {
|
||||
// channel.trim().toLowerCase() must equal "matrix"
|
||||
const result = await handleMatrixSubagentSpawning(
|
||||
fakeApi,
|
||||
makeSpawnEvent({ channel: " Matrix " }),
|
||||
);
|
||||
expect(result).not.toBeUndefined();
|
||||
// It proceeds (no early-return for non-matrix)
|
||||
});
|
||||
|
||||
it("returns error when thread bindings are disabled", async () => {
|
||||
resolveMatrixBaseConfigMock.mockReturnValue({
|
||||
threadBindings: { enabled: false, spawnSubagentSessions: true },
|
||||
});
|
||||
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: expect.stringContaining("thread bindings are disabled"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns error when spawnSubagentSessions is false", async () => {
|
||||
resolveMatrixBaseConfigMock.mockReturnValue({
|
||||
threadBindings: { enabled: true, spawnSubagentSessions: false },
|
||||
});
|
||||
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: expect.stringContaining("spawnSubagentSessions"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns error when spawnSubagentSessions defaults to false (no config)", async () => {
|
||||
resolveMatrixBaseConfigMock.mockReturnValue({});
|
||||
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: expect.stringContaining("spawnSubagentSessions"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns error when requester.to has no room target", async () => {
|
||||
const result = await handleMatrixSubagentSpawning(
|
||||
fakeApi,
|
||||
makeSpawnEvent({ to: "@user:example.org" }),
|
||||
);
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: expect.stringContaining("no room target"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns error when requester.to is empty", async () => {
|
||||
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ to: "" }));
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: expect.stringContaining("no room target"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns error when no binding manager is available for the account", async () => {
|
||||
getManagerMock.mockReturnValue(null);
|
||||
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: expect.stringContaining("No Matrix thread binding manager"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("calls bind with the resolved room id and returns ok", async () => {
|
||||
bindMock.mockResolvedValue({ conversation: {} });
|
||||
const result = await handleMatrixSubagentSpawning(
|
||||
fakeApi,
|
||||
makeSpawnEvent({
|
||||
accountId: "ops",
|
||||
to: "room:!roomAbc:technerik.com",
|
||||
childSessionKey: "agent:ops:subagent:worker",
|
||||
agentId: "builder",
|
||||
label: "Build Agent",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(bindMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
targetSessionKey: "agent:ops:subagent:worker",
|
||||
targetKind: "subagent",
|
||||
conversation: expect.objectContaining({
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
conversationId: "!roomAbc:technerik.com",
|
||||
}),
|
||||
placement: "child",
|
||||
metadata: expect.objectContaining({
|
||||
agentId: "builder",
|
||||
label: "Build Agent",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(result).toEqual({ status: "ok", threadBindingReady: true });
|
||||
});
|
||||
|
||||
it("uses 'default' as accountId when requester.accountId is absent", async () => {
|
||||
bindMock.mockResolvedValue({ conversation: {} });
|
||||
await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ accountId: undefined as never }));
|
||||
expect(getManagerMock).toHaveBeenCalledWith("default");
|
||||
expect(bindMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
conversation: expect.objectContaining({ accountId: "default" }),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns error when bind() throws", async () => {
|
||||
bindMock.mockRejectedValue(new Error("provider auth failed"));
|
||||
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
status: "error",
|
||||
error: expect.stringContaining("provider auth failed"),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("respects per-account threadBindings override over base config", async () => {
|
||||
// Base says spawnSubagentSessions=false; account override says true
|
||||
resolveMatrixBaseConfigMock.mockReturnValue({
|
||||
threadBindings: { enabled: true, spawnSubagentSessions: false },
|
||||
});
|
||||
findMatrixAccountConfigMock.mockReturnValue({
|
||||
threadBindings: { spawnSubagentSessions: true },
|
||||
});
|
||||
bindMock.mockResolvedValue({ conversation: {} });
|
||||
|
||||
const result = await handleMatrixSubagentSpawning(
|
||||
fakeApi,
|
||||
makeSpawnEvent({ accountId: "forge" }),
|
||||
);
|
||||
expect(result).toEqual({ status: "ok", threadBindingReady: true });
|
||||
});
|
||||
});
|
||||
|
||||
describe("handleMatrixSubagentEnded", () => {
|
||||
const mockManager = { persist: vi.fn() };
|
||||
|
||||
beforeEach(() => {
|
||||
getManagerMock.mockReset();
|
||||
listAllBindingsMock.mockReset();
|
||||
listBindingsForAccountMock.mockReset();
|
||||
removeBindingRecordMock.mockReset();
|
||||
mockManager.persist.mockReset();
|
||||
});
|
||||
|
||||
it("does nothing when no matching bindings exist", async () => {
|
||||
listBindingsForAccountMock.mockReturnValue([]);
|
||||
await handleMatrixSubagentEnded({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
});
|
||||
expect(getManagerMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("removes matching bindings and calls persist on the manager", async () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listBindingsForAccountMock.mockReturnValue([binding]);
|
||||
removeBindingRecordMock.mockReturnValue(true);
|
||||
getManagerMock.mockReturnValue(mockManager);
|
||||
mockManager.persist.mockResolvedValue(undefined);
|
||||
|
||||
await handleMatrixSubagentEnded({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
});
|
||||
|
||||
expect(removeBindingRecordMock).toHaveBeenCalledWith(binding);
|
||||
expect(getManagerMock).toHaveBeenCalledWith("ops");
|
||||
expect(mockManager.persist).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("skips persist when removeBindingRecord returns false (binding not found in store)", async () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:orphan",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listBindingsForAccountMock.mockReturnValue([binding]);
|
||||
removeBindingRecordMock.mockReturnValue(false);
|
||||
|
||||
await handleMatrixSubagentEnded({
|
||||
targetSessionKey: "agent:ops:subagent:orphan",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
});
|
||||
|
||||
expect(getManagerMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("falls back to listAllBindings when accountId is absent", async () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listAllBindingsMock.mockReturnValue([binding]);
|
||||
removeBindingRecordMock.mockReturnValue(true);
|
||||
getManagerMock.mockReturnValue(mockManager);
|
||||
mockManager.persist.mockResolvedValue(undefined);
|
||||
|
||||
await handleMatrixSubagentEnded({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
});
|
||||
|
||||
expect(listAllBindingsMock).toHaveBeenCalled();
|
||||
expect(listBindingsForAccountMock).not.toHaveBeenCalled();
|
||||
expect(mockManager.persist).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not double-persist when multiple bindings share the same account", async () => {
|
||||
const mkBinding = (conversationId: string) => ({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId,
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
});
|
||||
listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]);
|
||||
removeBindingRecordMock.mockReturnValue(true);
|
||||
getManagerMock.mockReturnValue(mockManager);
|
||||
mockManager.persist.mockResolvedValue(undefined);
|
||||
|
||||
await handleMatrixSubagentEnded({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
});
|
||||
|
||||
// persist must be called exactly once per unique accountId, not once per binding
|
||||
expect(mockManager.persist).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("handleMatrixSubagentDeliveryTarget", () => {
|
||||
beforeEach(() => {
|
||||
listAllBindingsMock.mockReset();
|
||||
listBindingsForAccountMock.mockReset();
|
||||
});
|
||||
|
||||
it("returns undefined when expectsCompletionMessage is false", () => {
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix", accountId: "ops" },
|
||||
expectsCompletionMessage: false,
|
||||
});
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns undefined when requester channel is not matrix", () => {
|
||||
listBindingsForAccountMock.mockReturnValue([]);
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "slack", accountId: "ops" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns undefined when no bindings match the child session key", () => {
|
||||
listBindingsForAccountMock.mockReturnValue([
|
||||
{
|
||||
targetSessionKey: "agent:ops:subagent:OTHER",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
},
|
||||
]);
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix", accountId: "ops" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns origin with threadId when binding has a distinct parentConversationId", () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread123",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listBindingsForAccountMock.mockReturnValue([binding]);
|
||||
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$thread123" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
expect(result).toEqual({
|
||||
origin: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
to: "room:!room:example",
|
||||
threadId: "$thread123",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("returns origin without threadId when conversationId equals parentConversationId", () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "!room:example",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listBindingsForAccountMock.mockReturnValue([binding]);
|
||||
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix", accountId: "ops" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
expect(result).toEqual({
|
||||
origin: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
to: "room:!room:example",
|
||||
},
|
||||
});
|
||||
expect(result?.origin).not.toHaveProperty("threadId");
|
||||
});
|
||||
|
||||
it("returns origin without threadId when binding has no parentConversationId", () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listBindingsForAccountMock.mockReturnValue([binding]);
|
||||
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix", accountId: "ops" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
expect(result).toEqual({
|
||||
origin: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
to: "room:!room:example",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to the single binding when requesterOrigin threadId does not match any binding", () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread123",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listBindingsForAccountMock.mockReturnValue([binding]);
|
||||
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$threadOTHER" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
// No threadId match, but single binding → falls back to it
|
||||
expect(result).toEqual({
|
||||
origin: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
to: "room:!room:example",
|
||||
threadId: "$thread123",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("returns undefined when multiple bindings exist and threadId matches none", () => {
|
||||
const mkBinding = (threadId: string) => ({
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: threadId,
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
});
|
||||
listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]);
|
||||
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$tNONE" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it("uses listAllBindings when requesterOrigin has no accountId", () => {
|
||||
const binding = {
|
||||
targetSessionKey: "agent:ops:subagent:child",
|
||||
targetKind: "subagent",
|
||||
accountId: "ops",
|
||||
conversationId: "$thread123",
|
||||
parentConversationId: "!room:example",
|
||||
boundAt: 0,
|
||||
lastActivityAt: 0,
|
||||
};
|
||||
listAllBindingsMock.mockReturnValue([binding]);
|
||||
|
||||
const result = handleMatrixSubagentDeliveryTarget({
|
||||
childSessionKey: "agent:ops:subagent:child",
|
||||
requesterOrigin: { channel: "matrix" },
|
||||
expectsCompletionMessage: true,
|
||||
});
|
||||
|
||||
expect(listAllBindingsMock).toHaveBeenCalled();
|
||||
expect(listBindingsForAccountMock).not.toHaveBeenCalled();
|
||||
expect(result).toBeDefined();
|
||||
});
|
||||
});
|
||||
256
extensions/matrix/src/matrix/subagent-hooks.ts
Normal file
256
extensions/matrix/src/matrix/subagent-hooks.ts
Normal file
@@ -0,0 +1,256 @@
|
||||
import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-binding-runtime";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { findMatrixAccountConfig, resolveMatrixBaseConfig } from "./account-config.js";
|
||||
import { resolveMatrixTargetIdentity } from "./target-ids.js";
|
||||
import {
|
||||
getMatrixThreadBindingManager,
|
||||
listAllBindings,
|
||||
listBindingsForAccount,
|
||||
removeBindingRecord,
|
||||
} from "./thread-bindings-shared.js";
|
||||
|
||||
type MatrixSubagentSpawningEvent = {
|
||||
threadRequested: boolean;
|
||||
requester?: {
|
||||
channel?: string;
|
||||
accountId?: string;
|
||||
to?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
childSessionKey: string;
|
||||
agentId: string;
|
||||
label?: string;
|
||||
};
|
||||
|
||||
type MatrixSubagentEndedEvent = {
|
||||
targetSessionKey: string;
|
||||
targetKind: string;
|
||||
accountId?: string;
|
||||
};
|
||||
|
||||
type MatrixSubagentDeliveryTargetEvent = {
|
||||
childSessionKey: string;
|
||||
requesterOrigin?: {
|
||||
channel?: string;
|
||||
accountId?: string;
|
||||
to?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
expectsCompletionMessage: boolean;
|
||||
};
|
||||
|
||||
type SpawningResult =
|
||||
| { status: "ok"; threadBindingReady?: boolean }
|
||||
| { status: "error"; error: string };
|
||||
|
||||
type DeliveryTargetResult = {
|
||||
origin: {
|
||||
channel: string;
|
||||
accountId: string;
|
||||
to: string;
|
||||
threadId?: string;
|
||||
};
|
||||
};
|
||||
|
||||
function summarizeError(err: unknown): string {
|
||||
if (err instanceof Error) {
|
||||
return err.message;
|
||||
}
|
||||
if (typeof err === "string") {
|
||||
return err;
|
||||
}
|
||||
return "error";
|
||||
}
|
||||
|
||||
function resolveThreadBindingFlags(
|
||||
api: OpenClawPluginApi,
|
||||
accountId?: string,
|
||||
): { enabled: boolean; spawnSubagentSessions: boolean } {
|
||||
const matrix = resolveMatrixBaseConfig(api.config);
|
||||
const baseThreadBindings = matrix.threadBindings;
|
||||
const accountThreadBindings = accountId
|
||||
? findMatrixAccountConfig(api.config, accountId)?.threadBindings
|
||||
: undefined;
|
||||
return {
|
||||
enabled:
|
||||
accountThreadBindings?.enabled ??
|
||||
baseThreadBindings?.enabled ??
|
||||
api.config.session?.threadBindings?.enabled ??
|
||||
true,
|
||||
spawnSubagentSessions:
|
||||
accountThreadBindings?.spawnSubagentSessions ??
|
||||
baseThreadBindings?.spawnSubagentSessions ??
|
||||
false,
|
||||
};
|
||||
}
|
||||
|
||||
export async function handleMatrixSubagentSpawning(
|
||||
api: OpenClawPluginApi,
|
||||
event: MatrixSubagentSpawningEvent,
|
||||
): Promise<SpawningResult | undefined> {
|
||||
if (!event.threadRequested) {
|
||||
return undefined;
|
||||
}
|
||||
const channel = event.requester?.channel?.trim().toLowerCase();
|
||||
if (channel !== "matrix") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const accountId = normalizeOptionalString(event.requester?.accountId) || undefined;
|
||||
const flags = resolveThreadBindingFlags(api, accountId);
|
||||
|
||||
if (!flags.enabled) {
|
||||
return {
|
||||
status: "error",
|
||||
error:
|
||||
"Matrix thread bindings are disabled (set channels.matrix.threadBindings.enabled=true to override for this account, or session.threadBindings.enabled=true globally).",
|
||||
};
|
||||
}
|
||||
if (!flags.spawnSubagentSessions) {
|
||||
return {
|
||||
status: "error",
|
||||
error:
|
||||
"Matrix thread-bound subagent spawns are disabled for this account (set channels.matrix.threadBindings.spawnSubagentSessions=true to enable).",
|
||||
};
|
||||
}
|
||||
|
||||
// Resolve the raw Matrix room ID from the requester's `to` field
|
||||
// (e.g. "room:!abc123:example.org" → "!abc123:example.org").
|
||||
const rawTo = normalizeOptionalString(event.requester?.to) ?? "";
|
||||
const matrixTarget = rawTo ? resolveMatrixTargetIdentity(rawTo) : null;
|
||||
const roomId = matrixTarget?.kind === "room" ? matrixTarget.id : "";
|
||||
|
||||
if (!roomId) {
|
||||
return {
|
||||
status: "error",
|
||||
error:
|
||||
"Cannot create Matrix thread binding: no room target in spawn request (requester.to must be a Matrix room ID).",
|
||||
};
|
||||
}
|
||||
|
||||
const resolvedAccountId = accountId || "default";
|
||||
|
||||
// Verify the thread binding manager is running for this account. The manager
|
||||
// holds the captured Matrix client the SessionBindingAdapter needs to send
|
||||
// the intro message that bootstraps the thread.
|
||||
const manager = getMatrixThreadBindingManager(resolvedAccountId);
|
||||
if (!manager) {
|
||||
return {
|
||||
status: "error",
|
||||
error: `No Matrix thread binding manager available for account "${resolvedAccountId}". Is the Matrix channel running?`,
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
// placement="child" tells the Matrix SessionBindingAdapter to:
|
||||
// 1. Send an intro message to the room, creating a new thread root event
|
||||
// 2. Use the returned event ID as boundConversationId (the thread ID)
|
||||
// 3. Register the binding record in the in-memory store and persist it
|
||||
//
|
||||
// We do NOT call setBindingRecord here — the adapter's bind() handles
|
||||
// record creation, thread creation, and persistence atomically.
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: event.childSessionKey,
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "matrix",
|
||||
accountId: resolvedAccountId,
|
||||
conversationId: roomId,
|
||||
},
|
||||
placement: "child",
|
||||
metadata: {
|
||||
agentId: event.agentId?.trim() || undefined,
|
||||
label: normalizeOptionalString(event.label) || undefined,
|
||||
boundBy: "system",
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
return {
|
||||
status: "error",
|
||||
error: `Matrix thread bind failed: ${summarizeError(err)}`,
|
||||
};
|
||||
}
|
||||
|
||||
return { status: "ok", threadBindingReady: true };
|
||||
}
|
||||
|
||||
export async function handleMatrixSubagentEnded(event: MatrixSubagentEndedEvent): Promise<void> {
|
||||
const accountId = normalizeOptionalString(event.accountId) || undefined;
|
||||
// Use the targeted account list when available; fall back to a full scan
|
||||
// so bindings are cleaned up even when accountId is absent.
|
||||
const candidates = accountId ? listBindingsForAccount(accountId) : listAllBindings();
|
||||
const matching = candidates.filter(
|
||||
(entry) => entry.targetSessionKey === event.targetSessionKey && entry.targetKind === "subagent",
|
||||
);
|
||||
const affectedAccountIds = new Set<string>();
|
||||
for (const binding of matching) {
|
||||
if (removeBindingRecord(binding)) {
|
||||
affectedAccountIds.add(binding.accountId);
|
||||
}
|
||||
}
|
||||
// Flush each affected account's manager so removals are persisted to disk.
|
||||
for (const acctId of affectedAccountIds) {
|
||||
const manager = getMatrixThreadBindingManager(acctId);
|
||||
await manager?.persist();
|
||||
}
|
||||
}
|
||||
|
||||
export function handleMatrixSubagentDeliveryTarget(
|
||||
event: MatrixSubagentDeliveryTargetEvent,
|
||||
): DeliveryTargetResult | undefined {
|
||||
if (!event.expectsCompletionMessage) {
|
||||
return undefined;
|
||||
}
|
||||
const requesterChannel = event.requesterOrigin?.channel?.trim().toLowerCase();
|
||||
if (requesterChannel !== "matrix") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const requesterAccountId = normalizeOptionalString(event.requesterOrigin?.accountId);
|
||||
const requesterThreadId =
|
||||
event.requesterOrigin?.threadId != null && event.requesterOrigin.threadId !== ""
|
||||
? String(event.requesterOrigin.threadId).trim()
|
||||
: "";
|
||||
|
||||
// Search the targeted account when available; otherwise scan all accounts.
|
||||
const candidates = requesterAccountId
|
||||
? listBindingsForAccount(requesterAccountId)
|
||||
: listAllBindings();
|
||||
const bindings = candidates.filter(
|
||||
(entry) => entry.targetSessionKey === event.childSessionKey && entry.targetKind === "subagent",
|
||||
);
|
||||
if (bindings.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
let binding: (typeof bindings)[number] | undefined;
|
||||
if (requesterThreadId) {
|
||||
binding = bindings.find(
|
||||
(entry) =>
|
||||
entry.conversationId === requesterThreadId &&
|
||||
(!requesterAccountId || entry.accountId === requesterAccountId),
|
||||
);
|
||||
}
|
||||
if (!binding && bindings.length === 1) {
|
||||
binding = bindings[0];
|
||||
}
|
||||
if (!binding) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const roomId = binding.parentConversationId ?? binding.conversationId;
|
||||
const threadId =
|
||||
binding.parentConversationId && binding.parentConversationId !== binding.conversationId
|
||||
? binding.conversationId
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
origin: {
|
||||
channel: "matrix",
|
||||
accountId: binding.accountId,
|
||||
to: `room:${roomId}`,
|
||||
...(threadId ? { threadId } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -40,6 +40,7 @@ export type MatrixThreadBindingManager = {
|
||||
targetSessionKey: string;
|
||||
maxAgeMs: number;
|
||||
}) => MatrixThreadBindingRecord[];
|
||||
persist: () => Promise<void>;
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
@@ -135,6 +136,10 @@ export function listBindingsForAccount(accountId: string): MatrixThreadBindingRe
|
||||
);
|
||||
}
|
||||
|
||||
export function listAllBindings(): MatrixThreadBindingRecord[] {
|
||||
return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()];
|
||||
}
|
||||
|
||||
export function getMatrixThreadBindingManagerEntry(
|
||||
accountId: string,
|
||||
): MatrixThreadBindingManagerCacheEntry | null {
|
||||
|
||||
@@ -292,6 +292,7 @@ export async function createMatrixThreadBindingManager(params: {
|
||||
accountId: params.accountId,
|
||||
getIdleTimeoutMs: () => defaults.idleTimeoutMs,
|
||||
getMaxAgeMs: () => defaults.maxAgeMs,
|
||||
persist,
|
||||
getByConversation: ({ conversationId, parentConversationId }) =>
|
||||
listBindingsForAccount(params.accountId).find((entry) => {
|
||||
if (entry.conversationId !== conversationId.trim()) {
|
||||
|
||||
Reference in New Issue
Block a user