matrix: fix sessions_spawn --thread subagent session spawning (#67643)

Merged via squash.

Prepared head SHA: 1e5127e217
Co-authored-by: eejohnso-ops <238848106+eejohnso-ops@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
EE
2026-04-17 01:17:56 -05:00
committed by GitHub
parent 857b9cd326
commit 1ce2596195
18 changed files with 1521 additions and 55 deletions

View File

@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
- Memory/sqlite-vec: emit the degraded sqlite-vec warning once per degraded episode instead of repeating it for every file write, while preserving the latch across safe-reindex rollback and resetting it when vector state is genuinely rebuilt. (#67898) Thanks @rubencu.
- Reply/block streaming: preserve post-stream incomplete-turn error payloads after block streaming already emitted content, so users get the warning instead of silence. (#67991) Thanks @obviyus.
- Telegram/streaming: clear the compaction replay guard after visible non-final boundaries so a post-tool assistant reply rotates to a fresh preview instead of editing the pre-compaction message. (#67993) Thanks @obviyus.
- Matrix: fix `sessions_spawn --thread` subagent session spawning — thread binding creation, cleanup on session end, and completion-message delivery target resolution now work end-to-end. (#67643) Thanks @eejohnso-ops and @gumadeiras.
## 2026.4.15

View File

@@ -7,12 +7,23 @@ const cliMocks = vi.hoisted(() => ({
registerMatrixCli: vi.fn(),
}));
const runtimeMocks = vi.hoisted(() => ({
ensureMatrixCryptoRuntime: vi.fn(async () => {}),
handleVerificationBootstrap: vi.fn(async () => {}),
handleVerificationStatus: vi.fn(async () => {}),
handleVerifyRecoveryKey: vi.fn(async () => {}),
setMatrixRuntime: vi.fn(),
}));
vi.mock("./src/cli.js", () => {
return {
registerMatrixCli: cliMocks.registerMatrixCli,
};
});
vi.mock("./plugin-entry.handlers.runtime.js", () => runtimeMocks);
vi.mock("./runtime-api.js", () => ({ setMatrixRuntime: runtimeMocks.setMatrixRuntime }));
describe("matrix plugin", () => {
it("registers matrix CLI through a descriptor-backed lazy registrar", async () => {
const registerCli = vi.fn();
@@ -56,4 +67,35 @@ describe("matrix plugin", () => {
expect(entry.id).toBe("matrix");
expect(entry.name).toBe("Matrix");
});
it("registers subagent lifecycle hooks during full registration", () => {
const on = vi.fn();
const registerChannel = vi.fn();
const registerGatewayMethod = vi.fn();
const api = createTestPluginApi({
id: "matrix",
name: "Matrix",
source: "test",
config: {},
runtime: {} as never,
registrationMode: "full",
on,
registerChannel,
registerGatewayMethod,
});
entry.register(api);
expect(registerChannel).toHaveBeenCalledWith({
plugin: expect.objectContaining({ id: "matrix" }),
});
expect(on.mock.calls.map(([hookName]) => hookName)).toEqual([
"subagent_spawning",
"subagent_ended",
"subagent_delivery_target",
]);
for (const [, handler] of on.mock.calls) {
expect(handler).toEqual(expect.any(Function));
}
});
});

View File

@@ -2,6 +2,15 @@ import { defineBundledChannelEntry } from "openclaw/plugin-sdk/channel-entry-con
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { registerMatrixCliMetadata } from "./cli-metadata.js";
type MatrixSubagentHooksModule = typeof import("./src/matrix/subagent-hooks.js");
let matrixSubagentHooksPromise: Promise<MatrixSubagentHooksModule> | null = null;
function loadMatrixSubagentHooksModule() {
matrixSubagentHooksPromise ??= import("./src/matrix/subagent-hooks.js");
return matrixSubagentHooksPromise;
}
export default defineBundledChannelEntry({
id: "matrix",
name: "Matrix",
@@ -47,5 +56,18 @@ export default defineBundledChannelEntry({
const { handleVerificationStatus } = await import("./plugin-entry.handlers.runtime.js");
await handleVerificationStatus(ctx);
});
api.on("subagent_spawning", async (event) => {
const { handleMatrixSubagentSpawning } = await loadMatrixSubagentHooksModule();
return await handleMatrixSubagentSpawning(api, event);
});
api.on("subagent_ended", async (event) => {
const { handleMatrixSubagentEnded } = await loadMatrixSubagentHooksModule();
await handleMatrixSubagentEnded(event);
});
api.on("subagent_delivery_target", async (event) => {
const { handleMatrixSubagentDeliveryTarget } = await loadMatrixSubagentHooksModule();
return handleMatrixSubagentDeliveryTarget(event);
});
},
});

View File

@@ -0,0 +1,734 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
// Hoisted stubs referenced in vi.mock factories below
const bindMock = vi.hoisted(() => vi.fn());
const unbindMock = vi.hoisted(() => vi.fn());
const getManagerMock = vi.hoisted(() => vi.fn());
const listAllBindingsMock = vi.hoisted(() => vi.fn((): any[] => []));
const listBindingsForAccountMock = vi.hoisted(() => vi.fn((): any[] => []));
const removeBindingRecordMock = vi.hoisted(() => vi.fn(() => false));
const resolveMatrixBaseConfigMock = vi.hoisted(() => vi.fn((): any => ({})));
const findMatrixAccountConfigMock = vi.hoisted(() => vi.fn((): any => undefined));
vi.mock("openclaw/plugin-sdk/conversation-binding-runtime", () => ({
getSessionBindingService: () => ({ bind: bindMock, unbind: unbindMock }),
}));
vi.mock("./account-config.js", () => ({
resolveMatrixBaseConfig: resolveMatrixBaseConfigMock,
findMatrixAccountConfig: findMatrixAccountConfigMock,
}));
vi.mock("./thread-bindings-shared.js", () => ({
getMatrixThreadBindingManager: getManagerMock,
listAllBindings: listAllBindingsMock,
listBindingsForAccount: listBindingsForAccountMock,
removeBindingRecord: removeBindingRecordMock,
resolveBindingKey: (params: {
accountId: string;
conversationId: string;
parentConversationId?: string;
}) =>
`${params.accountId}:${params.parentConversationId?.trim() || "-"}:${params.conversationId}`,
}));
import {
handleMatrixSubagentDeliveryTarget,
handleMatrixSubagentEnded,
handleMatrixSubagentSpawning,
} from "./subagent-hooks.js";
// A minimal fake api — only config is used by these hooks
const fakeApi = { config: {} } as never;
function makeSpawnEvent(
overrides: Partial<{
threadRequested: boolean;
channel: string;
accountId: string;
to: string;
childSessionKey: string;
agentId: string;
label: string;
}> = {},
) {
return {
threadRequested: overrides.threadRequested ?? true,
requester: {
channel: overrides.channel ?? "matrix",
accountId: overrides.accountId ?? "default",
to: overrides.to ?? "room:!room123:example.org",
},
childSessionKey: overrides.childSessionKey ?? "agent:default:subagent:child",
agentId: overrides.agentId ?? "worker",
label: overrides.label,
};
}
describe("handleMatrixSubagentSpawning", () => {
beforeEach(() => {
bindMock.mockReset();
getManagerMock.mockReset();
resolveMatrixBaseConfigMock.mockReset();
findMatrixAccountConfigMock.mockReset();
// Default: bindings enabled, spawn enabled
resolveMatrixBaseConfigMock.mockReturnValue({
threadBindings: { enabled: true, spawnSubagentSessions: true },
});
findMatrixAccountConfigMock.mockReturnValue(undefined);
// Default: manager exists
getManagerMock.mockReturnValue({ persist: vi.fn() });
// Default: bind resolves ok
bindMock.mockResolvedValue({
conversation: {
accountId: "default",
conversationId: "$thread-root",
parentConversationId: "!room123:example.org",
},
});
});
it("returns undefined when threadRequested is false", async () => {
const result = await handleMatrixSubagentSpawning(
fakeApi,
makeSpawnEvent({ threadRequested: false }),
);
expect(result).toBeUndefined();
expect(bindMock).not.toHaveBeenCalled();
});
it("returns undefined when channel is not matrix", async () => {
const result = await handleMatrixSubagentSpawning(
fakeApi,
makeSpawnEvent({ channel: "slack" }),
);
expect(result).toBeUndefined();
expect(bindMock).not.toHaveBeenCalled();
});
it("proceeds past channel check when channel is 'matrix' with mixed casing", async () => {
// channel.trim().toLowerCase() must equal "matrix" — mixed case is accepted
const result = await handleMatrixSubagentSpawning(
fakeApi,
makeSpawnEvent({ channel: " Matrix " }),
);
expect(result).not.toBeUndefined();
});
it("returns error when thread bindings are disabled", async () => {
resolveMatrixBaseConfigMock.mockReturnValue({
threadBindings: { enabled: false, spawnSubagentSessions: true },
});
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
expect(result).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("thread bindings are disabled"),
}),
);
});
it("returns error when spawnSubagentSessions is false", async () => {
resolveMatrixBaseConfigMock.mockReturnValue({
threadBindings: { enabled: true, spawnSubagentSessions: false },
});
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
expect(result).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("spawnSubagentSessions"),
}),
);
});
it("returns error when spawnSubagentSessions defaults to false (no config)", async () => {
resolveMatrixBaseConfigMock.mockReturnValue({});
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
expect(result).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("spawnSubagentSessions"),
}),
);
});
it("returns error when requester.to has no room target", async () => {
const result = await handleMatrixSubagentSpawning(
fakeApi,
makeSpawnEvent({ to: "@user:example.org" }),
);
expect(result).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("no room target"),
}),
);
});
it("returns error when requester.to is empty", async () => {
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ to: "" }));
expect(result).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("no room target"),
}),
);
});
it("returns error when no binding manager is available for the account", async () => {
getManagerMock.mockReturnValue(null);
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
expect(result).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("No Matrix thread binding manager"),
}),
);
});
it("calls bind with the resolved room id and returns ok", async () => {
bindMock.mockResolvedValue({
conversation: {
accountId: "ops",
conversationId: "$thread-ops",
parentConversationId: "!roomAbc:technerik.com",
},
});
const result = await handleMatrixSubagentSpawning(
fakeApi,
makeSpawnEvent({
accountId: "ops",
to: "room:!roomAbc:technerik.com",
childSessionKey: "agent:ops:subagent:worker",
agentId: "builder",
label: "Build Agent",
}),
);
expect(bindMock).toHaveBeenCalledWith(
expect.objectContaining({
targetSessionKey: "agent:ops:subagent:worker",
targetKind: "subagent",
conversation: expect.objectContaining({
channel: "matrix",
accountId: "ops",
conversationId: "!roomAbc:technerik.com",
}),
placement: "child",
metadata: expect.objectContaining({
agentId: "builder",
label: "Build Agent",
}),
}),
);
expect(result).toMatchObject({
status: "ok",
threadBindingReady: true,
deliveryOrigin: {
channel: "matrix",
accountId: "ops",
to: "room:!roomAbc:technerik.com",
threadId: "$thread-ops",
},
});
});
it("uses 'default' as accountId when requester.accountId is absent", async () => {
bindMock.mockResolvedValue({
conversation: {
accountId: "default",
conversationId: "$thread-default",
parentConversationId: "!room123:example.org",
},
});
await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ accountId: undefined as never }));
expect(getManagerMock).toHaveBeenCalledWith("default");
expect(bindMock).toHaveBeenCalledWith(
expect.objectContaining({
conversation: expect.objectContaining({ accountId: "default" }),
}),
);
});
it("returns error when bind() throws", async () => {
bindMock.mockRejectedValue(new Error("provider auth failed"));
const result = await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent());
expect(result).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("provider auth failed"),
}),
);
});
it("respects per-account threadBindings override over base config", async () => {
// Base says spawnSubagentSessions=false; account override says true
resolveMatrixBaseConfigMock.mockReturnValue({
threadBindings: { enabled: true, spawnSubagentSessions: false },
});
findMatrixAccountConfigMock.mockReturnValue({
threadBindings: { spawnSubagentSessions: true },
});
bindMock.mockResolvedValue({ conversation: {} });
const result = await handleMatrixSubagentSpawning(
fakeApi,
makeSpawnEvent({ accountId: "forge" }),
);
expect(result).toMatchObject({ status: "ok", threadBindingReady: true });
});
});
describe("handleMatrixSubagentEnded", () => {
const mockManager = { persist: vi.fn() };
beforeEach(() => {
getManagerMock.mockReset();
listAllBindingsMock.mockReset();
listBindingsForAccountMock.mockReset();
removeBindingRecordMock.mockReset();
unbindMock.mockReset();
mockManager.persist.mockReset();
});
it("does nothing when no matching bindings exist", async () => {
listBindingsForAccountMock.mockReturnValue([]);
await handleMatrixSubagentEnded({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
});
expect(getManagerMock).not.toHaveBeenCalled();
});
it("removes matching bindings and calls persist on the manager", async () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listBindingsForAccountMock.mockReturnValue([binding]);
removeBindingRecordMock.mockReturnValue(true);
getManagerMock.mockReturnValue(mockManager);
mockManager.persist.mockResolvedValue(undefined);
await handleMatrixSubagentEnded({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
});
expect(removeBindingRecordMock).toHaveBeenCalledWith(binding);
expect(getManagerMock).toHaveBeenCalledWith("ops");
expect(mockManager.persist).toHaveBeenCalled();
});
it("sends farewell through the binding service when requested", async () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listBindingsForAccountMock.mockReturnValue([binding]);
unbindMock.mockResolvedValue([
{
bindingId: "ops:!room:example:$thread",
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
status: "active",
boundAt: 0,
},
]);
await handleMatrixSubagentEnded({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
reason: "spawn-failed",
sendFarewell: true,
});
expect(unbindMock).toHaveBeenCalledWith({
bindingId: "ops:!room:example:$thread",
reason: "spawn-failed",
});
expect(removeBindingRecordMock).not.toHaveBeenCalled();
expect(getManagerMock).not.toHaveBeenCalled();
});
it("skips persist when removeBindingRecord returns false (binding not found in store)", async () => {
const binding = {
targetSessionKey: "agent:ops:subagent:orphan",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listBindingsForAccountMock.mockReturnValue([binding]);
removeBindingRecordMock.mockReturnValue(false);
await handleMatrixSubagentEnded({
targetSessionKey: "agent:ops:subagent:orphan",
targetKind: "subagent",
accountId: "ops",
});
expect(getManagerMock).not.toHaveBeenCalled();
});
it("falls back to listAllBindings when accountId is absent", async () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listAllBindingsMock.mockReturnValue([binding]);
removeBindingRecordMock.mockReturnValue(true);
getManagerMock.mockReturnValue(mockManager);
mockManager.persist.mockResolvedValue(undefined);
await handleMatrixSubagentEnded({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
});
expect(listAllBindingsMock).toHaveBeenCalled();
expect(listBindingsForAccountMock).not.toHaveBeenCalled();
expect(mockManager.persist).toHaveBeenCalled();
});
it("does not double-persist when multiple bindings share the same account", async () => {
const mkBinding = (conversationId: string) => ({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId,
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
});
listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]);
removeBindingRecordMock.mockReturnValue(true);
getManagerMock.mockReturnValue(mockManager);
mockManager.persist.mockResolvedValue(undefined);
await handleMatrixSubagentEnded({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
});
// persist must be called exactly once per unique accountId, not once per binding
expect(mockManager.persist).toHaveBeenCalledTimes(1);
});
});
describe("handleMatrixSubagentDeliveryTarget", () => {
beforeEach(() => {
listAllBindingsMock.mockReset();
listBindingsForAccountMock.mockReset();
});
it("returns undefined when expectsCompletionMessage is false", () => {
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix", accountId: "ops" },
expectsCompletionMessage: false,
});
expect(result).toBeUndefined();
});
it("returns undefined when requester channel is not matrix", () => {
listBindingsForAccountMock.mockReturnValue([]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "slack", accountId: "ops" },
expectsCompletionMessage: true,
});
expect(result).toBeUndefined();
});
it("returns undefined when no bindings match the child session key", () => {
listBindingsForAccountMock.mockReturnValue([
{
targetSessionKey: "agent:ops:subagent:OTHER",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
},
]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix", accountId: "ops" },
expectsCompletionMessage: true,
});
expect(result).toBeUndefined();
});
it("returns origin with threadId when binding has a distinct parentConversationId", () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread123",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listBindingsForAccountMock.mockReturnValue([binding]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$thread123" },
expectsCompletionMessage: true,
});
expect(result).toEqual({
origin: {
channel: "matrix",
accountId: "ops",
to: "room:!room:example",
threadId: "$thread123",
},
});
});
it("returns origin without threadId when conversationId equals parentConversationId", () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "!room:example",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listBindingsForAccountMock.mockReturnValue([binding]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix", accountId: "ops" },
expectsCompletionMessage: true,
});
expect(result).toEqual({
origin: {
channel: "matrix",
accountId: "ops",
to: "room:!room:example",
},
});
expect(result?.origin).not.toHaveProperty("threadId");
});
it("returns origin without threadId when binding has no parentConversationId", () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listBindingsForAccountMock.mockReturnValue([binding]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix", accountId: "ops" },
expectsCompletionMessage: true,
});
expect(result).toEqual({
origin: {
channel: "matrix",
accountId: "ops",
to: "room:!room:example",
},
});
});
it("falls back to the single binding when requesterOrigin threadId does not match any binding", () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread123",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listBindingsForAccountMock.mockReturnValue([binding]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$threadOTHER" },
expectsCompletionMessage: true,
});
// No threadId match, but single binding → falls back to it
expect(result).toEqual({
origin: {
channel: "matrix",
accountId: "ops",
to: "room:!room:example",
threadId: "$thread123",
},
});
});
it("returns undefined when multiple bindings exist and threadId matches none", () => {
const mkBinding = (threadId: string) => ({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: threadId,
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
});
listBindingsForAccountMock.mockReturnValue([mkBinding("$t1"), mkBinding("$t2")]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix", accountId: "ops", threadId: "$tNONE" },
expectsCompletionMessage: true,
});
expect(result).toBeUndefined();
});
it("uses listAllBindings when requesterOrigin has no accountId", () => {
const binding = {
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
accountId: "ops",
conversationId: "$thread123",
parentConversationId: "!room:example",
boundAt: 0,
lastActivityAt: 0,
};
listAllBindingsMock.mockReturnValue([binding]);
const result = handleMatrixSubagentDeliveryTarget({
childSessionKey: "agent:ops:subagent:child",
requesterOrigin: { channel: "matrix" },
expectsCompletionMessage: true,
});
expect(listAllBindingsMock).toHaveBeenCalled();
expect(listBindingsForAccountMock).not.toHaveBeenCalled();
expect(result).toBeDefined();
});
});
describe("concurrent spawns across accounts", () => {
beforeEach(() => {
bindMock.mockReset();
getManagerMock.mockReset();
resolveMatrixBaseConfigMock.mockReset();
findMatrixAccountConfigMock.mockReset();
resolveMatrixBaseConfigMock.mockReturnValue({
threadBindings: { enabled: true, spawnSubagentSessions: true },
});
findMatrixAccountConfigMock.mockReturnValue(undefined);
getManagerMock.mockReturnValue({ persist: vi.fn() });
});
it("resolves both spawns independently when two accounts fire simultaneously", async () => {
// Each account gets its own bind call that resolves with a distinct conversation
bindMock
.mockResolvedValueOnce({ conversation: { accountId: "ops", conversationId: "$t-ops" } })
.mockResolvedValueOnce({ conversation: { accountId: "forge", conversationId: "$t-forge" } });
const [opsResult, forgeResult] = await Promise.all([
handleMatrixSubagentSpawning(fakeApi, {
threadRequested: true,
requester: { channel: "matrix", accountId: "ops", to: "room:!room-ops:example.org" },
childSessionKey: "agent:ops:subagent:child-ops",
agentId: "worker-ops",
}),
handleMatrixSubagentSpawning(fakeApi, {
threadRequested: true,
requester: { channel: "matrix", accountId: "forge", to: "room:!room-forge:example.org" },
childSessionKey: "agent:forge:subagent:child-forge",
agentId: "worker-forge",
}),
]);
expect(opsResult).toMatchObject({ status: "ok", threadBindingReady: true });
expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true });
expect(bindMock).toHaveBeenCalledTimes(2);
// Each bind call targeted the correct account's room
expect(bindMock).toHaveBeenCalledWith(
expect.objectContaining({
targetSessionKey: "agent:ops:subagent:child-ops",
conversation: expect.objectContaining({
accountId: "ops",
conversationId: "!room-ops:example.org",
}),
}),
);
expect(bindMock).toHaveBeenCalledWith(
expect.objectContaining({
targetSessionKey: "agent:forge:subagent:child-forge",
conversation: expect.objectContaining({
accountId: "forge",
conversationId: "!room-forge:example.org",
}),
}),
);
});
it("one account bind failure does not affect the other account's spawn", async () => {
bindMock
.mockRejectedValueOnce(new Error("ops provider auth failed"))
.mockResolvedValueOnce({ conversation: { accountId: "forge", conversationId: "$t-forge" } });
const [opsResult, forgeResult] = await Promise.all([
handleMatrixSubagentSpawning(fakeApi, {
threadRequested: true,
requester: { channel: "matrix", accountId: "ops", to: "room:!room-ops:example.org" },
childSessionKey: "agent:ops:subagent:child-ops",
agentId: "worker-ops",
}),
handleMatrixSubagentSpawning(fakeApi, {
threadRequested: true,
requester: { channel: "matrix", accountId: "forge", to: "room:!room-forge:example.org" },
childSessionKey: "agent:forge:subagent:child-forge",
agentId: "worker-forge",
}),
]);
expect(opsResult).toEqual(
expect.objectContaining({
status: "error",
error: expect.stringContaining("ops provider auth failed"),
}),
);
expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true });
});
});

View File

@@ -0,0 +1,311 @@
import { DEFAULT_ACCOUNT_ID } from "openclaw/plugin-sdk/account-id";
import {
getSessionBindingService,
type SessionBindingRecord,
} from "openclaw/plugin-sdk/conversation-binding-runtime";
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import { findMatrixAccountConfig, resolveMatrixBaseConfig } from "./account-config.js";
import { resolveMatrixTargetIdentity } from "./target-ids.js";
import {
getMatrixThreadBindingManager,
listAllBindings,
listBindingsForAccount,
removeBindingRecord,
resolveBindingKey,
} from "./thread-bindings-shared.js";
type MatrixSubagentSpawningEvent = {
threadRequested: boolean;
requester?: {
channel?: string;
accountId?: string;
to?: string;
threadId?: string | number;
};
childSessionKey: string;
agentId: string;
label?: string;
};
type MatrixSubagentEndedEvent = {
targetSessionKey: string;
targetKind: string;
accountId?: string;
reason?: string;
sendFarewell?: boolean;
};
type MatrixSubagentDeliveryTargetEvent = {
childSessionKey: string;
requesterOrigin?: {
channel?: string;
accountId?: string;
to?: string;
threadId?: string | number;
};
expectsCompletionMessage: boolean;
};
type MatrixDeliveryOrigin = {
channel: "matrix";
accountId: string;
to: string;
threadId?: string;
};
type SpawningResult =
| {
status: "ok";
threadBindingReady?: boolean;
deliveryOrigin?: MatrixDeliveryOrigin;
}
| { status: "error"; error: string };
type DeliveryTargetResult = {
origin: MatrixDeliveryOrigin;
};
function summarizeError(err: unknown): string {
if (err instanceof Error) {
return err.message;
}
if (typeof err === "string") {
return err;
}
return "error";
}
function resolveThreadBindingFlags(
api: OpenClawPluginApi,
accountId?: string,
): { enabled: boolean; spawnSubagentSessions: boolean } {
const matrix = resolveMatrixBaseConfig(api.config);
const baseThreadBindings = matrix.threadBindings;
const accountThreadBindings = accountId
? findMatrixAccountConfig(api.config, accountId)?.threadBindings
: undefined;
return {
enabled:
accountThreadBindings?.enabled ??
baseThreadBindings?.enabled ??
api.config.session?.threadBindings?.enabled ??
true,
spawnSubagentSessions:
accountThreadBindings?.spawnSubagentSessions ??
baseThreadBindings?.spawnSubagentSessions ??
false,
};
}
function resolveMatrixBindingThreadId(binding: SessionBindingRecord): string | undefined {
const { conversationId, parentConversationId } = binding.conversation;
return parentConversationId && parentConversationId !== conversationId
? conversationId
: undefined;
}
function resolveMatrixBindingDeliveryOrigin(
binding: SessionBindingRecord,
fallbackAccountId: string,
): MatrixDeliveryOrigin {
const boundRoomId =
binding.conversation.parentConversationId ?? binding.conversation.conversationId;
const threadId = resolveMatrixBindingThreadId(binding);
return {
channel: "matrix",
accountId: binding.conversation.accountId ?? fallbackAccountId,
to: `room:${boundRoomId}`,
...(threadId ? { threadId } : {}),
};
}
export async function handleMatrixSubagentSpawning(
api: OpenClawPluginApi,
event: MatrixSubagentSpawningEvent,
): Promise<SpawningResult | undefined> {
if (!event.threadRequested) {
return undefined;
}
const channel = event.requester?.channel?.trim().toLowerCase();
if (channel !== "matrix") {
return undefined;
}
// Normalize early so per-account config and manager lookup use the same id.
// Falls back to DEFAULT_ACCOUNT_ID so accounts.default.threadBindings.* is
// respected even when the requester omits accountId.
const accountId = normalizeOptionalString(event.requester?.accountId) || DEFAULT_ACCOUNT_ID;
const flags = resolveThreadBindingFlags(api, accountId);
if (!flags.enabled) {
return {
status: "error",
error:
"Matrix thread bindings are disabled (set channels.matrix.threadBindings.enabled=true to override for this account, or session.threadBindings.enabled=true globally).",
} satisfies SpawningResult;
}
if (!flags.spawnSubagentSessions) {
return {
status: "error",
error:
"Matrix thread-bound subagent spawns are disabled for this account (set channels.matrix.threadBindings.spawnSubagentSessions=true to enable).",
};
}
// Resolve the raw Matrix room ID from the requester's `to` field
// (e.g. "room:!abc123:example.org" → "!abc123:example.org").
const rawTo = normalizeOptionalString(event.requester?.to) ?? "";
const matrixTarget = rawTo ? resolveMatrixTargetIdentity(rawTo) : null;
const roomId = matrixTarget?.kind === "room" ? matrixTarget.id : "";
if (!roomId) {
return {
status: "error",
error:
"Cannot create Matrix thread binding: no room target in spawn request (requester.to must be a Matrix room ID).",
};
}
// Verify the thread binding manager is running for this account. The manager
// holds the captured Matrix client the SessionBindingAdapter needs to send
// the intro message that bootstraps the thread.
const manager = getMatrixThreadBindingManager(accountId);
if (!manager) {
return {
status: "error",
error: `No Matrix thread binding manager available for account "${accountId}". Is the Matrix channel running?`,
};
}
try {
// placement="child" tells the Matrix SessionBindingAdapter to:
// 1. Send an intro message to the room, creating a new thread root event
// 2. Use the returned event ID as boundConversationId (the thread ID)
// 3. Register the binding record in the in-memory store and persist it
//
// We do NOT call setBindingRecord here — the adapter's bind() handles
// record creation, thread creation, and persistence atomically.
const binding = await getSessionBindingService().bind({
targetSessionKey: event.childSessionKey,
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId,
conversationId: roomId,
},
placement: "child",
metadata: {
agentId: event.agentId?.trim() || undefined,
label: normalizeOptionalString(event.label) || undefined,
boundBy: "system",
},
});
return {
status: "ok",
threadBindingReady: true,
deliveryOrigin: resolveMatrixBindingDeliveryOrigin(binding, accountId),
} satisfies SpawningResult;
} catch (err) {
return {
status: "error",
error: `Matrix thread bind failed: ${summarizeError(err)}`,
};
}
}
export async function handleMatrixSubagentEnded(event: MatrixSubagentEndedEvent): Promise<void> {
const accountId = normalizeOptionalString(event.accountId) || undefined;
// Use the targeted account list when available; fall back to a full scan
// so bindings are cleaned up even when accountId is absent.
const candidates = accountId ? listBindingsForAccount(accountId) : listAllBindings();
const matching = candidates.filter(
(entry) => entry.targetSessionKey === event.targetSessionKey && entry.targetKind === "subagent",
);
const removedBindingKeys = new Set<string>();
if (event.sendFarewell) {
const bindingService = getSessionBindingService();
const reason = normalizeOptionalString(event.reason) || "subagent-ended";
for (const binding of matching) {
const bindingId = resolveBindingKey(binding);
const removed = await bindingService.unbind({ bindingId, reason });
if (removed.some((entry) => entry.bindingId === bindingId)) {
removedBindingKeys.add(bindingId);
}
}
}
const affectedAccountIds = new Set<string>();
for (const binding of matching) {
if (removedBindingKeys.has(resolveBindingKey(binding))) {
continue;
}
if (removeBindingRecord(binding)) {
affectedAccountIds.add(binding.accountId);
}
}
// Flush each affected account's manager so removals are persisted to disk.
for (const acctId of affectedAccountIds) {
const manager = getMatrixThreadBindingManager(acctId);
await manager?.persist();
}
}
export function handleMatrixSubagentDeliveryTarget(
event: MatrixSubagentDeliveryTargetEvent,
): DeliveryTargetResult | undefined {
if (!event.expectsCompletionMessage) {
return undefined;
}
const requesterChannel = event.requesterOrigin?.channel?.trim().toLowerCase();
if (requesterChannel !== "matrix") {
return undefined;
}
const requesterAccountId = normalizeOptionalString(event.requesterOrigin?.accountId);
const requesterThreadId =
event.requesterOrigin?.threadId != null && event.requesterOrigin.threadId !== ""
? String(event.requesterOrigin.threadId).trim()
: "";
// Search the targeted account when available; otherwise scan all accounts.
const candidates = requesterAccountId
? listBindingsForAccount(requesterAccountId)
: listAllBindings();
const bindings = candidates.filter(
(entry) => entry.targetSessionKey === event.childSessionKey && entry.targetKind === "subagent",
);
if (bindings.length === 0) {
return undefined;
}
let binding: (typeof bindings)[number] | undefined;
if (requesterThreadId) {
binding = bindings.find(
(entry) =>
entry.conversationId === requesterThreadId &&
(!requesterAccountId || entry.accountId === requesterAccountId),
);
}
if (!binding && bindings.length === 1) {
binding = bindings[0];
}
if (!binding) {
return undefined;
}
const roomId = binding.parentConversationId ?? binding.conversationId;
const threadId =
binding.parentConversationId && binding.parentConversationId !== binding.conversationId
? binding.conversationId
: undefined;
return {
origin: {
channel: "matrix",
accountId: binding.accountId,
to: `room:${roomId}`,
...(threadId ? { threadId } : {}),
},
};
}

View File

@@ -40,6 +40,7 @@ export type MatrixThreadBindingManager = {
targetSessionKey: string;
maxAgeMs: number;
}) => MatrixThreadBindingRecord[];
persist: () => Promise<void>;
stop: () => void;
};
@@ -135,6 +136,10 @@ export function listBindingsForAccount(accountId: string): MatrixThreadBindingRe
);
}
export function listAllBindings(): MatrixThreadBindingRecord[] {
return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()];
}
export function getMatrixThreadBindingManagerEntry(
accountId: string,
): MatrixThreadBindingManagerCacheEntry | null {

View File

@@ -292,6 +292,7 @@ export async function createMatrixThreadBindingManager(params: {
accountId: params.accountId,
getIdleTimeoutMs: () => defaults.idleTimeoutMs,
getMaxAgeMs: () => defaults.maxAgeMs,
persist,
getByConversation: ({ conversationId, parentConversationId }) =>
listBindingsForAccount(params.accountId).find((entry) => {
if (entry.conversationId !== conversationId.trim()) {

View File

@@ -10,7 +10,12 @@ export {
} from "../gateway/session-utils.js";
export { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
export { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
export { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
export {
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.shared.js";
export { resolveConversationDeliveryTarget } from "../utils/delivery-context.js";
export { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
export { resolveAgentConfig } from "./agent-scope.js";
export { AGENT_LANE_SUBAGENT } from "./lanes.js";
export { resolveSubagentSpawnModelSelection } from "./model-selection.js";

View File

@@ -125,6 +125,22 @@ export async function loadSubagentSpawnModuleForTest(params: {
cfg?: Record<string, unknown>;
sessionKey?: string;
}) => { sandboxed: boolean };
getSessionBindingService?: () => {
listBySession: (targetSessionKey: string) => Array<{
status?: string;
conversation: {
channel: string;
accountId?: string;
conversationId: string;
parentConversationId?: string;
};
}>;
};
resolveConversationDeliveryTarget?: (params: {
channel?: string;
conversationId?: string | number;
parentConversationId?: string | number;
}) => { to?: string; threadId?: string };
workspaceDir?: string;
sessionStorePath?: string;
resetModules?: boolean;
@@ -165,6 +181,22 @@ export async function loadSubagentSpawnModuleForTest(params: {
isAdminOnlyMethod: (method: string) =>
method === "sessions.patch" || method === "sessions.delete",
pruneLegacyStoreKeys: (...args: unknown[]) => params.pruneLegacyStoreKeysMock?.(...args),
getSessionBindingService:
params.getSessionBindingService ?? (() => ({ listBySession: () => [] })),
resolveConversationDeliveryTarget:
params.resolveConversationDeliveryTarget ??
((targetParams: { channel?: string; conversationId?: string | number }) => ({
to: targetParams.conversationId
? `channel:${String(targetParams.conversationId)}`
: undefined,
})),
mergeDeliveryContext: (
primary?: Record<string, unknown>,
fallback?: Record<string, unknown>,
) => ({
...fallback,
...primary,
}),
resolveGatewaySessionStoreTarget: (targetParams: { key: string }) => ({
agentId: "main",
storePath: params.sessionStorePath ?? "/tmp/subagent-spawn-model-session.json",

View File

@@ -0,0 +1,183 @@
import os from "node:os";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
createSubagentSpawnTestConfig,
installSessionStoreCaptureMock,
loadSubagentSpawnModuleForTest,
} from "./subagent-spawn.test-helpers.js";
import { installAcceptedSubagentGatewayMock } from "./test-helpers/subagent-gateway.js";
const hoisted = vi.hoisted(() => ({
callGatewayMock: vi.fn(),
updateSessionStoreMock: vi.fn(),
registerSubagentRunMock: vi.fn(),
emitSessionLifecycleEventMock: vi.fn(),
hookRunner: {
hasHooks: vi.fn(),
runSubagentSpawning: vi.fn(),
},
}));
describe("spawnSubagentDirect thread binding delivery", () => {
beforeEach(() => {
vi.resetModules();
hoisted.callGatewayMock.mockReset();
hoisted.updateSessionStoreMock.mockReset();
hoisted.registerSubagentRunMock.mockReset();
hoisted.emitSessionLifecycleEventMock.mockReset();
hoisted.hookRunner.hasHooks.mockReset();
hoisted.hookRunner.runSubagentSpawning.mockReset();
installAcceptedSubagentGatewayMock(hoisted.callGatewayMock);
installSessionStoreCaptureMock(hoisted.updateSessionStoreMock);
});
it("seeds a thread-bound child session from the binding created during spawn", async () => {
hoisted.hookRunner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "subagent_spawning",
);
hoisted.hookRunner.runSubagentSpawning.mockResolvedValue({
status: "ok",
threadBindingReady: true,
deliveryOrigin: {
channel: "matrix",
accountId: "sut",
to: "room:!room:example",
threadId: "$thread-root",
},
});
const { spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({
callGatewayMock: hoisted.callGatewayMock,
loadConfig: () =>
createSubagentSpawnTestConfig(os.tmpdir(), {
agents: {
defaults: {
workspace: os.tmpdir(),
},
list: [{ id: "main", workspace: "/tmp/workspace-main" }],
},
}),
updateSessionStoreMock: hoisted.updateSessionStoreMock,
registerSubagentRunMock: hoisted.registerSubagentRunMock,
emitSessionLifecycleEventMock: hoisted.emitSessionLifecycleEventMock,
hookRunner: hoisted.hookRunner,
resolveSubagentSpawnModelSelection: () => "openai-codex/gpt-5.4",
resolveSandboxRuntimeStatus: () => ({ sandboxed: false }),
});
const result = await spawnSubagentDirect(
{
task: "reply with a marker",
thread: true,
mode: "session",
},
{
agentSessionKey: "agent:main:main",
agentChannel: "matrix",
agentAccountId: "sut",
agentTo: "room:!room:example",
},
);
expect(result.status).toBe("accepted");
const agentCall = hoisted.callGatewayMock.mock.calls.find(
([call]) => (call as { method?: string }).method === "agent",
)?.[0] as { params?: Record<string, unknown> } | undefined;
expect(agentCall?.params).toMatchObject({
channel: "matrix",
accountId: "sut",
to: "room:!room:example",
threadId: "$thread-root",
deliver: true,
});
expect(hoisted.registerSubagentRunMock).toHaveBeenCalledWith(
expect.objectContaining({
requesterOrigin: {
channel: "matrix",
accountId: "sut",
to: "room:!room:example",
threadId: "$thread-root",
},
expectsCompletionMessage: false,
spawnMode: "session",
}),
);
});
it("keeps completion announcements when only a generic binding is available", async () => {
hoisted.hookRunner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "subagent_spawning",
);
hoisted.hookRunner.runSubagentSpawning.mockResolvedValue({
status: "ok",
threadBindingReady: true,
});
const { spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({
callGatewayMock: hoisted.callGatewayMock,
loadConfig: () =>
createSubagentSpawnTestConfig(os.tmpdir(), {
agents: {
defaults: {
workspace: os.tmpdir(),
},
list: [{ id: "main", workspace: "/tmp/workspace-main" }],
},
}),
updateSessionStoreMock: hoisted.updateSessionStoreMock,
registerSubagentRunMock: hoisted.registerSubagentRunMock,
emitSessionLifecycleEventMock: hoisted.emitSessionLifecycleEventMock,
hookRunner: hoisted.hookRunner,
getSessionBindingService: () => ({
listBySession: () => [
{
status: "active",
conversation: {
channel: "feishu",
accountId: "work",
conversationId: "oc_dm_chat_1",
},
},
],
}),
resolveConversationDeliveryTarget: () => ({
to: "channel:oc_dm_chat_1",
}),
resolveSubagentSpawnModelSelection: () => "openai-codex/gpt-5.4",
resolveSandboxRuntimeStatus: () => ({ sandboxed: false }),
});
const result = await spawnSubagentDirect(
{
task: "reply with a marker",
thread: true,
mode: "session",
},
{
agentSessionKey: "agent:main:main",
agentChannel: "matrix",
agentAccountId: "sut",
agentTo: "room:!parent:example",
},
);
expect(result.status).toBe("accepted");
const agentCall = hoisted.callGatewayMock.mock.calls.find(
([call]) => (call as { method?: string }).method === "agent",
)?.[0] as { params?: Record<string, unknown> } | undefined;
expect(agentCall?.params).toMatchObject({
channel: "matrix",
accountId: "sut",
to: "room:!parent:example",
deliver: false,
});
expect(hoisted.registerSubagentRunMock).toHaveBeenCalledWith(
expect.objectContaining({
expectsCompletionMessage: true,
requesterOrigin: {
channel: "matrix",
accountId: "sut",
to: "room:!parent:example",
},
}),
);
});
});

View File

@@ -7,6 +7,7 @@ import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
} from "../shared/string-coerce.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import type { BootstrapContextMode } from "./bootstrap-files.js";
import {
mapToolContextToSpawnedRunMetadata,
@@ -41,6 +42,7 @@ import {
getGlobalHookRunner,
loadConfig,
mergeSessionEntry,
mergeDeliveryContext,
normalizeDeliveryContext,
pruneLegacyStoreKeys,
resolveAgentConfig,
@@ -295,7 +297,9 @@ async function ensureThreadBindingForSubagentSpawn(params: {
to?: string;
threadId?: string | number;
};
}): Promise<{ status: "ok" } | { status: "error"; error: string }> {
}): Promise<
{ status: "ok"; deliveryOrigin?: DeliveryContext } | { status: "error"; error: string }
> {
const hookRunner = params.hookRunner;
if (!hookRunner?.hasHooks("subagent_spawning")) {
return {
@@ -334,7 +338,11 @@ async function ensureThreadBindingForSubagentSpawn(params: {
"Unable to create or bind a thread for this subagent session. Session mode is unavailable for this target.",
};
}
return { status: "ok" };
const deliveryOrigin = normalizeDeliveryContext(result.deliveryOrigin);
return {
status: "ok",
...(deliveryOrigin ? { deliveryOrigin } : {}),
};
} catch (err) {
return {
status: "error",
@@ -343,6 +351,12 @@ async function ensureThreadBindingForSubagentSpawn(params: {
}
}
function hasRoutableDeliveryOrigin(
origin?: DeliveryContext,
): origin is DeliveryContext & { channel: string; to: string } {
return Boolean(origin?.channel && origin.to);
}
export async function spawnSubagentDirect(
params: SpawnSubagentParams,
ctx: SpawnSubagentContext,
@@ -388,6 +402,7 @@ export async function spawnSubagentDirect(
to: ctx.agentTo,
threadId: ctx.agentThreadId,
});
let childSessionOrigin = requesterOrigin;
const hookRunner = subagentSpawnDeps.getGlobalHookRunner();
const cfg = loadSubagentConfig();
@@ -400,6 +415,7 @@ export async function spawnSubagentDirect(
});
let modelApplied = false;
let threadBindingReady = false;
let hasBoundThreadDeliveryOrigin = false;
const { mainKey, alias } = resolveMainSessionAlias(cfg);
const requesterSessionKey = ctx.agentSessionKey;
const requesterInternalKey = requesterSessionKey
@@ -597,12 +613,15 @@ export async function spawnSubagentDirect(
};
}
threadBindingReady = true;
hasBoundThreadDeliveryOrigin = hasRoutableDeliveryOrigin(bindResult.deliveryOrigin);
childSessionOrigin =
mergeDeliveryContext(bindResult.deliveryOrigin, requesterOrigin) ?? childSessionOrigin;
}
const mountPathHint = sanitizeMountPathHint(params.attachMountPath);
let childSystemPrompt = buildSubagentSystemPrompt({
requesterSessionKey,
requesterOrigin,
requesterOrigin: childSessionOrigin,
childSessionKey,
label: label || undefined,
task,
@@ -698,6 +717,11 @@ export async function spawnSubagentDirect(
const childIdem = crypto.randomUUID();
let childRunId: string = childIdem;
const deliverInitialChildRunDirectly =
requestThreadBinding && spawnMode === "session" && hasBoundThreadDeliveryOrigin;
const shouldAnnounceCompletion = deliverInitialChildRunDirectly
? false
: expectsCompletionMessage;
try {
const {
spawnedBy: _spawnedBy,
@@ -709,12 +733,13 @@ export async function spawnSubagentDirect(
params: {
message: childTaskMessage,
sessionKey: childSessionKey,
channel: requesterOrigin?.channel,
to: requesterOrigin?.to ?? undefined,
accountId: requesterOrigin?.accountId ?? undefined,
threadId: requesterOrigin?.threadId != null ? String(requesterOrigin.threadId) : undefined,
channel: childSessionOrigin?.channel,
to: childSessionOrigin?.to ?? undefined,
accountId: childSessionOrigin?.accountId ?? undefined,
threadId:
childSessionOrigin?.threadId != null ? String(childSessionOrigin.threadId) : undefined,
idempotencyKey: childIdem,
deliver: false,
deliver: deliverInitialChildRunDirectly,
lane: AGENT_LANE_SUBAGENT,
extraSystemPrompt: childSystemPrompt,
thinking: thinkingOverride,
@@ -754,7 +779,7 @@ export async function spawnSubagentDirect(
targetKind: "subagent",
reason: "spawn-failed",
sendFarewell: true,
accountId: requesterOrigin?.accountId,
accountId: childSessionOrigin?.accountId,
runId: childRunId,
outcome: "error",
error: "Session failed to start",
@@ -802,7 +827,7 @@ export async function spawnSubagentDirect(
childSessionKey,
controllerSessionKey: requesterInternalKey,
requesterSessionKey: requesterInternalKey,
requesterOrigin,
requesterOrigin: childSessionOrigin,
requesterDisplayKey,
task,
cleanup,
@@ -810,7 +835,7 @@ export async function spawnSubagentDirect(
model: resolvedModel,
workspaceDir: spawnedMetadata.workspaceDir,
runTimeoutSeconds,
expectsCompletionMessage,
expectsCompletionMessage: shouldAnnounceCompletion,
spawnMode,
attachmentsDir: attachmentAbsDir,
attachmentsRootDir: attachmentRootDir,

View File

@@ -86,6 +86,35 @@ describe("createChannelOutboundRuntimeSend", () => {
);
});
it("accepts plugin outbound thread and reply aliases", async () => {
const sendText = vi.fn(async () => ({ channel: "matrix", messageId: "$reply" }));
mocks.loadChannelOutboundAdapter.mockResolvedValue({
sendText,
});
const { createChannelOutboundRuntimeSend } = await import("./channel-outbound-send.js");
const runtimeSend = createChannelOutboundRuntimeSend({
channelId: "matrix" as never,
unavailableMessage: "unavailable",
});
await runtimeSend.sendMessage("room:!ops:example.org", "hello thread", {
cfg: {},
accountId: "sut",
replyToId: "$parent",
threadId: "$thread-root",
});
expect(sendText).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "sut",
replyToId: "$parent",
threadId: "$thread-root",
to: "room:!ops:example.org",
}),
);
});
it("falls back to sendText when media is present but sendMedia is unavailable", async () => {
const sendText = vi.fn(async () => ({ channel: "whatsapp", messageId: "wa-3" }));
mocks.loadChannelOutboundAdapter.mockResolvedValue({

View File

@@ -12,7 +12,9 @@ type RuntimeSendOpts = {
mediaLocalRoots?: readonly string[];
mediaReadFile?: (filePath: string) => Promise<Buffer>;
accountId?: string;
threadId?: string | number | null;
messageThreadId?: string | number;
replyToId?: string | number | null;
replyToMessageId?: string | number;
silent?: boolean;
forceDocument?: boolean;
@@ -20,6 +22,15 @@ type RuntimeSendOpts = {
gatewayClientScopes?: readonly string[];
};
function resolveRuntimeThreadId(opts: RuntimeSendOpts): string | number | undefined {
return opts.messageThreadId ?? opts.threadId ?? undefined;
}
function resolveRuntimeReplyToId(opts: RuntimeSendOpts): string | undefined {
const raw = opts.replyToMessageId ?? opts.replyToId;
return raw == null ? undefined : normalizeOptionalString(String(raw));
}
export function createChannelOutboundRuntimeSend(params: {
channelId: ChannelId;
unavailableMessage: string;
@@ -27,32 +38,9 @@ export function createChannelOutboundRuntimeSend(params: {
return {
sendMessage: async (to: string, text: string, opts: RuntimeSendOpts = {}) => {
const outbound = await loadChannelOutboundAdapter(params.channelId);
const hasMedia = Boolean(opts.mediaUrl);
if (hasMedia && outbound?.sendMedia) {
return await outbound.sendMedia({
cfg: opts.cfg ?? loadConfig(),
to,
text,
mediaUrl: opts.mediaUrl,
mediaAccess: opts.mediaAccess,
mediaLocalRoots: opts.mediaLocalRoots,
mediaReadFile: opts.mediaReadFile,
accountId: opts.accountId,
threadId: opts.messageThreadId,
replyToId:
opts.replyToMessageId == null
? undefined
: normalizeOptionalString(String(opts.replyToMessageId)),
silent: opts.silent,
forceDocument: opts.forceDocument,
gifPlayback: opts.gifPlayback,
gatewayClientScopes: opts.gatewayClientScopes,
});
}
if (!outbound?.sendText) {
throw new Error(params.unavailableMessage);
}
return await outbound.sendText({
const threadId = resolveRuntimeThreadId(opts);
const replyToId = resolveRuntimeReplyToId(opts);
const buildContext = () => ({
cfg: opts.cfg ?? loadConfig(),
to,
text,
@@ -61,16 +49,21 @@ export function createChannelOutboundRuntimeSend(params: {
mediaLocalRoots: opts.mediaLocalRoots,
mediaReadFile: opts.mediaReadFile,
accountId: opts.accountId,
threadId: opts.messageThreadId,
replyToId:
opts.replyToMessageId == null
? undefined
: normalizeOptionalString(String(opts.replyToMessageId)),
threadId,
replyToId,
silent: opts.silent,
forceDocument: opts.forceDocument,
gifPlayback: opts.gifPlayback,
gatewayClientScopes: opts.gatewayClientScopes,
});
const hasMedia = Boolean(opts.mediaUrl);
if (hasMedia && outbound?.sendMedia) {
return await outbound.sendMedia(buildContext());
}
if (!outbound?.sendText) {
throw new Error(params.unavailableMessage);
}
return await outbound.sendText(buildContext());
},
};
}

View File

@@ -437,7 +437,7 @@ describe("session binding service", () => {
}
});
it("keeps the first live adapter authoritative until it unregisters", () => {
it("keeps the newest live adapter authoritative until it unregisters", () => {
const firstBinding = {
bindingId: "first-binding",
targetSessionKey: "agent:main",
@@ -457,17 +457,30 @@ describe("session binding service", () => {
targetSessionKey === "agent:main" ? [firstBinding] : [],
resolveByConversation: () => null,
};
const secondBinding = {
bindingId: "second-binding",
targetSessionKey: "agent:main",
targetKind: "session" as const,
conversation: {
channel: "demo-binding",
accountId: "default",
conversationId: "thread-2",
},
status: "active" as const,
boundAt: 2,
};
const secondAdapter: SessionBindingAdapter = {
channel: "Demo-Binding",
accountId: "DEFAULT",
listBySession: () => [],
listBySession: (targetSessionKey) =>
targetSessionKey === "agent:main" ? [secondBinding] : [],
resolveByConversation: () => null,
};
registerSessionBindingAdapter(firstAdapter);
registerSessionBindingAdapter(secondAdapter);
expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]);
expect(getSessionBindingService().listBySession("agent:main")).toEqual([secondBinding]);
unregisterSessionBindingAdapter({
channel: "demo-binding",
@@ -529,13 +542,13 @@ describe("session binding service", () => {
conversationId: "thread-1",
}),
});
expect(firstBind).toHaveBeenCalledTimes(1);
expect(secondBind).not.toHaveBeenCalled();
expect(firstBind).not.toHaveBeenCalled();
expect(secondBind).toHaveBeenCalledTimes(1);
first.unregisterSessionBindingAdapter({
second.unregisterSessionBindingAdapter({
channel: "demo-binding",
accountId: "default",
adapter: firstAdapter,
adapter: secondAdapter,
});
await expect(
@@ -558,10 +571,10 @@ describe("session binding service", () => {
expect(firstBind).toHaveBeenCalledTimes(1);
expect(secondBind).toHaveBeenCalledTimes(1);
second.unregisterSessionBindingAdapter({
first.unregisterSessionBindingAdapter({
channel: "demo-binding",
accountId: "default",
adapter: secondAdapter,
adapter: firstAdapter,
});
await expect(

View File

@@ -135,7 +135,7 @@ const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, SessionBindingAdapt
function getActiveAdapterForKey(key: string): SessionBindingAdapter | null {
const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
return registrations?.[0]?.normalizedAdapter ?? null;
return registrations?.at(-1)?.normalizedAdapter ?? null;
}
export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void {
@@ -210,7 +210,7 @@ function resolveAdapterForChannelAccount(params: {
function getActiveRegisteredAdapters(): SessionBindingAdapter[] {
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.values()]
.map((registrations) => registrations[0]?.normalizedAdapter ?? null)
.map((registrations) => registrations.at(-1)?.normalizedAdapter ?? null)
.filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter));
}

View File

@@ -421,6 +421,12 @@ export type PluginHookSubagentSpawningResult =
| {
status: "ok";
threadBindingReady?: boolean;
deliveryOrigin?: {
channel?: string;
accountId?: string;
to?: string;
threadId?: string | number;
};
}
| {
status: "error";

View File

@@ -258,9 +258,11 @@ export function createHookRunner(
if (next.status === "error") {
return next;
}
const deliveryOrigin = acc?.deliveryOrigin ?? next.deliveryOrigin;
return {
status: "ok",
threadBindingReady: Boolean(acc?.threadBindingReady || next.threadBindingReady),
...(deliveryOrigin ? { deliveryOrigin } : {}),
};
};

View File

@@ -2,7 +2,7 @@
* Test: subagent_spawning, subagent_delivery_target, subagent_spawned & subagent_ended hook wiring
*/
import { describe, expect, it, vi } from "vitest";
import { createHookRunnerWithRegistry } from "./hooks.test-helpers.js";
import { addStaticTestHooks, createHookRunnerWithRegistry } from "./hooks.test-helpers.js";
describe("subagent hook runner methods", () => {
const baseRequester = {
@@ -162,4 +162,66 @@ describe("subagent hook runner methods", () => {
expect(runner.hasHooks("subagent_spawned")).toBe(false);
expect(runner.hasHooks("subagent_ended")).toBe(false);
});
it("runSubagentSpawning preserves higher-priority delivery origins", async () => {
const { registry, runner } = createHookRunnerWithRegistry([]);
addStaticTestHooks(registry, {
hookName: "subagent_spawning",
hooks: [
{
pluginId: "high",
priority: 100,
result: {
status: "ok",
threadBindingReady: true,
deliveryOrigin: {
channel: "matrix",
accountId: "ops",
to: "room:!high:example",
threadId: "$high",
},
},
},
{
pluginId: "low",
priority: 10,
result: {
status: "ok",
threadBindingReady: true,
deliveryOrigin: {
channel: "matrix",
accountId: "ops",
to: "room:!low:example",
threadId: "$low",
},
},
},
],
});
const result = await runner.runSubagentSpawning(
{
childSessionKey: "agent:main:subagent:child",
agentId: "main",
mode: "session",
requester: baseRequester,
threadRequested: true,
},
{
childSessionKey: "agent:main:subagent:child",
requesterSessionKey: "agent:main:main",
},
);
expect(result).toEqual({
status: "ok",
threadBindingReady: true,
deliveryOrigin: {
channel: "matrix",
accountId: "ops",
to: "room:!high:example",
threadId: "$high",
},
});
});
});