mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-07 14:50:42 +00:00
Matrix: preserve subagent thread delivery
This commit is contained in:
@@ -72,7 +72,13 @@ describe("handleMatrixSubagentSpawning", () => {
|
||||
// Default: manager exists
|
||||
getManagerMock.mockReturnValue({ persist: vi.fn() });
|
||||
// Default: bind resolves ok
|
||||
bindMock.mockResolvedValue({ conversation: {} });
|
||||
bindMock.mockResolvedValue({
|
||||
conversation: {
|
||||
accountId: "default",
|
||||
conversationId: "$thread-root",
|
||||
parentConversationId: "!room123:example.org",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("returns undefined when threadRequested is false", async () => {
|
||||
@@ -174,7 +180,13 @@ describe("handleMatrixSubagentSpawning", () => {
|
||||
});
|
||||
|
||||
it("calls bind with the resolved room id and returns ok", async () => {
|
||||
bindMock.mockResolvedValue({ conversation: {} });
|
||||
bindMock.mockResolvedValue({
|
||||
conversation: {
|
||||
accountId: "ops",
|
||||
conversationId: "$thread-ops",
|
||||
parentConversationId: "!roomAbc:technerik.com",
|
||||
},
|
||||
});
|
||||
const result = await handleMatrixSubagentSpawning(
|
||||
fakeApi,
|
||||
makeSpawnEvent({
|
||||
@@ -202,11 +214,26 @@ describe("handleMatrixSubagentSpawning", () => {
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(result).toEqual({ status: "ok", threadBindingReady: true });
|
||||
expect(result).toMatchObject({
|
||||
status: "ok",
|
||||
threadBindingReady: true,
|
||||
deliveryOrigin: {
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
to: "room:!roomAbc:technerik.com",
|
||||
threadId: "$thread-ops",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("uses 'default' as accountId when requester.accountId is absent", async () => {
|
||||
bindMock.mockResolvedValue({ conversation: {} });
|
||||
bindMock.mockResolvedValue({
|
||||
conversation: {
|
||||
accountId: "default",
|
||||
conversationId: "$thread-default",
|
||||
parentConversationId: "!room123:example.org",
|
||||
},
|
||||
});
|
||||
await handleMatrixSubagentSpawning(fakeApi, makeSpawnEvent({ accountId: undefined as never }));
|
||||
expect(getManagerMock).toHaveBeenCalledWith("default");
|
||||
expect(bindMock).toHaveBeenCalledWith(
|
||||
@@ -241,7 +268,7 @@ describe("handleMatrixSubagentSpawning", () => {
|
||||
fakeApi,
|
||||
makeSpawnEvent({ accountId: "forge" }),
|
||||
);
|
||||
expect(result).toEqual({ status: "ok", threadBindingReady: true });
|
||||
expect(result).toMatchObject({ status: "ok", threadBindingReady: true });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -600,8 +627,8 @@ describe("concurrent spawns across accounts", () => {
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(opsResult).toEqual({ status: "ok", threadBindingReady: true });
|
||||
expect(forgeResult).toEqual({ status: "ok", threadBindingReady: true });
|
||||
expect(opsResult).toMatchObject({ status: "ok", threadBindingReady: true });
|
||||
expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true });
|
||||
expect(bindMock).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Each bind call targeted the correct account's room
|
||||
@@ -651,6 +678,6 @@ describe("concurrent spawns across accounts", () => {
|
||||
error: expect.stringContaining("ops provider auth failed"),
|
||||
}),
|
||||
);
|
||||
expect(forgeResult).toEqual({ status: "ok", threadBindingReady: true });
|
||||
expect(forgeResult).toMatchObject({ status: "ok", threadBindingReady: true });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -42,7 +42,16 @@ type MatrixSubagentDeliveryTargetEvent = {
|
||||
};
|
||||
|
||||
type SpawningResult =
|
||||
| { status: "ok"; threadBindingReady?: boolean }
|
||||
| {
|
||||
status: "ok";
|
||||
threadBindingReady?: boolean;
|
||||
deliveryOrigin?: {
|
||||
channel: string;
|
||||
accountId: string;
|
||||
to: string;
|
||||
threadId?: string;
|
||||
};
|
||||
}
|
||||
| { status: "error"; error: string };
|
||||
|
||||
type DeliveryTargetResult = {
|
||||
@@ -109,7 +118,7 @@ export async function handleMatrixSubagentSpawning(
|
||||
status: "error",
|
||||
error:
|
||||
"Matrix thread bindings are disabled (set channels.matrix.threadBindings.enabled=true to override for this account, or session.threadBindings.enabled=true globally).",
|
||||
};
|
||||
} satisfies SpawningResult;
|
||||
}
|
||||
if (!flags.spawnSubagentSessions) {
|
||||
return {
|
||||
@@ -152,7 +161,7 @@ export async function handleMatrixSubagentSpawning(
|
||||
//
|
||||
// We do NOT call setBindingRecord here — the adapter's bind() handles
|
||||
// record creation, thread creation, and persistence atomically.
|
||||
await getSessionBindingService().bind({
|
||||
const binding = await getSessionBindingService().bind({
|
||||
targetSessionKey: event.childSessionKey,
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
@@ -167,14 +176,30 @@ export async function handleMatrixSubagentSpawning(
|
||||
boundBy: "system",
|
||||
},
|
||||
});
|
||||
const boundRoomId =
|
||||
binding.conversation.parentConversationId ?? binding.conversation.conversationId;
|
||||
const threadId =
|
||||
binding.conversation.parentConversationId &&
|
||||
binding.conversation.parentConversationId !== binding.conversation.conversationId
|
||||
? binding.conversation.conversationId
|
||||
: undefined;
|
||||
const result = {
|
||||
status: "ok",
|
||||
threadBindingReady: true,
|
||||
deliveryOrigin: {
|
||||
channel: "matrix",
|
||||
accountId: binding.conversation.accountId ?? accountId,
|
||||
to: `room:${boundRoomId}`,
|
||||
...(threadId ? { threadId } : {}),
|
||||
},
|
||||
} satisfies SpawningResult;
|
||||
return result;
|
||||
} catch (err) {
|
||||
return {
|
||||
status: "error",
|
||||
error: `Matrix thread bind failed: ${summarizeError(err)}`,
|
||||
};
|
||||
}
|
||||
|
||||
return { status: "ok", threadBindingReady: true };
|
||||
}
|
||||
|
||||
export async function handleMatrixSubagentEnded(event: MatrixSubagentEndedEvent): Promise<void> {
|
||||
|
||||
@@ -10,7 +10,12 @@ export {
|
||||
} from "../gateway/session-utils.js";
|
||||
export { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
||||
export { emitSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
export { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
|
||||
export {
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
} from "../utils/delivery-context.shared.js";
|
||||
export { resolveConversationDeliveryTarget } from "../utils/delivery-context.js";
|
||||
export { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
|
||||
export { resolveAgentConfig } from "./agent-scope.js";
|
||||
export { AGENT_LANE_SUBAGENT } from "./lanes.js";
|
||||
export { resolveSubagentSpawnModelSelection } from "./model-selection.js";
|
||||
|
||||
@@ -125,6 +125,22 @@ export async function loadSubagentSpawnModuleForTest(params: {
|
||||
cfg?: Record<string, unknown>;
|
||||
sessionKey?: string;
|
||||
}) => { sandboxed: boolean };
|
||||
getSessionBindingService?: () => {
|
||||
listBySession: (targetSessionKey: string) => Array<{
|
||||
status?: string;
|
||||
conversation: {
|
||||
channel: string;
|
||||
accountId?: string;
|
||||
conversationId: string;
|
||||
parentConversationId?: string;
|
||||
};
|
||||
}>;
|
||||
};
|
||||
resolveConversationDeliveryTarget?: (params: {
|
||||
channel?: string;
|
||||
conversationId?: string | number;
|
||||
parentConversationId?: string | number;
|
||||
}) => { to?: string; threadId?: string };
|
||||
workspaceDir?: string;
|
||||
sessionStorePath?: string;
|
||||
resetModules?: boolean;
|
||||
@@ -165,6 +181,22 @@ export async function loadSubagentSpawnModuleForTest(params: {
|
||||
isAdminOnlyMethod: (method: string) =>
|
||||
method === "sessions.patch" || method === "sessions.delete",
|
||||
pruneLegacyStoreKeys: (...args: unknown[]) => params.pruneLegacyStoreKeysMock?.(...args),
|
||||
getSessionBindingService:
|
||||
params.getSessionBindingService ?? (() => ({ listBySession: () => [] })),
|
||||
resolveConversationDeliveryTarget:
|
||||
params.resolveConversationDeliveryTarget ??
|
||||
((targetParams: { channel?: string; conversationId?: string | number }) => ({
|
||||
to: targetParams.conversationId
|
||||
? `channel:${String(targetParams.conversationId)}`
|
||||
: undefined,
|
||||
})),
|
||||
mergeDeliveryContext: (
|
||||
primary?: Record<string, unknown>,
|
||||
fallback?: Record<string, unknown>,
|
||||
) => ({
|
||||
...fallback,
|
||||
...primary,
|
||||
}),
|
||||
resolveGatewaySessionStoreTarget: (targetParams: { key: string }) => ({
|
||||
agentId: "main",
|
||||
storePath: params.sessionStorePath ?? "/tmp/subagent-spawn-model-session.json",
|
||||
|
||||
110
src/agents/subagent-spawn.thread-binding.test.ts
Normal file
110
src/agents/subagent-spawn.thread-binding.test.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
import os from "node:os";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
createSubagentSpawnTestConfig,
|
||||
installSessionStoreCaptureMock,
|
||||
loadSubagentSpawnModuleForTest,
|
||||
} from "./subagent-spawn.test-helpers.js";
|
||||
import { installAcceptedSubagentGatewayMock } from "./test-helpers/subagent-gateway.js";
|
||||
|
||||
const hoisted = vi.hoisted(() => ({
|
||||
callGatewayMock: vi.fn(),
|
||||
updateSessionStoreMock: vi.fn(),
|
||||
registerSubagentRunMock: vi.fn(),
|
||||
emitSessionLifecycleEventMock: vi.fn(),
|
||||
hookRunner: {
|
||||
hasHooks: vi.fn(),
|
||||
runSubagentSpawning: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
describe("spawnSubagentDirect thread binding delivery", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetModules();
|
||||
hoisted.callGatewayMock.mockReset();
|
||||
hoisted.updateSessionStoreMock.mockReset();
|
||||
hoisted.registerSubagentRunMock.mockReset();
|
||||
hoisted.emitSessionLifecycleEventMock.mockReset();
|
||||
hoisted.hookRunner.hasHooks.mockReset();
|
||||
hoisted.hookRunner.runSubagentSpawning.mockReset();
|
||||
installAcceptedSubagentGatewayMock(hoisted.callGatewayMock);
|
||||
installSessionStoreCaptureMock(hoisted.updateSessionStoreMock);
|
||||
});
|
||||
|
||||
it("seeds a thread-bound child session from the binding created during spawn", async () => {
|
||||
hoisted.hookRunner.hasHooks.mockImplementation(
|
||||
(hookName?: string) => hookName === "subagent_spawning",
|
||||
);
|
||||
hoisted.hookRunner.runSubagentSpawning.mockResolvedValue({
|
||||
status: "ok",
|
||||
threadBindingReady: true,
|
||||
deliveryOrigin: {
|
||||
channel: "matrix",
|
||||
accountId: "sut",
|
||||
to: "room:!room:example",
|
||||
threadId: "$thread-root",
|
||||
},
|
||||
});
|
||||
const { spawnSubagentDirect } = await loadSubagentSpawnModuleForTest({
|
||||
callGatewayMock: hoisted.callGatewayMock,
|
||||
loadConfig: () =>
|
||||
createSubagentSpawnTestConfig(os.tmpdir(), {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: os.tmpdir(),
|
||||
},
|
||||
list: [{ id: "main", workspace: "/tmp/workspace-main" }],
|
||||
},
|
||||
}),
|
||||
updateSessionStoreMock: hoisted.updateSessionStoreMock,
|
||||
registerSubagentRunMock: hoisted.registerSubagentRunMock,
|
||||
emitSessionLifecycleEventMock: hoisted.emitSessionLifecycleEventMock,
|
||||
hookRunner: hoisted.hookRunner,
|
||||
getSessionBindingService: () => ({ listBySession: () => [] }),
|
||||
resolveConversationDeliveryTarget: () => ({
|
||||
to: "room:!room:example",
|
||||
threadId: "$thread-root",
|
||||
}),
|
||||
resolveSubagentSpawnModelSelection: () => "openai-codex/gpt-5.4",
|
||||
resolveSandboxRuntimeStatus: () => ({ sandboxed: false }),
|
||||
});
|
||||
|
||||
const result = await spawnSubagentDirect(
|
||||
{
|
||||
task: "reply with a marker",
|
||||
thread: true,
|
||||
mode: "session",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:main:main",
|
||||
agentChannel: "matrix",
|
||||
agentAccountId: "sut",
|
||||
agentTo: "room:!room:example",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
const agentCall = hoisted.callGatewayMock.mock.calls.find(
|
||||
([call]) => (call as { method?: string }).method === "agent",
|
||||
)?.[0] as { params?: Record<string, unknown> } | undefined;
|
||||
expect(agentCall?.params).toMatchObject({
|
||||
channel: "matrix",
|
||||
accountId: "sut",
|
||||
to: "room:!room:example",
|
||||
threadId: "$thread-root",
|
||||
deliver: true,
|
||||
});
|
||||
expect(hoisted.registerSubagentRunMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
requesterOrigin: {
|
||||
channel: "matrix",
|
||||
accountId: "sut",
|
||||
to: "room:!room:example",
|
||||
threadId: "$thread-root",
|
||||
},
|
||||
expectsCompletionMessage: false,
|
||||
spawnMode: "session",
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -40,10 +40,13 @@ import {
|
||||
emitSessionLifecycleEvent,
|
||||
getGlobalHookRunner,
|
||||
loadConfig,
|
||||
getSessionBindingService,
|
||||
mergeSessionEntry,
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
pruneLegacyStoreKeys,
|
||||
resolveAgentConfig,
|
||||
resolveConversationDeliveryTarget,
|
||||
resolveDisplaySessionKey,
|
||||
resolveGatewaySessionStoreTarget,
|
||||
resolveInternalSessionKey,
|
||||
@@ -71,6 +74,13 @@ type SubagentSpawnDeps = {
|
||||
updateSessionStore: typeof updateSessionStore;
|
||||
};
|
||||
|
||||
type SubagentDeliveryOrigin = {
|
||||
channel?: string;
|
||||
accountId?: string;
|
||||
to?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
|
||||
const defaultSubagentSpawnDeps: SubagentSpawnDeps = {
|
||||
callGateway,
|
||||
getGlobalHookRunner,
|
||||
@@ -295,7 +305,9 @@ async function ensureThreadBindingForSubagentSpawn(params: {
|
||||
to?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
}): Promise<{ status: "ok" } | { status: "error"; error: string }> {
|
||||
}): Promise<
|
||||
{ status: "ok"; deliveryOrigin?: SubagentDeliveryOrigin } | { status: "error"; error: string }
|
||||
> {
|
||||
const hookRunner = params.hookRunner;
|
||||
if (!hookRunner?.hasHooks("subagent_spawning")) {
|
||||
return {
|
||||
@@ -334,7 +346,13 @@ async function ensureThreadBindingForSubagentSpawn(params: {
|
||||
"Unable to create or bind a thread for this subagent session. Session mode is unavailable for this target.",
|
||||
};
|
||||
}
|
||||
return { status: "ok" };
|
||||
const deliveryOrigin =
|
||||
normalizeDeliveryContext(result.deliveryOrigin) ??
|
||||
resolveThreadBindingDeliveryOrigin(params.childSessionKey);
|
||||
return {
|
||||
status: "ok",
|
||||
...(deliveryOrigin ? { deliveryOrigin } : {}),
|
||||
};
|
||||
} catch (err) {
|
||||
return {
|
||||
status: "error",
|
||||
@@ -343,6 +361,32 @@ async function ensureThreadBindingForSubagentSpawn(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function resolveThreadBindingDeliveryOrigin(
|
||||
childSessionKey: string,
|
||||
): SubagentDeliveryOrigin | undefined {
|
||||
const activeBindings = getSessionBindingService()
|
||||
.listBySession(childSessionKey)
|
||||
.filter((binding) => binding.status === "active");
|
||||
if (activeBindings.length !== 1) {
|
||||
return undefined;
|
||||
}
|
||||
const binding = activeBindings[0];
|
||||
if (!binding) {
|
||||
return undefined;
|
||||
}
|
||||
const target = resolveConversationDeliveryTarget({
|
||||
channel: binding.conversation.channel,
|
||||
conversationId: binding.conversation.conversationId,
|
||||
parentConversationId: binding.conversation.parentConversationId,
|
||||
});
|
||||
return normalizeDeliveryContext({
|
||||
channel: binding.conversation.channel,
|
||||
accountId: binding.conversation.accountId,
|
||||
to: target.to,
|
||||
threadId: target.threadId,
|
||||
}) as SubagentDeliveryOrigin | undefined;
|
||||
}
|
||||
|
||||
export async function spawnSubagentDirect(
|
||||
params: SpawnSubagentParams,
|
||||
ctx: SpawnSubagentContext,
|
||||
@@ -387,7 +431,8 @@ export async function spawnSubagentDirect(
|
||||
accountId: ctx.agentAccountId,
|
||||
to: ctx.agentTo,
|
||||
threadId: ctx.agentThreadId,
|
||||
});
|
||||
}) as SubagentDeliveryOrigin | undefined;
|
||||
let childSessionOrigin = requesterOrigin;
|
||||
const hookRunner = subagentSpawnDeps.getGlobalHookRunner();
|
||||
const cfg = loadSubagentConfig();
|
||||
|
||||
@@ -597,12 +642,14 @@ export async function spawnSubagentDirect(
|
||||
};
|
||||
}
|
||||
threadBindingReady = true;
|
||||
childSessionOrigin =
|
||||
mergeDeliveryContext(bindResult.deliveryOrigin, requesterOrigin) ?? childSessionOrigin;
|
||||
}
|
||||
const mountPathHint = sanitizeMountPathHint(params.attachMountPath);
|
||||
|
||||
let childSystemPrompt = buildSubagentSystemPrompt({
|
||||
requesterSessionKey,
|
||||
requesterOrigin,
|
||||
requesterOrigin: childSessionOrigin,
|
||||
childSessionKey,
|
||||
label: label || undefined,
|
||||
task,
|
||||
@@ -698,6 +745,13 @@ export async function spawnSubagentDirect(
|
||||
|
||||
const childIdem = crypto.randomUUID();
|
||||
let childRunId: string = childIdem;
|
||||
const deliverInitialChildRunDirectly =
|
||||
requestThreadBinding &&
|
||||
spawnMode === "session" &&
|
||||
Boolean(childSessionOrigin?.channel && childSessionOrigin.to);
|
||||
const shouldAnnounceCompletion = deliverInitialChildRunDirectly
|
||||
? false
|
||||
: expectsCompletionMessage;
|
||||
try {
|
||||
const {
|
||||
spawnedBy: _spawnedBy,
|
||||
@@ -709,12 +763,13 @@ export async function spawnSubagentDirect(
|
||||
params: {
|
||||
message: childTaskMessage,
|
||||
sessionKey: childSessionKey,
|
||||
channel: requesterOrigin?.channel,
|
||||
to: requesterOrigin?.to ?? undefined,
|
||||
accountId: requesterOrigin?.accountId ?? undefined,
|
||||
threadId: requesterOrigin?.threadId != null ? String(requesterOrigin.threadId) : undefined,
|
||||
channel: childSessionOrigin?.channel,
|
||||
to: childSessionOrigin?.to ?? undefined,
|
||||
accountId: childSessionOrigin?.accountId ?? undefined,
|
||||
threadId:
|
||||
childSessionOrigin?.threadId != null ? String(childSessionOrigin.threadId) : undefined,
|
||||
idempotencyKey: childIdem,
|
||||
deliver: false,
|
||||
deliver: deliverInitialChildRunDirectly,
|
||||
lane: AGENT_LANE_SUBAGENT,
|
||||
extraSystemPrompt: childSystemPrompt,
|
||||
thinking: thinkingOverride,
|
||||
@@ -754,7 +809,7 @@ export async function spawnSubagentDirect(
|
||||
targetKind: "subagent",
|
||||
reason: "spawn-failed",
|
||||
sendFarewell: true,
|
||||
accountId: requesterOrigin?.accountId,
|
||||
accountId: childSessionOrigin?.accountId,
|
||||
runId: childRunId,
|
||||
outcome: "error",
|
||||
error: "Session failed to start",
|
||||
@@ -802,7 +857,7 @@ export async function spawnSubagentDirect(
|
||||
childSessionKey,
|
||||
controllerSessionKey: requesterInternalKey,
|
||||
requesterSessionKey: requesterInternalKey,
|
||||
requesterOrigin,
|
||||
requesterOrigin: childSessionOrigin,
|
||||
requesterDisplayKey,
|
||||
task,
|
||||
cleanup,
|
||||
@@ -810,7 +865,7 @@ export async function spawnSubagentDirect(
|
||||
model: resolvedModel,
|
||||
workspaceDir: spawnedMetadata.workspaceDir,
|
||||
runTimeoutSeconds,
|
||||
expectsCompletionMessage,
|
||||
expectsCompletionMessage: shouldAnnounceCompletion,
|
||||
spawnMode,
|
||||
attachmentsDir: attachmentAbsDir,
|
||||
attachmentsRootDir: attachmentRootDir,
|
||||
|
||||
@@ -86,6 +86,35 @@ describe("createChannelOutboundRuntimeSend", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("accepts plugin outbound thread and reply aliases", async () => {
|
||||
const sendText = vi.fn(async () => ({ channel: "matrix", messageId: "$reply" }));
|
||||
mocks.loadChannelOutboundAdapter.mockResolvedValue({
|
||||
sendText,
|
||||
});
|
||||
|
||||
const { createChannelOutboundRuntimeSend } = await import("./channel-outbound-send.js");
|
||||
const runtimeSend = createChannelOutboundRuntimeSend({
|
||||
channelId: "matrix" as never,
|
||||
unavailableMessage: "unavailable",
|
||||
});
|
||||
|
||||
await runtimeSend.sendMessage("room:!ops:example.org", "hello thread", {
|
||||
cfg: {},
|
||||
accountId: "sut",
|
||||
replyToId: "$parent",
|
||||
threadId: "$thread-root",
|
||||
});
|
||||
|
||||
expect(sendText).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
accountId: "sut",
|
||||
replyToId: "$parent",
|
||||
threadId: "$thread-root",
|
||||
to: "room:!ops:example.org",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to sendText when media is present but sendMedia is unavailable", async () => {
|
||||
const sendText = vi.fn(async () => ({ channel: "whatsapp", messageId: "wa-3" }));
|
||||
mocks.loadChannelOutboundAdapter.mockResolvedValue({
|
||||
|
||||
@@ -12,7 +12,9 @@ type RuntimeSendOpts = {
|
||||
mediaLocalRoots?: readonly string[];
|
||||
mediaReadFile?: (filePath: string) => Promise<Buffer>;
|
||||
accountId?: string;
|
||||
threadId?: string | number | null;
|
||||
messageThreadId?: string | number;
|
||||
replyToId?: string | number | null;
|
||||
replyToMessageId?: string | number;
|
||||
silent?: boolean;
|
||||
forceDocument?: boolean;
|
||||
@@ -20,6 +22,15 @@ type RuntimeSendOpts = {
|
||||
gatewayClientScopes?: readonly string[];
|
||||
};
|
||||
|
||||
function resolveRuntimeThreadId(opts: RuntimeSendOpts): string | number | undefined {
|
||||
return opts.messageThreadId ?? opts.threadId ?? undefined;
|
||||
}
|
||||
|
||||
function resolveRuntimeReplyToId(opts: RuntimeSendOpts): string | undefined {
|
||||
const raw = opts.replyToMessageId ?? opts.replyToId;
|
||||
return raw == null ? undefined : normalizeOptionalString(String(raw));
|
||||
}
|
||||
|
||||
export function createChannelOutboundRuntimeSend(params: {
|
||||
channelId: ChannelId;
|
||||
unavailableMessage: string;
|
||||
@@ -27,6 +38,8 @@ export function createChannelOutboundRuntimeSend(params: {
|
||||
return {
|
||||
sendMessage: async (to: string, text: string, opts: RuntimeSendOpts = {}) => {
|
||||
const outbound = await loadChannelOutboundAdapter(params.channelId);
|
||||
const threadId = resolveRuntimeThreadId(opts);
|
||||
const replyToId = resolveRuntimeReplyToId(opts);
|
||||
const hasMedia = Boolean(opts.mediaUrl);
|
||||
if (hasMedia && outbound?.sendMedia) {
|
||||
return await outbound.sendMedia({
|
||||
@@ -38,11 +51,8 @@ export function createChannelOutboundRuntimeSend(params: {
|
||||
mediaLocalRoots: opts.mediaLocalRoots,
|
||||
mediaReadFile: opts.mediaReadFile,
|
||||
accountId: opts.accountId,
|
||||
threadId: opts.messageThreadId,
|
||||
replyToId:
|
||||
opts.replyToMessageId == null
|
||||
? undefined
|
||||
: normalizeOptionalString(String(opts.replyToMessageId)),
|
||||
threadId,
|
||||
replyToId,
|
||||
silent: opts.silent,
|
||||
forceDocument: opts.forceDocument,
|
||||
gifPlayback: opts.gifPlayback,
|
||||
@@ -61,11 +71,8 @@ export function createChannelOutboundRuntimeSend(params: {
|
||||
mediaLocalRoots: opts.mediaLocalRoots,
|
||||
mediaReadFile: opts.mediaReadFile,
|
||||
accountId: opts.accountId,
|
||||
threadId: opts.messageThreadId,
|
||||
replyToId:
|
||||
opts.replyToMessageId == null
|
||||
? undefined
|
||||
: normalizeOptionalString(String(opts.replyToMessageId)),
|
||||
threadId,
|
||||
replyToId,
|
||||
silent: opts.silent,
|
||||
forceDocument: opts.forceDocument,
|
||||
gifPlayback: opts.gifPlayback,
|
||||
|
||||
@@ -437,7 +437,7 @@ describe("session binding service", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps the first live adapter authoritative until it unregisters", () => {
|
||||
it("keeps the newest live adapter authoritative until it unregisters", () => {
|
||||
const firstBinding = {
|
||||
bindingId: "first-binding",
|
||||
targetSessionKey: "agent:main",
|
||||
@@ -457,17 +457,30 @@ describe("session binding service", () => {
|
||||
targetSessionKey === "agent:main" ? [firstBinding] : [],
|
||||
resolveByConversation: () => null,
|
||||
};
|
||||
const secondBinding = {
|
||||
bindingId: "second-binding",
|
||||
targetSessionKey: "agent:main",
|
||||
targetKind: "session" as const,
|
||||
conversation: {
|
||||
channel: "demo-binding",
|
||||
accountId: "default",
|
||||
conversationId: "thread-2",
|
||||
},
|
||||
status: "active" as const,
|
||||
boundAt: 2,
|
||||
};
|
||||
const secondAdapter: SessionBindingAdapter = {
|
||||
channel: "Demo-Binding",
|
||||
accountId: "DEFAULT",
|
||||
listBySession: () => [],
|
||||
listBySession: (targetSessionKey) =>
|
||||
targetSessionKey === "agent:main" ? [secondBinding] : [],
|
||||
resolveByConversation: () => null,
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter(firstAdapter);
|
||||
registerSessionBindingAdapter(secondAdapter);
|
||||
|
||||
expect(getSessionBindingService().listBySession("agent:main")).toEqual([firstBinding]);
|
||||
expect(getSessionBindingService().listBySession("agent:main")).toEqual([secondBinding]);
|
||||
|
||||
unregisterSessionBindingAdapter({
|
||||
channel: "demo-binding",
|
||||
@@ -529,13 +542,13 @@ describe("session binding service", () => {
|
||||
conversationId: "thread-1",
|
||||
}),
|
||||
});
|
||||
expect(firstBind).toHaveBeenCalledTimes(1);
|
||||
expect(secondBind).not.toHaveBeenCalled();
|
||||
expect(firstBind).not.toHaveBeenCalled();
|
||||
expect(secondBind).toHaveBeenCalledTimes(1);
|
||||
|
||||
first.unregisterSessionBindingAdapter({
|
||||
second.unregisterSessionBindingAdapter({
|
||||
channel: "demo-binding",
|
||||
accountId: "default",
|
||||
adapter: firstAdapter,
|
||||
adapter: secondAdapter,
|
||||
});
|
||||
|
||||
await expect(
|
||||
@@ -558,10 +571,10 @@ describe("session binding service", () => {
|
||||
expect(firstBind).toHaveBeenCalledTimes(1);
|
||||
expect(secondBind).toHaveBeenCalledTimes(1);
|
||||
|
||||
second.unregisterSessionBindingAdapter({
|
||||
first.unregisterSessionBindingAdapter({
|
||||
channel: "demo-binding",
|
||||
accountId: "default",
|
||||
adapter: secondAdapter,
|
||||
adapter: firstAdapter,
|
||||
});
|
||||
|
||||
await expect(
|
||||
|
||||
@@ -135,7 +135,7 @@ const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, SessionBindingAdapt
|
||||
|
||||
function getActiveAdapterForKey(key: string): SessionBindingAdapter | null {
|
||||
const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
|
||||
return registrations?.[0]?.normalizedAdapter ?? null;
|
||||
return registrations?.at(-1)?.normalizedAdapter ?? null;
|
||||
}
|
||||
|
||||
export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void {
|
||||
@@ -210,7 +210,7 @@ function resolveAdapterForChannelAccount(params: {
|
||||
|
||||
function getActiveRegisteredAdapters(): SessionBindingAdapter[] {
|
||||
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.values()]
|
||||
.map((registrations) => registrations[0]?.normalizedAdapter ?? null)
|
||||
.map((registrations) => registrations.at(-1)?.normalizedAdapter ?? null)
|
||||
.filter((adapter): adapter is SessionBindingAdapter => Boolean(adapter));
|
||||
}
|
||||
|
||||
|
||||
@@ -421,6 +421,12 @@ export type PluginHookSubagentSpawningResult =
|
||||
| {
|
||||
status: "ok";
|
||||
threadBindingReady?: boolean;
|
||||
deliveryOrigin?: {
|
||||
channel?: string;
|
||||
accountId?: string;
|
||||
to?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
}
|
||||
| {
|
||||
status: "error";
|
||||
|
||||
@@ -258,9 +258,11 @@ export function createHookRunner(
|
||||
if (next.status === "error") {
|
||||
return next;
|
||||
}
|
||||
const deliveryOrigin = next.deliveryOrigin ?? acc?.deliveryOrigin;
|
||||
return {
|
||||
status: "ok",
|
||||
threadBindingReady: Boolean(acc?.threadBindingReady || next.threadBindingReady),
|
||||
...(deliveryOrigin ? { deliveryOrigin } : {}),
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user