refactor(channels): route remaining turns through kernel

This commit is contained in:
Peter Steinberger
2026-04-29 22:53:29 +01:00
parent 9a9cd0c0ab
commit 9a3a341d93
19 changed files with 1206 additions and 623 deletions

View File

@@ -1714,50 +1714,127 @@ async function processMessageAfterDedupe(
},
},
});
await core.channel.turn.dispatchAssembled({
cfg: config,
await core.channel.turn.run({
channel: "bluebubbles",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload, info) => {
const rawReplyToId =
privateApiEnabled && typeof payload.replyToId === "string"
? payload.replyToId.trim()
: "";
// Resolve short ID (e.g., "5") to full UUID, scoped to the chat
// this deliver path is already routing for (cross-chat guard).
const replyToMessageGuid = rawReplyToId
? resolveBlueBubblesMessageId(rawReplyToId, {
requireKnownShortId: true,
chatContext: {
chatGuid: chatGuidForActions ?? chatGuid,
chatIdentifier,
chatId,
},
})
: "";
const mediaList = resolveOutboundMediaUrls(payload);
if (mediaList.length > 0) {
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "bluebubbles",
accountId: account.accountId,
});
const text = sanitizeReplyDirectiveText(
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
);
await sendMediaWithLeadingCaption({
mediaUrls: mediaList,
caption: text,
send: async ({ mediaUrl, caption }) => {
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
raw: ctxPayload,
adapter: {
ingest: () => ({
id: String(ctxPayload.MessageSid ?? message.messageId),
timestamp: message.timestamp,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: commandBody,
raw: ctxPayload,
}),
resolveTurn: () => ({
cfg: config,
channel: "bluebubbles",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload, info) => {
const rawReplyToId =
privateApiEnabled && typeof payload.replyToId === "string"
? payload.replyToId.trim()
: "";
// Resolve short ID (e.g., "5") to full UUID, scoped to the chat
// this deliver path is already routing for (cross-chat guard).
const replyToMessageGuid = rawReplyToId
? resolveBlueBubblesMessageId(rawReplyToId, {
requireKnownShortId: true,
chatContext: {
chatGuid: chatGuidForActions ?? chatGuid,
chatIdentifier,
chatId,
},
})
: "";
const mediaList = resolveOutboundMediaUrls(payload);
if (mediaList.length > 0) {
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "bluebubbles",
accountId: account.accountId,
});
const text = sanitizeReplyDirectiveText(
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
);
await sendMediaWithLeadingCaption({
mediaUrls: mediaList,
caption: text,
send: async ({ mediaUrl, caption }) => {
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
const pendingId = rememberPendingOutboundMessageId({
accountId: account.accountId,
sessionKey: route.sessionKey,
outboundTarget,
chatGuid: chatGuidForActions ?? chatGuid,
chatIdentifier,
chatId,
snippet: cachedBody,
});
let result: Awaited<ReturnType<typeof sendBlueBubblesMedia>>;
try {
result = await sendBlueBubblesMedia({
cfg: config,
to: outboundTarget,
mediaUrl,
caption: caption ?? undefined,
replyToId: replyToMessageGuid || null,
accountId: account.accountId,
asVoice: payload.audioAsVoice === true,
});
} catch (err) {
forgetPendingOutboundMessageId(pendingId);
throw err;
}
if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) {
forgetPendingOutboundMessageId(pendingId);
}
sentMessage = true;
statusSink?.({ lastOutboundAt: Date.now() });
if (info.kind === "block") {
restartTypingSoon();
}
},
});
return;
}
const textLimit =
account.config.textChunkLimit && account.config.textChunkLimit > 0
? account.config.textChunkLimit
: DEFAULT_TEXT_LIMIT;
const chunkMode = account.config.chunkMode ?? "length";
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "bluebubbles",
accountId: account.accountId,
});
const text = sanitizeReplyDirectiveText(
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
);
const chunks =
chunkMode === "newline"
? resolveTextChunksWithFallback(
text,
core.channel.text.chunkTextWithMode(text, textLimit, chunkMode),
)
: resolveTextChunksWithFallback(
text,
core.channel.text.chunkMarkdownText(text, textLimit),
);
if (!chunks.length) {
return;
}
for (const chunk of chunks) {
const pendingId = rememberPendingOutboundMessageId({
accountId: account.accountId,
sessionKey: route.sessionKey,
@@ -1765,24 +1842,20 @@ async function processMessageAfterDedupe(
chatGuid: chatGuidForActions ?? chatGuid,
chatIdentifier,
chatId,
snippet: cachedBody,
snippet: chunk,
});
let result: Awaited<ReturnType<typeof sendBlueBubblesMedia>>;
let result: Awaited<ReturnType<typeof sendMessageBlueBubbles>>;
try {
result = await sendBlueBubblesMedia({
result = await sendMessageBlueBubbles(outboundTarget, chunk, {
cfg: config,
to: outboundTarget,
mediaUrl,
caption: caption ?? undefined,
replyToId: replyToMessageGuid || null,
accountId: account.accountId,
asVoice: payload.audioAsVoice === true,
replyToMessageGuid: replyToMessageGuid || undefined,
});
} catch (err) {
forgetPendingOutboundMessageId(pendingId);
throw err;
}
if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) {
if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) {
forgetPendingOutboundMessageId(pendingId);
}
sentMessage = true;
@@ -1790,101 +1863,43 @@ async function processMessageAfterDedupe(
if (info.kind === "block") {
restartTypingSoon();
}
},
});
return;
}
const textLimit =
account.config.textChunkLimit && account.config.textChunkLimit > 0
? account.config.textChunkLimit
: DEFAULT_TEXT_LIMIT;
const chunkMode = account.config.chunkMode ?? "length";
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "bluebubbles",
accountId: account.accountId,
});
const text = sanitizeReplyDirectiveText(
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
);
const chunks =
chunkMode === "newline"
? resolveTextChunksWithFallback(
text,
core.channel.text.chunkTextWithMode(text, textLimit, chunkMode),
)
: resolveTextChunksWithFallback(
text,
core.channel.text.chunkMarkdownText(text, textLimit),
);
if (!chunks.length) {
return;
}
for (const chunk of chunks) {
const pendingId = rememberPendingOutboundMessageId({
accountId: account.accountId,
sessionKey: route.sessionKey,
outboundTarget,
chatGuid: chatGuidForActions ?? chatGuid,
chatIdentifier,
chatId,
snippet: chunk,
});
let result: Awaited<ReturnType<typeof sendMessageBlueBubbles>>;
try {
result = await sendMessageBlueBubbles(outboundTarget, chunk, {
cfg: config,
accountId: account.accountId,
replyToMessageGuid: replyToMessageGuid || undefined,
});
} catch (err) {
forgetPendingOutboundMessageId(pendingId);
throw err;
}
if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) {
forgetPendingOutboundMessageId(pendingId);
}
sentMessage = true;
statusSink?.({ lastOutboundAt: Date.now() });
if (info.kind === "block") {
restartTypingSoon();
}
}
},
onError: (err, info) => {
// Flag the outer dedupe wrapper so it releases the claim instead
// of committing. Without this, a transient BlueBubbles send failure
// would permanently block replay-retry for 7 days and the user
// would never receive a reply to that message.
//
// Only the terminal `final` delivery represents the user-visible
// answer. The dispatcher continues past `tool` / `block` failures
// and may still deliver `final` successfully — releasing the
// dedupe claim for those would invite a replay that re-runs tool
// side effects and resends partially-delivered content.
if (info.kind === "final") {
dedupeSignal.deliveryFailed = true;
}
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${sanitizeForLog(err)}`);
},
},
dispatcherOptions: {
...replyPipeline,
onReplyStart: typingCallbacks?.onReplyStart,
onIdle: typingCallbacks?.onIdle,
},
replyOptions: {
onModelSelected,
disableBlockStreaming:
typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
},
record: {
onRecordError: (err) => {
runtime.error?.(`[bluebubbles] failed updating session meta: ${sanitizeForLog(err)}`);
},
}
},
onError: (err, info) => {
// Flag the outer dedupe wrapper so it releases the claim instead
// of committing. Without this, a transient BlueBubbles send failure
// would permanently block replay-retry for 7 days and the user
// would never receive a reply to that message.
//
// Only the terminal `final` delivery represents the user-visible
// answer. The dispatcher continues past `tool` / `block` failures
// and may still deliver `final` successfully — releasing the
// dedupe claim for those would invite a replay that re-runs tool
// side effects and resends partially-delivered content.
if (info.kind === "final") {
dedupeSignal.deliveryFailed = true;
}
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${sanitizeForLog(err)}`);
},
},
dispatcherOptions: {
...replyPipeline,
onReplyStart: typingCallbacks?.onReplyStart,
onIdle: typingCallbacks?.onIdle,
},
replyOptions: {
onModelSelected,
disableBlockStreaming:
typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
},
record: {
onRecordError: (err) => {
runtime.error?.(`[bluebubbles] failed updating session meta: ${sanitizeForLog(err)}`);
},
},
}),
},
});
} finally {

View File

@@ -195,35 +195,65 @@ async function processMessageWithPipeline(params: {
body: rawBody,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
BodyForAgent: rawBody,
RawBody: rawBody,
CommandBody: rawBody,
From: `googlechat:${senderId}`,
To: `googlechat:${spaceId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "channel" : "direct",
ConversationLabel: fromLabel,
SenderName: senderName || undefined,
SenderId: senderId,
SenderUsername: senderEmail,
WasMentioned: isGroup ? effectiveWasMentioned : undefined,
CommandAuthorized: commandAuthorized,
Provider: "googlechat",
Surface: "googlechat",
MessageSid: message.name,
MessageSidFull: message.name,
ReplyToId: message.thread?.name,
ReplyToIdFull: message.thread?.name,
MediaPath: mediaPath,
MediaType: mediaType,
MediaUrl: mediaPath,
GroupSpace: isGroup ? (space.displayName ?? undefined) : undefined,
GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined,
OriginatingChannel: "googlechat",
OriginatingTo: `googlechat:${spaceId}`,
const ctxPayload = core.channel.turn.buildContext({
channel: "googlechat",
accountId: route.accountId,
messageId: message.name,
messageIdFull: message.name,
timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined,
from: `googlechat:${senderId}`,
sender: {
id: senderId,
name: senderName || undefined,
username: senderEmail,
},
conversation: {
kind: isGroup ? "channel" : "direct",
id: spaceId,
label: fromLabel,
routePeer: {
kind: isGroup ? "group" : "direct",
id: spaceId,
},
},
route: {
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
},
reply: {
to: `googlechat:${spaceId}`,
originatingTo: `googlechat:${spaceId}`,
replyToId: message.thread?.name,
replyToIdFull: message.thread?.name,
},
message: {
body,
bodyForAgent: rawBody,
rawBody,
commandBody: rawBody,
envelopeFrom: fromLabel,
},
media:
mediaPath || mediaType
? [
{
path: mediaPath,
url: mediaPath,
contentType: mediaType,
},
]
: undefined,
supplemental: {
groupSystemPrompt: isGroup ? groupSystemPrompt : undefined,
},
extra: {
ChatType: isGroup ? "channel" : "direct",
WasMentioned: isGroup ? effectiveWasMentioned : undefined,
CommandAuthorized: commandAuthorized,
GroupSubject: undefined,
GroupSpace: isGroup ? (space.displayName ?? undefined) : undefined,
},
});
// Typing indicator setup
@@ -265,47 +295,60 @@ async function processMessageWithPipeline(params: {
accountId: route.accountId,
});
await core.channel.turn.dispatchAssembled({
cfg: config,
await core.channel.turn.runResolved({
channel: "googlechat",
accountId: route.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverGoogleChatReply({
payload,
account,
spaceId,
runtime,
core,
config,
statusSink,
typingMessageName,
});
// Only use typing message for first delivery
typingMessageName = undefined;
},
onError: (err, info) => {
runtime.error?.(
`[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`,
);
},
raw: message,
input: {
id: message.name ?? spaceId,
timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: rawBody,
raw: message,
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`);
resolveTurn: () => ({
cfg: config,
channel: "googlechat",
accountId: route.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverGoogleChatReply({
payload,
account,
spaceId,
runtime,
core,
config,
statusSink,
typingMessageName,
});
// Only use typing message for first delivery
typingMessageName = undefined;
},
onError: (err, info) => {
runtime.error?.(
`[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`,
);
},
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`);
},
},
}),
});
}

View File

@@ -1,6 +1,7 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import {
buildChannelTurnContextMock,
dispatchReplyWithBufferedBlockDispatcher,
finalizeInboundContextMock,
registerPluginHttpRouteMock,
@@ -35,6 +36,7 @@ describe("Synology channel wiring integration", () => {
beforeEach(() => {
registerPluginHttpRouteMock.mockClear();
dispatchReplyWithBufferedBlockDispatcher.mockClear();
buildChannelTurnContextMock.mockClear();
finalizeInboundContextMock.mockClear();
resolveAgentRouteMock.mockClear();
setSynologyRuntimeConfigForTest({});

View File

@@ -19,6 +19,49 @@ export const dispatchReplyWithBufferedBlockDispatcher: Mock<
export const finalizeInboundContextMock: Mock<
(ctx: Record<string, unknown>) => Record<string, unknown>
> = vi.fn((ctx) => ctx);
export const buildChannelTurnContextMock: Mock<
(params: {
channel: string;
accountId?: string;
timestamp?: number;
from: string;
sender: { id: string; name?: string };
conversation: { kind: string; label?: string };
route: {
accountId?: string;
routeSessionKey: string;
dispatchSessionKey?: string;
};
reply: { to: string; originatingTo: string };
message: {
rawBody: string;
bodyForAgent?: string;
commandBody?: string;
};
extra?: Record<string, unknown>;
}) => Record<string, unknown>
> = vi.fn((params) =>
finalizeInboundContextMock({
Body: params.message.rawBody,
BodyForAgent: params.message.bodyForAgent ?? params.message.rawBody,
RawBody: params.message.rawBody,
CommandBody: params.message.commandBody ?? params.message.rawBody,
From: params.from,
To: params.reply.to,
SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey,
AccountId: params.route.accountId ?? params.accountId,
OriginatingChannel: params.channel,
OriginatingTo: params.reply.originatingTo,
ChatType: params.conversation.kind,
SenderName: params.sender.name,
SenderId: params.sender.id,
Provider: params.channel,
Surface: params.channel,
ConversationLabel: params.conversation.label,
Timestamp: params.timestamp,
...params.extra,
}),
);
export const resolveAgentRouteMock: Mock<
(params: { accountId?: string }) => { agentId: string; sessionKey: string; accountId: string }
> = vi.fn((params) => {
@@ -100,6 +143,60 @@ vi.mock("./runtime.js", () => ({
recordInboundSession: vi.fn(async () => undefined),
},
turn: {
run: vi.fn(async (params) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
return { admission: { kind: "drop", reason: "ingest-null" }, dispatched: false };
}
const resolved = await params.adapter.resolveTurn(input, {
kind: "message",
canStartAgentTurn: true,
});
const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({
ctx: resolved.ctxPayload,
cfg: mockRuntimeConfig,
dispatcherOptions: {
...resolved.dispatcherOptions,
deliver: resolved.delivery.deliver,
onError: resolved.delivery.onError,
},
});
return {
admission: { kind: "dispatch" },
dispatched: true,
dispatchResult,
ctxPayload: resolved.ctxPayload,
routeSessionKey: resolved.routeSessionKey,
};
}),
runResolved: vi.fn(async (params) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return { admission: { kind: "drop", reason: "ingest-null" }, dispatched: false };
}
const resolved = await params.resolveTurn(input, {
kind: "message",
canStartAgentTurn: true,
});
const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({
ctx: resolved.ctxPayload,
cfg: mockRuntimeConfig,
dispatcherOptions: {
...resolved.dispatcherOptions,
deliver: resolved.delivery.deliver,
onError: resolved.delivery.onError,
},
});
return {
admission: { kind: "dispatch" },
dispatched: true,
dispatchResult,
ctxPayload: resolved.ctxPayload,
routeSessionKey: resolved.routeSessionKey,
};
}),
buildContext: buildChannelTurnContextMock,
dispatchAssembled: vi.fn(async (params) => ({
dispatchResult: await params.dispatchReplyWithBufferedBlockDispatcher({
ctx: params.ctxPayload,

View File

@@ -1,7 +1,3 @@
import type { ResolvedSynologyChatAccount } from "./types.js";
const CHANNEL_ID = "synology-chat";
export type SynologyInboundMessage = {
body: string;
from: string;
@@ -13,30 +9,4 @@ export type SynologyInboundMessage = {
chatUserId?: string;
};
export function buildSynologyChatInboundContext<TContext>(params: {
finalizeInboundContext: (ctx: Record<string, unknown>) => TContext;
account: ResolvedSynologyChatAccount;
msg: SynologyInboundMessage;
sessionKey: string;
}): TContext {
const { account, msg, sessionKey } = params;
return params.finalizeInboundContext({
Body: msg.body,
RawBody: msg.body,
CommandBody: msg.body,
From: `synology-chat:${msg.from}`,
To: `synology-chat:${msg.from}`,
SessionKey: sessionKey,
AccountId: account.accountId,
OriginatingChannel: CHANNEL_ID,
OriginatingTo: `synology-chat:${msg.from}`,
ChatType: msg.chatType,
SenderName: msg.senderName,
SenderId: msg.from,
Provider: CHANNEL_ID,
Surface: CHANNEL_ID,
ConversationLabel: msg.senderName || msg.from,
Timestamp: Date.now(),
CommandAuthorized: msg.commandAuthorized,
});
}
export type { ResolvedSynologyChatAccount } from "./types.js";

View File

@@ -1,6 +1,6 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { sendMessage } from "./client.js";
import { buildSynologyChatInboundContext, type SynologyInboundMessage } from "./inbound-context.js";
import type { SynologyInboundMessage } from "./inbound-context.js";
import { getSynologyRuntime } from "./runtime.js";
import { buildSynologyChatInboundSessionKey } from "./session-key.js";
import type { ResolvedSynologyChatAccount } from "./types.js";
@@ -71,46 +71,97 @@ export async function dispatchSynologyChatInboundTurn(params: {
account: params.account,
userId: params.msg.from,
});
const msgCtx = buildSynologyChatInboundContext({
finalizeInboundContext: resolved.rt.channel.reply.finalizeInboundContext,
account: params.account,
msg: params.msg,
sessionKey: resolved.sessionKey,
});
const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, {
agentId: resolved.route.agentId,
});
await resolved.rt.channel.turn.dispatchAssembled({
cfg: currentCfg,
await resolved.rt.channel.turn.runResolved({
channel: CHANNEL_ID,
accountId: params.account.accountId,
agentId: resolved.route.agentId,
routeSessionKey: resolved.route.sessionKey,
storePath,
ctxPayload: msgCtx,
recordInboundSession: resolved.rt.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverSynologyChatReply({
account: params.account,
sendUserId,
payload,
});
},
},
dispatcherOptions: {
onReplyStart: () => {
params.log?.info?.(`Agent reply started for ${params.msg.from}`);
},
},
record: {
onRecordError: (err) => {
params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err);
},
raw: params.msg,
input: (msg) => ({
id: `${params.account.accountId}:${msg.from}`,
timestamp: Date.now(),
rawText: msg.body,
textForAgent: msg.body,
textForCommands: msg.body,
raw: msg,
}),
resolveTurn: (input) => {
const chatKind =
params.msg.chatType === "group" || params.msg.chatType === "channel"
? params.msg.chatType
: "direct";
const msgCtx = resolved.rt.channel.turn.buildContext({
channel: CHANNEL_ID,
accountId: params.account.accountId,
timestamp: input.timestamp,
from: `synology-chat:${params.msg.from}`,
sender: {
id: params.msg.from,
name: params.msg.senderName,
},
conversation: {
kind: chatKind,
id: params.msg.from,
label: params.msg.senderName || params.msg.from,
routePeer: {
kind: "direct",
id: params.msg.from,
},
},
route: {
agentId: resolved.route.agentId,
accountId: params.account.accountId,
routeSessionKey: resolved.sessionKey,
dispatchSessionKey: resolved.sessionKey,
},
reply: {
to: `synology-chat:${params.msg.from}`,
originatingTo: `synology-chat:${params.msg.from}`,
},
message: {
rawBody: input.rawText,
commandBody: input.textForCommands,
bodyForAgent: input.textForAgent,
envelopeFrom: params.msg.senderName,
},
extra: {
ChatType: params.msg.chatType,
CommandAuthorized: params.msg.commandAuthorized,
},
});
const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, {
agentId: resolved.route.agentId,
});
return {
cfg: currentCfg,
channel: CHANNEL_ID,
accountId: params.account.accountId,
agentId: resolved.route.agentId,
routeSessionKey: resolved.route.sessionKey,
storePath,
ctxPayload: msgCtx,
recordInboundSession: resolved.rt.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverSynologyChatReply({
account: params.account,
sendUserId,
payload,
});
},
},
dispatcherOptions: {
onReplyStart: () => {
params.log?.info?.(`Agent reply started for ${params.msg.from}`);
},
},
record: {
onRecordError: (err) => {
params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err);
},
},
};
},
});

View File

@@ -516,31 +516,58 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
// Strip bot ship mention for CommandBody so "/status" is recognized as command-only
const commandBody = isGroup ? stripBotMention(messageText, botShipName) : messageText;
const tlonConversationId = isGroup ? (groupChannel ?? channelNest ?? senderShip) : senderShip;
const rawTurnMessage = {
messageId,
messageText,
timestamp,
};
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
RawBody: messageText,
CommandBody: commandBody,
From: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`,
To: `tlon:${botShipName}`,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
SenderName: senderShip,
SenderId: senderShip,
SenderRole: senderRole,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
Provider: "tlon",
Surface: "tlon",
MessageSid: messageId,
// Include downloaded media attachments
...(attachments.length > 0 && { Attachments: attachments }),
OriginatingChannel: "tlon",
OriginatingTo: `tlon:${isGroup ? groupChannel : botShipName}`,
// Include thread context for automatic reply routing
...(parentId && { ThreadId: parentId, ReplyToId: parentId }),
const ctxPayload = core.channel.turn.buildContext({
channel: "tlon",
accountId: route.accountId,
messageId,
timestamp,
from: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`,
sender: {
id: senderShip,
name: senderShip,
roles: [senderRole],
},
conversation: {
kind: isGroup ? "group" : "direct",
id: tlonConversationId,
label: fromLabel,
routePeer: {
kind: isGroup ? "group" : "direct",
id: tlonConversationId,
},
},
route: {
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
},
reply: {
to: `tlon:${botShipName}`,
originatingTo: `tlon:${isGroup ? groupChannel : botShipName}`,
replyToId: parentId ?? undefined,
},
message: {
body,
bodyForAgent: commandBody,
rawBody: messageText,
commandBody,
envelopeFrom: fromLabel,
},
extra: {
GroupSubject: undefined,
SenderRole: senderRole,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
...(attachments.length > 0 && { Attachments: attachments }),
...(parentId && { ThreadId: parentId }),
},
});
const dispatchStartTime = Date.now();
@@ -554,76 +581,96 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
agentId: route.agentId,
});
await core.channel.turn.dispatchAssembled({
cfg,
await core.channel.turn.run({
channel: "tlon",
accountId: route.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload: ReplyPayload) => {
let replyText = payload.text;
if (!replyText) {
return;
}
raw: rawTurnMessage,
adapter: {
ingest: (raw) => ({
id: raw.messageId,
timestamp: raw.timestamp,
rawText: raw.messageText,
textForAgent: commandBody,
textForCommands: commandBody,
raw,
}),
resolveTurn: () => ({
cfg,
channel: "tlon",
accountId: route.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload: ReplyPayload) => {
let replyText = payload.text;
if (!replyText) {
return;
}
// Use settings store value if set, otherwise fall back to file config
const showSignature = effectiveShowModelSig;
if (showSignature) {
const extPayload = payload as {
metadata?: { model?: string };
model?: string;
};
const defaultModel = cfg.agents?.defaults?.model;
const modelInfo =
extPayload.metadata?.model ||
extPayload.model ||
(typeof defaultModel === "string" ? defaultModel : defaultModel?.primary);
replyText = `${replyText}\n\n_[Generated by ${formatModelName(modelInfo)}]_`;
}
// Use settings store value if set, otherwise fall back to file config
const showSignature = effectiveShowModelSig;
if (showSignature) {
const extPayload = payload as {
metadata?: { model?: string };
model?: string;
};
const defaultModel = cfg.agents?.defaults?.model;
const modelInfo =
extPayload.metadata?.model ||
extPayload.model ||
(typeof defaultModel === "string" ? defaultModel : defaultModel?.primary);
replyText = `${replyText}\n\n_[Generated by ${formatModelName(modelInfo)}]_`;
}
if (isGroup && groupChannel) {
const parsed = parseChannelNest(groupChannel);
if (!parsed) {
return;
}
await sendGroupMessage({
api: api,
fromShip: botShipName,
hostShip: parsed.hostShip,
channelName: parsed.channelName,
text: replyText,
replyToId: parentId ?? undefined,
});
// Track thread participation for future replies without mention
if (parentId) {
participatedThreads.add(parentId);
runtime.log?.(`[tlon] Now tracking thread for future replies: ${parentId}`);
}
} else {
await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: replyText });
}
},
onError: (err, info) => {
const dispatchDuration = Date.now() - dispatchStartTime;
runtime.error?.(
`[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}`,
);
},
},
dispatcherOptions: {
responsePrefix,
humanDelay,
},
record: {
onRecordError: (err) => {
runtime.error?.(`[tlon] failed updating session meta: ${String(err)}`);
},
if (isGroup && groupChannel) {
const parsed = parseChannelNest(groupChannel);
if (!parsed) {
return;
}
await sendGroupMessage({
api: api,
fromShip: botShipName,
hostShip: parsed.hostShip,
channelName: parsed.channelName,
text: replyText,
replyToId: parentId ?? undefined,
});
// Track thread participation for future replies without mention
if (parentId) {
participatedThreads.add(parentId);
runtime.log?.(`[tlon] Now tracking thread for future replies: ${parentId}`);
}
} else {
await sendDm({
api: api,
fromShip: botShipName,
toShip: senderShip,
text: replyText,
});
}
},
onError: (err, info) => {
const dispatchDuration = Date.now() - dispatchStartTime;
runtime.error?.(
`[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}`,
);
},
},
dispatcherOptions: {
responsePrefix,
humanDelay,
},
record: {
onRecordError: (err) => {
runtime.error?.(`[tlon] failed updating session meta: ${String(err)}`);
},
},
}),
},
});
};

View File

@@ -51,116 +51,126 @@ async function processTwitchMessage(params: {
const { message, account, accountId, config, runtime, core, statusSink } = params;
const cfg = config as OpenClawConfig;
const route = core.channel.routing.resolveAgentRoute({
cfg,
await core.channel.turn.runResolved({
channel: "twitch",
accountId,
peer: {
kind: "group", // Twitch chat is always group-like
id: message.channel,
},
});
const rawBody = message.message;
const senderId = message.userId ?? message.username;
const body = core.channel.reply.formatAgentEnvelope({
channel: "Twitch",
from: message.displayName ?? message.username,
timestamp: message.timestamp?.getTime(),
envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg),
body: rawBody,
});
const ctxPayload = core.channel.turn.buildContext({
channel: "twitch",
accountId,
messageId: message.id,
timestamp: message.timestamp?.getTime(),
from: `twitch:user:${senderId}`,
sender: {
id: senderId,
name: message.displayName ?? message.username,
username: message.username,
},
conversation: {
kind: "group",
id: message.channel,
label: message.channel,
routePeer: {
kind: "group",
id: message.channel,
},
},
route: {
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
},
reply: {
to: `twitch:channel:${message.channel}`,
originatingTo: `twitch:channel:${message.channel}`,
},
message: {
body,
rawBody,
bodyForAgent: rawBody,
commandBody: rawBody,
envelopeFrom: message.displayName ?? message.username,
},
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg,
channel: "twitch",
accountId,
});
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "twitch",
accountId,
});
await core.channel.turn.dispatchAssembled({
cfg,
channel: "twitch",
accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverTwitchReply({
payload,
channel: message.channel,
account,
accountId,
config,
tableMode,
runtime,
statusSink,
});
},
onError: (err, info) => {
runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`);
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`Failed updating session meta: ${String(err)}`);
},
raw: message,
input: (incoming) => ({
id: incoming.id ?? `${incoming.channel}:${incoming.timestamp?.getTime() ?? Date.now()}`,
timestamp: incoming.timestamp?.getTime(),
rawText: incoming.message,
textForAgent: incoming.message,
textForCommands: incoming.message,
raw: incoming,
}),
resolveTurn: (input) => {
const route = core.channel.routing.resolveAgentRoute({
cfg,
channel: "twitch",
accountId,
peer: {
kind: "group",
id: message.channel,
},
});
const senderId = message.userId ?? message.username;
const fromLabel = message.displayName ?? message.username;
const body = core.channel.reply.formatAgentEnvelope({
channel: "Twitch",
from: fromLabel,
timestamp: input.timestamp,
envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg),
body: input.rawText,
});
const ctxPayload = core.channel.turn.buildContext({
channel: "twitch",
accountId,
messageId: input.id,
timestamp: input.timestamp,
from: `twitch:user:${senderId}`,
sender: {
id: senderId,
name: fromLabel,
username: message.username,
},
conversation: {
kind: "group",
id: message.channel,
label: message.channel,
routePeer: {
kind: "group",
id: message.channel,
},
},
route: {
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
},
reply: {
to: `twitch:channel:${message.channel}`,
originatingTo: `twitch:channel:${message.channel}`,
},
message: {
body,
rawBody: input.rawText,
bodyForAgent: input.textForAgent,
commandBody: input.textForCommands,
envelopeFrom: fromLabel,
},
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg,
channel: "twitch",
accountId,
});
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "twitch",
accountId,
});
return {
cfg,
channel: "twitch",
accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverTwitchReply({
payload,
channel: message.channel,
account,
accountId,
config,
tableMode,
runtime,
statusSink,
});
},
onError: (err, info) => {
runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`);
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`Failed updating session meta: ${String(err)}`);
},
},
};
},
});
}

View File

@@ -92,20 +92,17 @@ describe("Zalo reply-once lifecycle", () => {
},
);
expect(finalizeInboundContextMock).toHaveBeenCalledTimes(1);
expect(finalizeInboundContextMock).toHaveBeenCalledWith(
expect.objectContaining({
AccountId: "acct-zalo-lifecycle",
SessionKey: "agent:main:zalo:direct:dm-chat-1",
MessageSid: expect.stringContaining("zalo-replay-"),
From: "zalo:user-1",
To: "zalo:dm-chat-1",
}),
);
expect(recordInboundSessionMock).toHaveBeenCalledTimes(1);
expect(recordInboundSessionMock).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:main:zalo:direct:dm-chat-1",
ctx: expect.objectContaining({
AccountId: "acct-zalo-lifecycle",
SessionKey: "agent:main:zalo:direct:dm-chat-1",
MessageSid: expect.stringContaining("zalo-replay-"),
From: "zalo:user-1",
To: "zalo:dm-chat-1",
}),
}),
);
expect(sendMessageMock).toHaveBeenCalledTimes(1);

View File

@@ -582,28 +582,55 @@ async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Pr
body: rawBody,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
BodyForAgent: rawBody,
RawBody: rawBody,
CommandBody: rawBody,
From: isGroup ? `zalo:group:${chatId}` : `zalo:${senderId}`,
To: `zalo:${chatId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
SenderName: senderName || undefined,
SenderId: senderId,
CommandAuthorized: commandAuthorized,
Provider: "zalo",
Surface: "zalo",
MessageSid: message_id,
MediaPath: mediaPath,
MediaType: mediaType,
MediaUrl: mediaPath,
OriginatingChannel: "zalo",
OriginatingTo: `zalo:${chatId}`,
const ctxPayload = core.channel.turn.buildContext({
channel: "zalo",
accountId: route.accountId,
messageId: message_id,
timestamp: date ? date * 1000 : undefined,
from: isGroup ? `zalo:group:${chatId}` : `zalo:${senderId}`,
sender: {
id: senderId,
name: senderName || undefined,
},
conversation: {
kind: isGroup ? "group" : "direct",
id: chatId,
label: fromLabel,
routePeer: {
kind: isGroup ? "group" : "direct",
id: chatId,
},
},
route: {
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
},
reply: {
to: `zalo:${chatId}`,
originatingTo: `zalo:${chatId}`,
},
message: {
body,
bodyForAgent: rawBody,
rawBody,
commandBody: rawBody,
envelopeFrom: fromLabel,
},
media:
mediaPath || mediaType
? [
{
path: mediaPath,
url: mediaPath,
contentType: mediaType,
},
]
: undefined,
extra: {
CommandAuthorized: commandAuthorized,
GroupSubject: undefined,
},
});
const tableMode = core.channel.text.resolveMarkdownTableMode({
@@ -640,50 +667,63 @@ async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Pr
},
});
await core.channel.turn.dispatchAssembled({
cfg: config,
await core.channel.turn.runResolved({
channel: "zalo",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZaloReply({
payload,
token,
chatId,
runtime,
core,
config,
webhookUrl: params.webhookUrl,
webhookPath: params.webhookPath,
proxyUrl: account.config.proxy,
mediaMaxBytes: params.mediaMaxMb * 1024 * 1024,
canHostMedia: params.canHostMedia,
accountId: account.accountId,
statusSink,
fetcher,
tableMode,
});
},
onError: (err, info) => {
runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`);
},
raw: message,
input: {
id: message_id,
timestamp: date ? date * 1000 : undefined,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: rawBody,
raw: message,
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
resolveTurn: () => ({
cfg: config,
channel: "zalo",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZaloReply({
payload,
token,
chatId,
runtime,
core,
config,
webhookUrl: params.webhookUrl,
webhookPath: params.webhookPath,
proxyUrl: account.config.proxy,
mediaMaxBytes: params.mediaMaxMb * 1024 * 1024,
canHostMedia: params.canHostMedia,
accountId: account.accountId,
statusSink,
fetcher,
tableMode,
});
},
onError: (err, info) => {
runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`);
},
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
},
},
}),
});
}

View File

@@ -122,6 +122,50 @@ export function createImageUpdate(params?: {
export function createImageLifecycleCore() {
const finalizeInboundContextMock = vi.fn((ctx: Record<string, unknown>) => ctx);
const buildChannelTurnContextMock = vi.fn(
(params: {
channel: string;
accountId?: string;
messageId?: string;
timestamp?: number;
from: string;
sender: { id: string; name?: string };
conversation: { kind: string; label?: string };
route: {
accountId?: string;
routeSessionKey: string;
dispatchSessionKey?: string;
};
reply: { to: string; originatingTo: string };
message: { body?: string; rawBody: string; bodyForAgent?: string; commandBody?: string };
media?: Array<{ path?: string; url?: string; contentType?: string }>;
extra?: Record<string, unknown>;
}) =>
finalizeInboundContextMock({
Body: params.message.body ?? params.message.rawBody,
BodyForAgent: params.message.bodyForAgent ?? params.message.rawBody,
RawBody: params.message.rawBody,
CommandBody: params.message.commandBody ?? params.message.rawBody,
From: params.from,
To: params.reply.to,
SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey,
AccountId: params.route.accountId ?? params.accountId,
ChatType: params.conversation.kind,
ConversationLabel: params.conversation.label,
SenderName: params.sender.name,
SenderId: params.sender.id,
Provider: params.channel,
Surface: params.channel,
MessageSid: params.messageId,
Timestamp: params.timestamp,
MediaPath: params.media?.[0]?.path,
MediaType: params.media?.[0]?.contentType,
MediaUrl: params.media?.[0]?.url ?? params.media?.[0]?.path,
OriginatingChannel: params.channel,
OriginatingTo: params.reply.originatingTo,
...params.extra,
}),
);
const recordInboundSessionMock = vi.fn(async () => undefined);
const fetchRemoteMediaMock = vi.fn(async () => ({
buffer: Buffer.from("image-bytes"),
@@ -188,6 +232,103 @@ export function createImageLifecycleCore() {
) as unknown as PluginRuntime["channel"]["reply"]["dispatchReplyWithBufferedBlockDispatcher"],
},
turn: {
run: vi.fn(async (params: Parameters<PluginRuntime["channel"]["turn"]["run"]>[0]) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
const resolved = await params.adapter.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
);
await resolved.recordInboundSession({
storePath: resolved.storePath,
sessionKey: resolved.ctxPayload.SessionKey ?? resolved.routeSessionKey,
ctx: resolved.ctxPayload,
groupResolution: resolved.record?.groupResolution,
createIfMissing: resolved.record?.createIfMissing,
updateLastRoute: resolved.record?.updateLastRoute,
onRecordError: resolved.record?.onRecordError ?? (() => undefined),
});
const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({
ctx: resolved.ctxPayload,
cfg: resolved.cfg,
dispatcherOptions: {
...resolved.dispatcherOptions,
deliver: async (payload, info) => {
await resolved.delivery.deliver(payload, info);
},
onError: resolved.delivery.onError,
},
replyOptions: resolved.replyOptions,
replyResolver: resolved.replyResolver,
});
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: resolved.ctxPayload,
routeSessionKey: resolved.routeSessionKey,
dispatchResult,
};
}) as unknown as PluginRuntime["channel"]["turn"]["run"],
runResolved: vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
const resolved = await params.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
);
await resolved.recordInboundSession({
storePath: resolved.storePath,
sessionKey: resolved.ctxPayload.SessionKey ?? resolved.routeSessionKey,
ctx: resolved.ctxPayload,
groupResolution: resolved.record?.groupResolution,
createIfMissing: resolved.record?.createIfMissing,
updateLastRoute: resolved.record?.updateLastRoute,
onRecordError: resolved.record?.onRecordError ?? (() => undefined),
});
const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({
ctx: resolved.ctxPayload,
cfg: resolved.cfg,
dispatcherOptions: {
...resolved.dispatcherOptions,
deliver: async (payload, info) => {
await resolved.delivery.deliver(payload, info);
},
onError: resolved.delivery.onError,
},
replyOptions: resolved.replyOptions,
replyResolver: resolved.replyResolver,
});
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: resolved.ctxPayload,
routeSessionKey: resolved.routeSessionKey,
dispatchResult,
};
},
) as unknown as PluginRuntime["channel"]["turn"]["runResolved"],
buildContext:
buildChannelTurnContextMock as unknown as PluginRuntime["channel"]["turn"]["buildContext"],
dispatchAssembled: vi.fn(
async (turn: Parameters<PluginRuntime["channel"]["turn"]["dispatchAssembled"]>[0]) => {
await turn.recordInboundSession({

View File

@@ -122,6 +122,68 @@ function installRuntime(params: {
};
},
);
const runTurn = vi.fn(async (params: Parameters<PluginRuntime["channel"]["turn"]["run"]>[0]) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
return { admission: { kind: "drop" as const, reason: "ingest-null" }, dispatched: false };
}
const resolved = await params.adapter.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
);
return await dispatchAssembled(resolved);
});
const runResolvedTurn = vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
const resolved = await params.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
);
return await dispatchAssembled(resolved);
},
);
const buildContext = vi.fn(
(params: Parameters<PluginRuntime["channel"]["turn"]["buildContext"]>[0]) =>
({
Body: params.message.body ?? params.message.rawBody,
BodyForAgent: params.message.bodyForAgent ?? params.message.rawBody,
InboundHistory: params.message.inboundHistory,
RawBody: params.message.rawBody,
CommandBody: params.message.commandBody ?? params.message.rawBody,
BodyForCommands: params.message.commandBody ?? params.message.rawBody,
From: params.from,
To: params.reply.to,
SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey,
AccountId: params.route.accountId ?? params.accountId,
ChatType: params.conversation.kind,
ConversationLabel: params.conversation.label,
SenderName: params.sender.name,
SenderId: params.sender.id,
Provider: params.provider ?? params.channel,
Surface: params.surface ?? params.provider ?? params.channel,
MessageSid: params.messageId,
MessageSidFull: params.messageIdFull,
OriginatingChannel: params.channel,
OriginatingTo: params.reply.originatingTo,
...params.extra,
}) as ReturnType<PluginRuntime["channel"]["turn"]["buildContext"]>,
);
const buildAgentSessionKey = vi.fn(
(input: {
agentId: string;
@@ -201,6 +263,9 @@ function installRuntime(params: {
dispatchReplyWithBufferedBlockDispatcher,
},
turn: {
run: runTurn as unknown as PluginRuntime["channel"]["turn"]["run"],
runResolved: runResolvedTurn as unknown as PluginRuntime["channel"]["turn"]["runResolved"],
buildContext: buildContext as unknown as PluginRuntime["channel"]["turn"]["buildContext"],
dispatchAssembled:
dispatchAssembled as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"],
},

View File

@@ -592,40 +592,62 @@ async function processMessage(
: undefined;
const normalizedTo = isGroup ? `zalouser:group:${chatId}` : `zalouser:${chatId}`;
const messageSid = resolveZalouserMessageSid({
msgId: message.msgId,
cliMsgId: message.cliMsgId,
fallback: `${message.timestampMs}`,
});
const messageSidFull = formatZalouserMessageSidFull({
msgId: message.msgId,
cliMsgId: message.cliMsgId,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: combinedBody,
BodyForAgent: rawBody,
InboundHistory: inboundHistory,
RawBody: rawBody,
CommandBody: commandBody,
BodyForCommands: commandBody,
From: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`,
To: normalizedTo,
SessionKey: inboundSessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
GroupSubject: isGroup ? groupName || undefined : undefined,
GroupChannel: isGroup ? groupName || undefined : undefined,
GroupMembers: isGroup ? groupMembers : undefined,
SenderName: senderName || undefined,
SenderId: senderId,
WasMentioned: isGroup ? mentionDecision.effectiveWasMentioned : undefined,
CommandAuthorized: commandAuthorized,
Provider: "zalouser",
Surface: "zalouser",
MessageSid: resolveZalouserMessageSid({
msgId: message.msgId,
cliMsgId: message.cliMsgId,
fallback: `${message.timestampMs}`,
}),
MessageSidFull: formatZalouserMessageSidFull({
msgId: message.msgId,
cliMsgId: message.cliMsgId,
}),
OriginatingChannel: "zalouser",
OriginatingTo: normalizedTo,
const ctxPayload = core.channel.turn.buildContext({
channel: "zalouser",
accountId: route.accountId,
messageId: messageSid,
messageIdFull: messageSidFull,
timestamp: message.timestampMs,
from: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`,
sender: {
id: senderId,
name: senderName || undefined,
},
conversation: {
kind: isGroup ? "group" : "direct",
id: chatId,
label: fromLabel,
routePeer: {
kind: isGroup ? "group" : "direct",
id: chatId,
},
},
route: {
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
dispatchSessionKey: inboundSessionKey,
},
reply: {
to: normalizedTo,
originatingTo: normalizedTo,
},
message: {
body: combinedBody,
bodyForAgent: rawBody,
rawBody,
commandBody,
inboundHistory,
envelopeFrom: fromLabel,
},
extra: {
BodyForCommands: commandBody,
GroupSubject: isGroup ? groupName || undefined : undefined,
GroupChannel: isGroup ? groupName || undefined : undefined,
GroupMembers: isGroup ? groupMembers : undefined,
WasMentioned: isGroup ? mentionDecision.effectiveWasMentioned : undefined,
CommandAuthorized: commandAuthorized,
},
});
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
@@ -649,49 +671,64 @@ async function processMessage(
},
});
await core.channel.turn.dispatchAssembled({
cfg: config,
await core.channel.turn.runResolved({
channel: "zalouser",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZalouserReply({
payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string },
profile: account.profile,
chatId,
isGroup,
runtime,
core,
config,
accountId: account.accountId,
statusSink,
tableMode: core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "zalouser",
raw: message,
input: {
id: messageSid ?? `${message.timestampMs}`,
timestamp: message.timestampMs,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: commandBody,
raw: message,
},
resolveTurn: () => ({
cfg: config,
channel: "zalouser",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZalouserReply({
payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string },
profile: account.profile,
chatId,
isGroup,
runtime,
core,
config,
accountId: account.accountId,
}),
});
statusSink,
tableMode: core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "zalouser",
accountId: account.accountId,
}),
});
},
onError: (err, info) => {
runtime.error(
`[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`,
);
},
},
onError: (err, info) => {
runtime.error(`[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`);
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
record: {
onRecordError: (err) => {
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
},
},
},
}),
});
if (isGroup && historyKey) {
clearHistoryEntriesIfEnabled({

View File

@@ -1,5 +1,5 @@
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import type { FinalizedMsgContext, MsgContext } from "../../auto-reply/templating.js";
import type { FinalizedMsgContext } from "../../auto-reply/templating.js";
import type {
AccessFacts,
ConversationFacts,
@@ -28,7 +28,7 @@ export type BuildChannelTurnContextParams = {
access?: AccessFacts;
media?: InboundMediaFacts[];
supplemental?: SupplementalContextFacts;
extra?: MsgContext;
extra?: Record<string, unknown>;
};
function compactStrings(values: Array<string | undefined>): string[] | undefined {

View File

@@ -12,6 +12,7 @@ import type {
PreparedChannelTurn,
PreflightFacts,
RunChannelTurnParams,
RunResolvedChannelTurnParams,
} from "./types.js";
export type {
AccessFacts,
@@ -37,6 +38,7 @@ export type {
ReplyPlanFacts,
RouteFacts,
RunChannelTurnParams,
RunResolvedChannelTurnParams,
SenderFacts,
SupplementalContextFacts,
} from "./types.js";
@@ -307,3 +309,18 @@ export async function runChannelTurn<TRaw>(
return result;
}
export async function runResolvedChannelTurn<TRaw>(
params: RunResolvedChannelTurnParams<TRaw>,
): Promise<ChannelTurnResult> {
return await runChannelTurn({
channel: params.channel,
accountId: params.accountId,
raw: params.raw,
log: params.log,
adapter: {
ingest: (raw) => (typeof params.input === "function" ? params.input(raw) : params.input),
resolveTurn: params.resolveTurn,
},
});
}

View File

@@ -293,3 +293,18 @@ export type RunChannelTurnParams<TRaw> = {
adapter: ChannelTurnAdapter<TRaw>;
log?: (event: ChannelTurnLogEvent) => void;
};
export type RunResolvedChannelTurnParams<TRaw> = {
channel: string;
accountId?: string;
raw: TRaw;
input:
| NormalizedTurnInput
| ((raw: TRaw) => Promise<NormalizedTurnInput | null> | NormalizedTurnInput | null);
resolveTurn: (
input: NormalizedTurnInput,
eventClass: ChannelEventClass,
preflight: PreflightFacts,
) => Promise<ChannelTurnResolved> | ChannelTurnResolved;
log?: (event: ChannelTurnLogEvent) => void;
};

View File

@@ -199,11 +199,21 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
To: params.reply.to,
SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey,
AccountId: params.route.accountId ?? params.accountId,
MessageSid: params.messageId,
MessageSidFull: params.messageIdFull,
ReplyToId: params.reply.replyToId ?? params.supplemental?.quote?.id,
ReplyToIdFull: params.reply.replyToIdFull ?? params.supplemental?.quote?.fullId,
MediaPath: params.media?.[0]?.path,
MediaUrl: params.media?.[0]?.url ?? params.media?.[0]?.path,
MediaType: params.media?.[0]?.contentType ?? params.media?.[0]?.kind,
ChatType: params.conversation.kind,
ConversationLabel: params.conversation.label,
SenderName: params.sender.name ?? params.sender.displayLabel,
SenderId: params.sender.id,
SenderUsername: params.sender.username,
Timestamp: params.timestamp,
WasMentioned: params.access?.mentions?.wasMentioned,
GroupSystemPrompt: params.supplemental?.groupSystemPrompt,
Provider: params.provider ?? params.channel,
Surface: params.surface ?? params.provider ?? params.channel,
OriginatingChannel: params.channel,
@@ -214,6 +224,28 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
...params.extra,
}) as ReturnType<PluginRuntime["channel"]["turn"]["buildContext"]>,
) as unknown as PluginRuntime["channel"]["turn"]["buildContext"];
const runResolvedChannelTurnMock = vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
return await runChannelTurnMock({
channel: params.channel,
accountId: params.accountId,
raw: params.raw,
log: params.log,
adapter: {
ingest: () => input,
resolveTurn: params.resolveTurn,
},
});
},
) as unknown as PluginRuntime["channel"]["turn"]["runResolved"];
const base: PluginRuntime = {
version: "1.0.0-test",
config: {
@@ -568,6 +600,7 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
},
turn: {
run: runChannelTurnMock,
runResolved: runResolvedChannelTurnMock,
buildContext: buildChannelTurnContextMock,
runPrepared: runPreparedChannelTurnMock,
dispatchAssembled: dispatchAssembledChannelTurnMock,

View File

@@ -55,6 +55,7 @@ import {
dispatchAssembledChannelTurn,
runChannelTurn,
runPreparedChannelTurn,
runResolvedChannelTurn,
} from "../../channels/turn/kernel.js";
import {
resolveChannelGroupPolicy,
@@ -173,6 +174,7 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
},
turn: {
run: runChannelTurn,
runResolved: runResolvedChannelTurn,
buildContext: buildChannelTurnContext,
runPrepared: runPreparedChannelTurn,
dispatchAssembled: dispatchAssembledChannelTurn,

View File

@@ -153,6 +153,7 @@ export type PluginRuntimeChannel = {
};
turn: {
run: typeof import("../../channels/turn/kernel.js").runChannelTurn;
runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn;
buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext;
runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn;
dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn;