From 9a3a341d93721f808d9deeae6e39c727249d2824 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 29 Apr 2026 22:53:29 +0100 Subject: [PATCH] refactor(channels): route remaining turns through kernel --- .../bluebubbles/src/monitor-processing.ts | 307 +++++++++--------- extensions/googlechat/src/monitor.ts | 175 ++++++---- .../src/channel.integration.test.ts | 2 + .../synology-chat/src/channel.test-mocks.ts | 97 ++++++ .../synology-chat/src/inbound-context.ts | 32 +- extensions/synology-chat/src/inbound-turn.ts | 127 +++++--- extensions/tlon/src/monitor/index.ts | 227 ++++++++----- extensions/twitch/src/monitor.ts | 226 +++++++------ .../src/monitor.reply-once.lifecycle.test.ts | 17 +- extensions/zalo/src/monitor.ts | 164 ++++++---- .../test-support/lifecycle-test-support.ts | 141 ++++++++ .../zalouser/src/monitor.group-gating.test.ts | 65 ++++ extensions/zalouser/src/monitor.ts | 177 ++++++---- src/channels/turn/context.ts | 4 +- src/channels/turn/kernel.ts | 17 + src/channels/turn/types.ts | 15 + .../test-helpers/plugin-runtime-mock.ts | 33 ++ src/plugins/runtime/runtime-channel.ts | 2 + src/plugins/runtime/types-channel.ts | 1 + 19 files changed, 1206 insertions(+), 623 deletions(-) diff --git a/extensions/bluebubbles/src/monitor-processing.ts b/extensions/bluebubbles/src/monitor-processing.ts index 2fb42e38684..5daeed82873 100644 --- a/extensions/bluebubbles/src/monitor-processing.ts +++ b/extensions/bluebubbles/src/monitor-processing.ts @@ -1714,50 +1714,127 @@ async function processMessageAfterDedupe( }, }, }); - await core.channel.turn.dispatchAssembled({ - cfg: config, + await core.channel.turn.run({ channel: "bluebubbles", accountId: account.accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload, info) => { - const rawReplyToId = - privateApiEnabled && typeof payload.replyToId === "string" - ? payload.replyToId.trim() - : ""; - // Resolve short ID (e.g., "5") to full UUID, scoped to the chat - // this deliver path is already routing for (cross-chat guard). - const replyToMessageGuid = rawReplyToId - ? resolveBlueBubblesMessageId(rawReplyToId, { - requireKnownShortId: true, - chatContext: { - chatGuid: chatGuidForActions ?? chatGuid, - chatIdentifier, - chatId, - }, - }) - : ""; - const mediaList = resolveOutboundMediaUrls(payload); - if (mediaList.length > 0) { - const tableMode = core.channel.text.resolveMarkdownTableMode({ - cfg: config, - channel: "bluebubbles", - accountId: account.accountId, - }); - const text = sanitizeReplyDirectiveText( - core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode), - ); - await sendMediaWithLeadingCaption({ - mediaUrls: mediaList, - caption: text, - send: async ({ mediaUrl, caption }) => { - const cachedBody = (caption ?? "").trim() || ""; + raw: ctxPayload, + adapter: { + ingest: () => ({ + id: String(ctxPayload.MessageSid ?? message.messageId), + timestamp: message.timestamp, + rawText: rawBody, + textForAgent: rawBody, + textForCommands: commandBody, + raw: ctxPayload, + }), + resolveTurn: () => ({ + cfg: config, + channel: "bluebubbles", + accountId: account.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload, info) => { + const rawReplyToId = + privateApiEnabled && typeof payload.replyToId === "string" + ? payload.replyToId.trim() + : ""; + // Resolve short ID (e.g., "5") to full UUID, scoped to the chat + // this deliver path is already routing for (cross-chat guard). + const replyToMessageGuid = rawReplyToId + ? resolveBlueBubblesMessageId(rawReplyToId, { + requireKnownShortId: true, + chatContext: { + chatGuid: chatGuidForActions ?? chatGuid, + chatIdentifier, + chatId, + }, + }) + : ""; + const mediaList = resolveOutboundMediaUrls(payload); + if (mediaList.length > 0) { + const tableMode = core.channel.text.resolveMarkdownTableMode({ + cfg: config, + channel: "bluebubbles", + accountId: account.accountId, + }); + const text = sanitizeReplyDirectiveText( + core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode), + ); + await sendMediaWithLeadingCaption({ + mediaUrls: mediaList, + caption: text, + send: async ({ mediaUrl, caption }) => { + const cachedBody = (caption ?? "").trim() || ""; + const pendingId = rememberPendingOutboundMessageId({ + accountId: account.accountId, + sessionKey: route.sessionKey, + outboundTarget, + chatGuid: chatGuidForActions ?? chatGuid, + chatIdentifier, + chatId, + snippet: cachedBody, + }); + let result: Awaited>; + try { + result = await sendBlueBubblesMedia({ + cfg: config, + to: outboundTarget, + mediaUrl, + caption: caption ?? undefined, + replyToId: replyToMessageGuid || null, + accountId: account.accountId, + asVoice: payload.audioAsVoice === true, + }); + } catch (err) { + forgetPendingOutboundMessageId(pendingId); + throw err; + } + if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) { + forgetPendingOutboundMessageId(pendingId); + } + sentMessage = true; + statusSink?.({ lastOutboundAt: Date.now() }); + if (info.kind === "block") { + restartTypingSoon(); + } + }, + }); + return; + } + + const textLimit = + account.config.textChunkLimit && account.config.textChunkLimit > 0 + ? account.config.textChunkLimit + : DEFAULT_TEXT_LIMIT; + const chunkMode = account.config.chunkMode ?? "length"; + const tableMode = core.channel.text.resolveMarkdownTableMode({ + cfg: config, + channel: "bluebubbles", + accountId: account.accountId, + }); + const text = sanitizeReplyDirectiveText( + core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode), + ); + const chunks = + chunkMode === "newline" + ? resolveTextChunksWithFallback( + text, + core.channel.text.chunkTextWithMode(text, textLimit, chunkMode), + ) + : resolveTextChunksWithFallback( + text, + core.channel.text.chunkMarkdownText(text, textLimit), + ); + if (!chunks.length) { + return; + } + for (const chunk of chunks) { const pendingId = rememberPendingOutboundMessageId({ accountId: account.accountId, sessionKey: route.sessionKey, @@ -1765,24 +1842,20 @@ async function processMessageAfterDedupe( chatGuid: chatGuidForActions ?? chatGuid, chatIdentifier, chatId, - snippet: cachedBody, + snippet: chunk, }); - let result: Awaited>; + let result: Awaited>; try { - result = await sendBlueBubblesMedia({ + result = await sendMessageBlueBubbles(outboundTarget, chunk, { cfg: config, - to: outboundTarget, - mediaUrl, - caption: caption ?? undefined, - replyToId: replyToMessageGuid || null, accountId: account.accountId, - asVoice: payload.audioAsVoice === true, + replyToMessageGuid: replyToMessageGuid || undefined, }); } catch (err) { forgetPendingOutboundMessageId(pendingId); throw err; } - if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) { + if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) { forgetPendingOutboundMessageId(pendingId); } sentMessage = true; @@ -1790,101 +1863,43 @@ async function processMessageAfterDedupe( if (info.kind === "block") { restartTypingSoon(); } - }, - }); - return; - } - - const textLimit = - account.config.textChunkLimit && account.config.textChunkLimit > 0 - ? account.config.textChunkLimit - : DEFAULT_TEXT_LIMIT; - const chunkMode = account.config.chunkMode ?? "length"; - const tableMode = core.channel.text.resolveMarkdownTableMode({ - cfg: config, - channel: "bluebubbles", - accountId: account.accountId, - }); - const text = sanitizeReplyDirectiveText( - core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode), - ); - const chunks = - chunkMode === "newline" - ? resolveTextChunksWithFallback( - text, - core.channel.text.chunkTextWithMode(text, textLimit, chunkMode), - ) - : resolveTextChunksWithFallback( - text, - core.channel.text.chunkMarkdownText(text, textLimit), - ); - if (!chunks.length) { - return; - } - for (const chunk of chunks) { - const pendingId = rememberPendingOutboundMessageId({ - accountId: account.accountId, - sessionKey: route.sessionKey, - outboundTarget, - chatGuid: chatGuidForActions ?? chatGuid, - chatIdentifier, - chatId, - snippet: chunk, - }); - let result: Awaited>; - try { - result = await sendMessageBlueBubbles(outboundTarget, chunk, { - cfg: config, - accountId: account.accountId, - replyToMessageGuid: replyToMessageGuid || undefined, - }); - } catch (err) { - forgetPendingOutboundMessageId(pendingId); - throw err; - } - if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) { - forgetPendingOutboundMessageId(pendingId); - } - sentMessage = true; - statusSink?.({ lastOutboundAt: Date.now() }); - if (info.kind === "block") { - restartTypingSoon(); - } - } - }, - onError: (err, info) => { - // Flag the outer dedupe wrapper so it releases the claim instead - // of committing. Without this, a transient BlueBubbles send failure - // would permanently block replay-retry for 7 days and the user - // would never receive a reply to that message. - // - // Only the terminal `final` delivery represents the user-visible - // answer. The dispatcher continues past `tool` / `block` failures - // and may still deliver `final` successfully — releasing the - // dedupe claim for those would invite a replay that re-runs tool - // side effects and resends partially-delivered content. - if (info.kind === "final") { - dedupeSignal.deliveryFailed = true; - } - runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${sanitizeForLog(err)}`); - }, - }, - dispatcherOptions: { - ...replyPipeline, - onReplyStart: typingCallbacks?.onReplyStart, - onIdle: typingCallbacks?.onIdle, - }, - replyOptions: { - onModelSelected, - disableBlockStreaming: - typeof account.config.blockStreaming === "boolean" - ? !account.config.blockStreaming - : undefined, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`[bluebubbles] failed updating session meta: ${sanitizeForLog(err)}`); - }, + } + }, + onError: (err, info) => { + // Flag the outer dedupe wrapper so it releases the claim instead + // of committing. Without this, a transient BlueBubbles send failure + // would permanently block replay-retry for 7 days and the user + // would never receive a reply to that message. + // + // Only the terminal `final` delivery represents the user-visible + // answer. The dispatcher continues past `tool` / `block` failures + // and may still deliver `final` successfully — releasing the + // dedupe claim for those would invite a replay that re-runs tool + // side effects and resends partially-delivered content. + if (info.kind === "final") { + dedupeSignal.deliveryFailed = true; + } + runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${sanitizeForLog(err)}`); + }, + }, + dispatcherOptions: { + ...replyPipeline, + onReplyStart: typingCallbacks?.onReplyStart, + onIdle: typingCallbacks?.onIdle, + }, + replyOptions: { + onModelSelected, + disableBlockStreaming: + typeof account.config.blockStreaming === "boolean" + ? !account.config.blockStreaming + : undefined, + }, + record: { + onRecordError: (err) => { + runtime.error?.(`[bluebubbles] failed updating session meta: ${sanitizeForLog(err)}`); + }, + }, + }), }, }); } finally { diff --git a/extensions/googlechat/src/monitor.ts b/extensions/googlechat/src/monitor.ts index 83c7a6a2d01..25fad685113 100644 --- a/extensions/googlechat/src/monitor.ts +++ b/extensions/googlechat/src/monitor.ts @@ -195,35 +195,65 @@ async function processMessageWithPipeline(params: { body: rawBody, }); - const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: body, - BodyForAgent: rawBody, - RawBody: rawBody, - CommandBody: rawBody, - From: `googlechat:${senderId}`, - To: `googlechat:${spaceId}`, - SessionKey: route.sessionKey, - AccountId: route.accountId, - ChatType: isGroup ? "channel" : "direct", - ConversationLabel: fromLabel, - SenderName: senderName || undefined, - SenderId: senderId, - SenderUsername: senderEmail, - WasMentioned: isGroup ? effectiveWasMentioned : undefined, - CommandAuthorized: commandAuthorized, - Provider: "googlechat", - Surface: "googlechat", - MessageSid: message.name, - MessageSidFull: message.name, - ReplyToId: message.thread?.name, - ReplyToIdFull: message.thread?.name, - MediaPath: mediaPath, - MediaType: mediaType, - MediaUrl: mediaPath, - GroupSpace: isGroup ? (space.displayName ?? undefined) : undefined, - GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, - OriginatingChannel: "googlechat", - OriginatingTo: `googlechat:${spaceId}`, + const ctxPayload = core.channel.turn.buildContext({ + channel: "googlechat", + accountId: route.accountId, + messageId: message.name, + messageIdFull: message.name, + timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined, + from: `googlechat:${senderId}`, + sender: { + id: senderId, + name: senderName || undefined, + username: senderEmail, + }, + conversation: { + kind: isGroup ? "channel" : "direct", + id: spaceId, + label: fromLabel, + routePeer: { + kind: isGroup ? "group" : "direct", + id: spaceId, + }, + }, + route: { + agentId: route.agentId, + accountId: route.accountId, + routeSessionKey: route.sessionKey, + }, + reply: { + to: `googlechat:${spaceId}`, + originatingTo: `googlechat:${spaceId}`, + replyToId: message.thread?.name, + replyToIdFull: message.thread?.name, + }, + message: { + body, + bodyForAgent: rawBody, + rawBody, + commandBody: rawBody, + envelopeFrom: fromLabel, + }, + media: + mediaPath || mediaType + ? [ + { + path: mediaPath, + url: mediaPath, + contentType: mediaType, + }, + ] + : undefined, + supplemental: { + groupSystemPrompt: isGroup ? groupSystemPrompt : undefined, + }, + extra: { + ChatType: isGroup ? "channel" : "direct", + WasMentioned: isGroup ? effectiveWasMentioned : undefined, + CommandAuthorized: commandAuthorized, + GroupSubject: undefined, + GroupSpace: isGroup ? (space.displayName ?? undefined) : undefined, + }, }); // Typing indicator setup @@ -265,47 +295,60 @@ async function processMessageWithPipeline(params: { accountId: route.accountId, }); - await core.channel.turn.dispatchAssembled({ - cfg: config, + await core.channel.turn.runResolved({ channel: "googlechat", accountId: route.accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverGoogleChatReply({ - payload, - account, - spaceId, - runtime, - core, - config, - statusSink, - typingMessageName, - }); - // Only use typing message for first delivery - typingMessageName = undefined; - }, - onError: (err, info) => { - runtime.error?.( - `[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`, - ); - }, + raw: message, + input: { + id: message.name ?? spaceId, + timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined, + rawText: rawBody, + textForAgent: rawBody, + textForCommands: rawBody, + raw: message, }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`); + resolveTurn: () => ({ + cfg: config, + channel: "googlechat", + accountId: route.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverGoogleChatReply({ + payload, + account, + spaceId, + runtime, + core, + config, + statusSink, + typingMessageName, + }); + // Only use typing message for first delivery + typingMessageName = undefined; + }, + onError: (err, info) => { + runtime.error?.( + `[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`, + ); + }, }, - }, + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, + }, + record: { + onRecordError: (err) => { + runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`); + }, + }, + }), }); } diff --git a/extensions/synology-chat/src/channel.integration.test.ts b/extensions/synology-chat/src/channel.integration.test.ts index a8db3045386..867e2e58ef5 100644 --- a/extensions/synology-chat/src/channel.integration.test.ts +++ b/extensions/synology-chat/src/channel.integration.test.ts @@ -1,6 +1,7 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { + buildChannelTurnContextMock, dispatchReplyWithBufferedBlockDispatcher, finalizeInboundContextMock, registerPluginHttpRouteMock, @@ -35,6 +36,7 @@ describe("Synology channel wiring integration", () => { beforeEach(() => { registerPluginHttpRouteMock.mockClear(); dispatchReplyWithBufferedBlockDispatcher.mockClear(); + buildChannelTurnContextMock.mockClear(); finalizeInboundContextMock.mockClear(); resolveAgentRouteMock.mockClear(); setSynologyRuntimeConfigForTest({}); diff --git a/extensions/synology-chat/src/channel.test-mocks.ts b/extensions/synology-chat/src/channel.test-mocks.ts index ddcfda1c00d..a62d2ebc95f 100644 --- a/extensions/synology-chat/src/channel.test-mocks.ts +++ b/extensions/synology-chat/src/channel.test-mocks.ts @@ -19,6 +19,49 @@ export const dispatchReplyWithBufferedBlockDispatcher: Mock< export const finalizeInboundContextMock: Mock< (ctx: Record) => Record > = vi.fn((ctx) => ctx); +export const buildChannelTurnContextMock: Mock< + (params: { + channel: string; + accountId?: string; + timestamp?: number; + from: string; + sender: { id: string; name?: string }; + conversation: { kind: string; label?: string }; + route: { + accountId?: string; + routeSessionKey: string; + dispatchSessionKey?: string; + }; + reply: { to: string; originatingTo: string }; + message: { + rawBody: string; + bodyForAgent?: string; + commandBody?: string; + }; + extra?: Record; + }) => Record +> = vi.fn((params) => + finalizeInboundContextMock({ + Body: params.message.rawBody, + BodyForAgent: params.message.bodyForAgent ?? params.message.rawBody, + RawBody: params.message.rawBody, + CommandBody: params.message.commandBody ?? params.message.rawBody, + From: params.from, + To: params.reply.to, + SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey, + AccountId: params.route.accountId ?? params.accountId, + OriginatingChannel: params.channel, + OriginatingTo: params.reply.originatingTo, + ChatType: params.conversation.kind, + SenderName: params.sender.name, + SenderId: params.sender.id, + Provider: params.channel, + Surface: params.channel, + ConversationLabel: params.conversation.label, + Timestamp: params.timestamp, + ...params.extra, + }), +); export const resolveAgentRouteMock: Mock< (params: { accountId?: string }) => { agentId: string; sessionKey: string; accountId: string } > = vi.fn((params) => { @@ -100,6 +143,60 @@ vi.mock("./runtime.js", () => ({ recordInboundSession: vi.fn(async () => undefined), }, turn: { + run: vi.fn(async (params) => { + const input = await params.adapter.ingest(params.raw); + if (!input) { + return { admission: { kind: "drop", reason: "ingest-null" }, dispatched: false }; + } + const resolved = await params.adapter.resolveTurn(input, { + kind: "message", + canStartAgentTurn: true, + }); + const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({ + ctx: resolved.ctxPayload, + cfg: mockRuntimeConfig, + dispatcherOptions: { + ...resolved.dispatcherOptions, + deliver: resolved.delivery.deliver, + onError: resolved.delivery.onError, + }, + }); + return { + admission: { kind: "dispatch" }, + dispatched: true, + dispatchResult, + ctxPayload: resolved.ctxPayload, + routeSessionKey: resolved.routeSessionKey, + }; + }), + runResolved: vi.fn(async (params) => { + const input = + typeof params.input === "function" ? await params.input(params.raw) : params.input; + if (!input) { + return { admission: { kind: "drop", reason: "ingest-null" }, dispatched: false }; + } + const resolved = await params.resolveTurn(input, { + kind: "message", + canStartAgentTurn: true, + }); + const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({ + ctx: resolved.ctxPayload, + cfg: mockRuntimeConfig, + dispatcherOptions: { + ...resolved.dispatcherOptions, + deliver: resolved.delivery.deliver, + onError: resolved.delivery.onError, + }, + }); + return { + admission: { kind: "dispatch" }, + dispatched: true, + dispatchResult, + ctxPayload: resolved.ctxPayload, + routeSessionKey: resolved.routeSessionKey, + }; + }), + buildContext: buildChannelTurnContextMock, dispatchAssembled: vi.fn(async (params) => ({ dispatchResult: await params.dispatchReplyWithBufferedBlockDispatcher({ ctx: params.ctxPayload, diff --git a/extensions/synology-chat/src/inbound-context.ts b/extensions/synology-chat/src/inbound-context.ts index f6e11fd152a..c77d4ade101 100644 --- a/extensions/synology-chat/src/inbound-context.ts +++ b/extensions/synology-chat/src/inbound-context.ts @@ -1,7 +1,3 @@ -import type { ResolvedSynologyChatAccount } from "./types.js"; - -const CHANNEL_ID = "synology-chat"; - export type SynologyInboundMessage = { body: string; from: string; @@ -13,30 +9,4 @@ export type SynologyInboundMessage = { chatUserId?: string; }; -export function buildSynologyChatInboundContext(params: { - finalizeInboundContext: (ctx: Record) => TContext; - account: ResolvedSynologyChatAccount; - msg: SynologyInboundMessage; - sessionKey: string; -}): TContext { - const { account, msg, sessionKey } = params; - return params.finalizeInboundContext({ - Body: msg.body, - RawBody: msg.body, - CommandBody: msg.body, - From: `synology-chat:${msg.from}`, - To: `synology-chat:${msg.from}`, - SessionKey: sessionKey, - AccountId: account.accountId, - OriginatingChannel: CHANNEL_ID, - OriginatingTo: `synology-chat:${msg.from}`, - ChatType: msg.chatType, - SenderName: msg.senderName, - SenderId: msg.from, - Provider: CHANNEL_ID, - Surface: CHANNEL_ID, - ConversationLabel: msg.senderName || msg.from, - Timestamp: Date.now(), - CommandAuthorized: msg.commandAuthorized, - }); -} +export type { ResolvedSynologyChatAccount } from "./types.js"; diff --git a/extensions/synology-chat/src/inbound-turn.ts b/extensions/synology-chat/src/inbound-turn.ts index 9160356dadb..88e028c89ad 100644 --- a/extensions/synology-chat/src/inbound-turn.ts +++ b/extensions/synology-chat/src/inbound-turn.ts @@ -1,6 +1,6 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { sendMessage } from "./client.js"; -import { buildSynologyChatInboundContext, type SynologyInboundMessage } from "./inbound-context.js"; +import type { SynologyInboundMessage } from "./inbound-context.js"; import { getSynologyRuntime } from "./runtime.js"; import { buildSynologyChatInboundSessionKey } from "./session-key.js"; import type { ResolvedSynologyChatAccount } from "./types.js"; @@ -71,46 +71,97 @@ export async function dispatchSynologyChatInboundTurn(params: { account: params.account, userId: params.msg.from, }); - const msgCtx = buildSynologyChatInboundContext({ - finalizeInboundContext: resolved.rt.channel.reply.finalizeInboundContext, - account: params.account, - msg: params.msg, - sessionKey: resolved.sessionKey, - }); - const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, { - agentId: resolved.route.agentId, - }); - - await resolved.rt.channel.turn.dispatchAssembled({ - cfg: currentCfg, + await resolved.rt.channel.turn.runResolved({ channel: CHANNEL_ID, accountId: params.account.accountId, - agentId: resolved.route.agentId, - routeSessionKey: resolved.route.sessionKey, - storePath, - ctxPayload: msgCtx, - recordInboundSession: resolved.rt.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverSynologyChatReply({ - account: params.account, - sendUserId, - payload, - }); - }, - }, - dispatcherOptions: { - onReplyStart: () => { - params.log?.info?.(`Agent reply started for ${params.msg.from}`); - }, - }, - record: { - onRecordError: (err) => { - params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err); - }, + raw: params.msg, + input: (msg) => ({ + id: `${params.account.accountId}:${msg.from}`, + timestamp: Date.now(), + rawText: msg.body, + textForAgent: msg.body, + textForCommands: msg.body, + raw: msg, + }), + resolveTurn: (input) => { + const chatKind = + params.msg.chatType === "group" || params.msg.chatType === "channel" + ? params.msg.chatType + : "direct"; + const msgCtx = resolved.rt.channel.turn.buildContext({ + channel: CHANNEL_ID, + accountId: params.account.accountId, + timestamp: input.timestamp, + from: `synology-chat:${params.msg.from}`, + sender: { + id: params.msg.from, + name: params.msg.senderName, + }, + conversation: { + kind: chatKind, + id: params.msg.from, + label: params.msg.senderName || params.msg.from, + routePeer: { + kind: "direct", + id: params.msg.from, + }, + }, + route: { + agentId: resolved.route.agentId, + accountId: params.account.accountId, + routeSessionKey: resolved.sessionKey, + dispatchSessionKey: resolved.sessionKey, + }, + reply: { + to: `synology-chat:${params.msg.from}`, + originatingTo: `synology-chat:${params.msg.from}`, + }, + message: { + rawBody: input.rawText, + commandBody: input.textForCommands, + bodyForAgent: input.textForAgent, + envelopeFrom: params.msg.senderName, + }, + extra: { + ChatType: params.msg.chatType, + CommandAuthorized: params.msg.commandAuthorized, + }, + }); + const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, { + agentId: resolved.route.agentId, + }); + return { + cfg: currentCfg, + channel: CHANNEL_ID, + accountId: params.account.accountId, + agentId: resolved.route.agentId, + routeSessionKey: resolved.route.sessionKey, + storePath, + ctxPayload: msgCtx, + recordInboundSession: resolved.rt.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverSynologyChatReply({ + account: params.account, + sendUserId, + payload, + }); + }, + }, + dispatcherOptions: { + onReplyStart: () => { + params.log?.info?.(`Agent reply started for ${params.msg.from}`); + }, + }, + record: { + onRecordError: (err) => { + params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err); + }, + }, + }; }, }); diff --git a/extensions/tlon/src/monitor/index.ts b/extensions/tlon/src/monitor/index.ts index 903e7f7e837..c527fd4e1f3 100644 --- a/extensions/tlon/src/monitor/index.ts +++ b/extensions/tlon/src/monitor/index.ts @@ -516,31 +516,58 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise 0 && { Attachments: attachments }), - OriginatingChannel: "tlon", - OriginatingTo: `tlon:${isGroup ? groupChannel : botShipName}`, - // Include thread context for automatic reply routing - ...(parentId && { ThreadId: parentId, ReplyToId: parentId }), + const ctxPayload = core.channel.turn.buildContext({ + channel: "tlon", + accountId: route.accountId, + messageId, + timestamp, + from: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`, + sender: { + id: senderShip, + name: senderShip, + roles: [senderRole], + }, + conversation: { + kind: isGroup ? "group" : "direct", + id: tlonConversationId, + label: fromLabel, + routePeer: { + kind: isGroup ? "group" : "direct", + id: tlonConversationId, + }, + }, + route: { + agentId: route.agentId, + accountId: route.accountId, + routeSessionKey: route.sessionKey, + }, + reply: { + to: `tlon:${botShipName}`, + originatingTo: `tlon:${isGroup ? groupChannel : botShipName}`, + replyToId: parentId ?? undefined, + }, + message: { + body, + bodyForAgent: commandBody, + rawBody: messageText, + commandBody, + envelopeFrom: fromLabel, + }, + extra: { + GroupSubject: undefined, + SenderRole: senderRole, + CommandAuthorized: commandAuthorized, + CommandSource: "text" as const, + ...(attachments.length > 0 && { Attachments: attachments }), + ...(parentId && { ThreadId: parentId }), + }, }); const dispatchStartTime = Date.now(); @@ -554,76 +581,96 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise { - let replyText = payload.text; - if (!replyText) { - return; - } + raw: rawTurnMessage, + adapter: { + ingest: (raw) => ({ + id: raw.messageId, + timestamp: raw.timestamp, + rawText: raw.messageText, + textForAgent: commandBody, + textForCommands: commandBody, + raw, + }), + resolveTurn: () => ({ + cfg, + channel: "tlon", + accountId: route.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload: ReplyPayload) => { + let replyText = payload.text; + if (!replyText) { + return; + } - // Use settings store value if set, otherwise fall back to file config - const showSignature = effectiveShowModelSig; - if (showSignature) { - const extPayload = payload as { - metadata?: { model?: string }; - model?: string; - }; - const defaultModel = cfg.agents?.defaults?.model; - const modelInfo = - extPayload.metadata?.model || - extPayload.model || - (typeof defaultModel === "string" ? defaultModel : defaultModel?.primary); - replyText = `${replyText}\n\n_[Generated by ${formatModelName(modelInfo)}]_`; - } + // Use settings store value if set, otherwise fall back to file config + const showSignature = effectiveShowModelSig; + if (showSignature) { + const extPayload = payload as { + metadata?: { model?: string }; + model?: string; + }; + const defaultModel = cfg.agents?.defaults?.model; + const modelInfo = + extPayload.metadata?.model || + extPayload.model || + (typeof defaultModel === "string" ? defaultModel : defaultModel?.primary); + replyText = `${replyText}\n\n_[Generated by ${formatModelName(modelInfo)}]_`; + } - if (isGroup && groupChannel) { - const parsed = parseChannelNest(groupChannel); - if (!parsed) { - return; - } - await sendGroupMessage({ - api: api, - fromShip: botShipName, - hostShip: parsed.hostShip, - channelName: parsed.channelName, - text: replyText, - replyToId: parentId ?? undefined, - }); - // Track thread participation for future replies without mention - if (parentId) { - participatedThreads.add(parentId); - runtime.log?.(`[tlon] Now tracking thread for future replies: ${parentId}`); - } - } else { - await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: replyText }); - } - }, - onError: (err, info) => { - const dispatchDuration = Date.now() - dispatchStartTime; - runtime.error?.( - `[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}`, - ); - }, - }, - dispatcherOptions: { - responsePrefix, - humanDelay, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`[tlon] failed updating session meta: ${String(err)}`); - }, + if (isGroup && groupChannel) { + const parsed = parseChannelNest(groupChannel); + if (!parsed) { + return; + } + await sendGroupMessage({ + api: api, + fromShip: botShipName, + hostShip: parsed.hostShip, + channelName: parsed.channelName, + text: replyText, + replyToId: parentId ?? undefined, + }); + // Track thread participation for future replies without mention + if (parentId) { + participatedThreads.add(parentId); + runtime.log?.(`[tlon] Now tracking thread for future replies: ${parentId}`); + } + } else { + await sendDm({ + api: api, + fromShip: botShipName, + toShip: senderShip, + text: replyText, + }); + } + }, + onError: (err, info) => { + const dispatchDuration = Date.now() - dispatchStartTime; + runtime.error?.( + `[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}`, + ); + }, + }, + dispatcherOptions: { + responsePrefix, + humanDelay, + }, + record: { + onRecordError: (err) => { + runtime.error?.(`[tlon] failed updating session meta: ${String(err)}`); + }, + }, + }), }, }); }; diff --git a/extensions/twitch/src/monitor.ts b/extensions/twitch/src/monitor.ts index 11f5e3c3bf7..06500f444c7 100644 --- a/extensions/twitch/src/monitor.ts +++ b/extensions/twitch/src/monitor.ts @@ -51,116 +51,126 @@ async function processTwitchMessage(params: { const { message, account, accountId, config, runtime, core, statusSink } = params; const cfg = config as OpenClawConfig; - const route = core.channel.routing.resolveAgentRoute({ - cfg, + await core.channel.turn.runResolved({ channel: "twitch", accountId, - peer: { - kind: "group", // Twitch chat is always group-like - id: message.channel, - }, - }); - - const rawBody = message.message; - const senderId = message.userId ?? message.username; - const body = core.channel.reply.formatAgentEnvelope({ - channel: "Twitch", - from: message.displayName ?? message.username, - timestamp: message.timestamp?.getTime(), - envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg), - body: rawBody, - }); - - const ctxPayload = core.channel.turn.buildContext({ - channel: "twitch", - accountId, - messageId: message.id, - timestamp: message.timestamp?.getTime(), - from: `twitch:user:${senderId}`, - sender: { - id: senderId, - name: message.displayName ?? message.username, - username: message.username, - }, - conversation: { - kind: "group", - id: message.channel, - label: message.channel, - routePeer: { - kind: "group", - id: message.channel, - }, - }, - route: { - agentId: route.agentId, - accountId: route.accountId, - routeSessionKey: route.sessionKey, - }, - reply: { - to: `twitch:channel:${message.channel}`, - originatingTo: `twitch:channel:${message.channel}`, - }, - message: { - body, - rawBody, - bodyForAgent: rawBody, - commandBody: rawBody, - envelopeFrom: message.displayName ?? message.username, - }, - }); - - const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { - agentId: route.agentId, - }); - - const tableMode = core.channel.text.resolveMarkdownTableMode({ - cfg, - channel: "twitch", - accountId, - }); - const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({ - cfg, - agentId: route.agentId, - channel: "twitch", - accountId, - }); - - await core.channel.turn.dispatchAssembled({ - cfg, - channel: "twitch", - accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverTwitchReply({ - payload, - channel: message.channel, - account, - accountId, - config, - tableMode, - runtime, - statusSink, - }); - }, - onError: (err, info) => { - runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`); - }, - }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`Failed updating session meta: ${String(err)}`); - }, + raw: message, + input: (incoming) => ({ + id: incoming.id ?? `${incoming.channel}:${incoming.timestamp?.getTime() ?? Date.now()}`, + timestamp: incoming.timestamp?.getTime(), + rawText: incoming.message, + textForAgent: incoming.message, + textForCommands: incoming.message, + raw: incoming, + }), + resolveTurn: (input) => { + const route = core.channel.routing.resolveAgentRoute({ + cfg, + channel: "twitch", + accountId, + peer: { + kind: "group", + id: message.channel, + }, + }); + const senderId = message.userId ?? message.username; + const fromLabel = message.displayName ?? message.username; + const body = core.channel.reply.formatAgentEnvelope({ + channel: "Twitch", + from: fromLabel, + timestamp: input.timestamp, + envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg), + body: input.rawText, + }); + const ctxPayload = core.channel.turn.buildContext({ + channel: "twitch", + accountId, + messageId: input.id, + timestamp: input.timestamp, + from: `twitch:user:${senderId}`, + sender: { + id: senderId, + name: fromLabel, + username: message.username, + }, + conversation: { + kind: "group", + id: message.channel, + label: message.channel, + routePeer: { + kind: "group", + id: message.channel, + }, + }, + route: { + agentId: route.agentId, + accountId: route.accountId, + routeSessionKey: route.sessionKey, + }, + reply: { + to: `twitch:channel:${message.channel}`, + originatingTo: `twitch:channel:${message.channel}`, + }, + message: { + body, + rawBody: input.rawText, + bodyForAgent: input.textForAgent, + commandBody: input.textForCommands, + envelopeFrom: fromLabel, + }, + }); + const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + const tableMode = core.channel.text.resolveMarkdownTableMode({ + cfg, + channel: "twitch", + accountId, + }); + const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({ + cfg, + agentId: route.agentId, + channel: "twitch", + accountId, + }); + return { + cfg, + channel: "twitch", + accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverTwitchReply({ + payload, + channel: message.channel, + account, + accountId, + config, + tableMode, + runtime, + statusSink, + }); + }, + onError: (err, info) => { + runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`); + }, + }, + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, + }, + record: { + onRecordError: (err) => { + runtime.error?.(`Failed updating session meta: ${String(err)}`); + }, + }, + }; }, }); } diff --git a/extensions/zalo/src/monitor.reply-once.lifecycle.test.ts b/extensions/zalo/src/monitor.reply-once.lifecycle.test.ts index 4abecb0ffe9..81f8f0bb0f7 100644 --- a/extensions/zalo/src/monitor.reply-once.lifecycle.test.ts +++ b/extensions/zalo/src/monitor.reply-once.lifecycle.test.ts @@ -92,20 +92,17 @@ describe("Zalo reply-once lifecycle", () => { }, ); - expect(finalizeInboundContextMock).toHaveBeenCalledTimes(1); - expect(finalizeInboundContextMock).toHaveBeenCalledWith( - expect.objectContaining({ - AccountId: "acct-zalo-lifecycle", - SessionKey: "agent:main:zalo:direct:dm-chat-1", - MessageSid: expect.stringContaining("zalo-replay-"), - From: "zalo:user-1", - To: "zalo:dm-chat-1", - }), - ); expect(recordInboundSessionMock).toHaveBeenCalledTimes(1); expect(recordInboundSessionMock).toHaveBeenCalledWith( expect.objectContaining({ sessionKey: "agent:main:zalo:direct:dm-chat-1", + ctx: expect.objectContaining({ + AccountId: "acct-zalo-lifecycle", + SessionKey: "agent:main:zalo:direct:dm-chat-1", + MessageSid: expect.stringContaining("zalo-replay-"), + From: "zalo:user-1", + To: "zalo:dm-chat-1", + }), }), ); expect(sendMessageMock).toHaveBeenCalledTimes(1); diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 011c39e6067..28afba75469 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -582,28 +582,55 @@ async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Pr body: rawBody, }); - const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: body, - BodyForAgent: rawBody, - RawBody: rawBody, - CommandBody: rawBody, - From: isGroup ? `zalo:group:${chatId}` : `zalo:${senderId}`, - To: `zalo:${chatId}`, - SessionKey: route.sessionKey, - AccountId: route.accountId, - ChatType: isGroup ? "group" : "direct", - ConversationLabel: fromLabel, - SenderName: senderName || undefined, - SenderId: senderId, - CommandAuthorized: commandAuthorized, - Provider: "zalo", - Surface: "zalo", - MessageSid: message_id, - MediaPath: mediaPath, - MediaType: mediaType, - MediaUrl: mediaPath, - OriginatingChannel: "zalo", - OriginatingTo: `zalo:${chatId}`, + const ctxPayload = core.channel.turn.buildContext({ + channel: "zalo", + accountId: route.accountId, + messageId: message_id, + timestamp: date ? date * 1000 : undefined, + from: isGroup ? `zalo:group:${chatId}` : `zalo:${senderId}`, + sender: { + id: senderId, + name: senderName || undefined, + }, + conversation: { + kind: isGroup ? "group" : "direct", + id: chatId, + label: fromLabel, + routePeer: { + kind: isGroup ? "group" : "direct", + id: chatId, + }, + }, + route: { + agentId: route.agentId, + accountId: route.accountId, + routeSessionKey: route.sessionKey, + }, + reply: { + to: `zalo:${chatId}`, + originatingTo: `zalo:${chatId}`, + }, + message: { + body, + bodyForAgent: rawBody, + rawBody, + commandBody: rawBody, + envelopeFrom: fromLabel, + }, + media: + mediaPath || mediaType + ? [ + { + path: mediaPath, + url: mediaPath, + contentType: mediaType, + }, + ] + : undefined, + extra: { + CommandAuthorized: commandAuthorized, + GroupSubject: undefined, + }, }); const tableMode = core.channel.text.resolveMarkdownTableMode({ @@ -640,50 +667,63 @@ async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Pr }, }); - await core.channel.turn.dispatchAssembled({ - cfg: config, + await core.channel.turn.runResolved({ channel: "zalo", accountId: account.accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverZaloReply({ - payload, - token, - chatId, - runtime, - core, - config, - webhookUrl: params.webhookUrl, - webhookPath: params.webhookPath, - proxyUrl: account.config.proxy, - mediaMaxBytes: params.mediaMaxMb * 1024 * 1024, - canHostMedia: params.canHostMedia, - accountId: account.accountId, - statusSink, - fetcher, - tableMode, - }); - }, - onError: (err, info) => { - runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`); - }, + raw: message, + input: { + id: message_id, + timestamp: date ? date * 1000 : undefined, + rawText: rawBody, + textForAgent: rawBody, + textForCommands: rawBody, + raw: message, }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`zalo: failed updating session meta: ${String(err)}`); + resolveTurn: () => ({ + cfg: config, + channel: "zalo", + accountId: account.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverZaloReply({ + payload, + token, + chatId, + runtime, + core, + config, + webhookUrl: params.webhookUrl, + webhookPath: params.webhookPath, + proxyUrl: account.config.proxy, + mediaMaxBytes: params.mediaMaxMb * 1024 * 1024, + canHostMedia: params.canHostMedia, + accountId: account.accountId, + statusSink, + fetcher, + tableMode, + }); + }, + onError: (err, info) => { + runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`); + }, }, - }, + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, + }, + record: { + onRecordError: (err) => { + runtime.error?.(`zalo: failed updating session meta: ${String(err)}`); + }, + }, + }), }); } diff --git a/extensions/zalo/src/test-support/lifecycle-test-support.ts b/extensions/zalo/src/test-support/lifecycle-test-support.ts index be1c2a085a5..1ece16c9568 100644 --- a/extensions/zalo/src/test-support/lifecycle-test-support.ts +++ b/extensions/zalo/src/test-support/lifecycle-test-support.ts @@ -122,6 +122,50 @@ export function createImageUpdate(params?: { export function createImageLifecycleCore() { const finalizeInboundContextMock = vi.fn((ctx: Record) => ctx); + const buildChannelTurnContextMock = vi.fn( + (params: { + channel: string; + accountId?: string; + messageId?: string; + timestamp?: number; + from: string; + sender: { id: string; name?: string }; + conversation: { kind: string; label?: string }; + route: { + accountId?: string; + routeSessionKey: string; + dispatchSessionKey?: string; + }; + reply: { to: string; originatingTo: string }; + message: { body?: string; rawBody: string; bodyForAgent?: string; commandBody?: string }; + media?: Array<{ path?: string; url?: string; contentType?: string }>; + extra?: Record; + }) => + finalizeInboundContextMock({ + Body: params.message.body ?? params.message.rawBody, + BodyForAgent: params.message.bodyForAgent ?? params.message.rawBody, + RawBody: params.message.rawBody, + CommandBody: params.message.commandBody ?? params.message.rawBody, + From: params.from, + To: params.reply.to, + SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey, + AccountId: params.route.accountId ?? params.accountId, + ChatType: params.conversation.kind, + ConversationLabel: params.conversation.label, + SenderName: params.sender.name, + SenderId: params.sender.id, + Provider: params.channel, + Surface: params.channel, + MessageSid: params.messageId, + Timestamp: params.timestamp, + MediaPath: params.media?.[0]?.path, + MediaType: params.media?.[0]?.contentType, + MediaUrl: params.media?.[0]?.url ?? params.media?.[0]?.path, + OriginatingChannel: params.channel, + OriginatingTo: params.reply.originatingTo, + ...params.extra, + }), + ); const recordInboundSessionMock = vi.fn(async () => undefined); const fetchRemoteMediaMock = vi.fn(async () => ({ buffer: Buffer.from("image-bytes"), @@ -188,6 +232,103 @@ export function createImageLifecycleCore() { ) as unknown as PluginRuntime["channel"]["reply"]["dispatchReplyWithBufferedBlockDispatcher"], }, turn: { + run: vi.fn(async (params: Parameters[0]) => { + const input = await params.adapter.ingest(params.raw); + if (!input) { + return { + admission: { kind: "drop" as const, reason: "ingest-null" }, + dispatched: false, + }; + } + const resolved = await params.adapter.resolveTurn( + input, + { + kind: "message", + canStartAgentTurn: true, + }, + {}, + ); + await resolved.recordInboundSession({ + storePath: resolved.storePath, + sessionKey: resolved.ctxPayload.SessionKey ?? resolved.routeSessionKey, + ctx: resolved.ctxPayload, + groupResolution: resolved.record?.groupResolution, + createIfMissing: resolved.record?.createIfMissing, + updateLastRoute: resolved.record?.updateLastRoute, + onRecordError: resolved.record?.onRecordError ?? (() => undefined), + }); + const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({ + ctx: resolved.ctxPayload, + cfg: resolved.cfg, + dispatcherOptions: { + ...resolved.dispatcherOptions, + deliver: async (payload, info) => { + await resolved.delivery.deliver(payload, info); + }, + onError: resolved.delivery.onError, + }, + replyOptions: resolved.replyOptions, + replyResolver: resolved.replyResolver, + }); + return { + admission: { kind: "dispatch" as const }, + dispatched: true, + ctxPayload: resolved.ctxPayload, + routeSessionKey: resolved.routeSessionKey, + dispatchResult, + }; + }) as unknown as PluginRuntime["channel"]["turn"]["run"], + runResolved: vi.fn( + async (params: Parameters[0]) => { + const input = + typeof params.input === "function" ? await params.input(params.raw) : params.input; + if (!input) { + return { + admission: { kind: "drop" as const, reason: "ingest-null" }, + dispatched: false, + }; + } + const resolved = await params.resolveTurn( + input, + { + kind: "message", + canStartAgentTurn: true, + }, + {}, + ); + await resolved.recordInboundSession({ + storePath: resolved.storePath, + sessionKey: resolved.ctxPayload.SessionKey ?? resolved.routeSessionKey, + ctx: resolved.ctxPayload, + groupResolution: resolved.record?.groupResolution, + createIfMissing: resolved.record?.createIfMissing, + updateLastRoute: resolved.record?.updateLastRoute, + onRecordError: resolved.record?.onRecordError ?? (() => undefined), + }); + const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({ + ctx: resolved.ctxPayload, + cfg: resolved.cfg, + dispatcherOptions: { + ...resolved.dispatcherOptions, + deliver: async (payload, info) => { + await resolved.delivery.deliver(payload, info); + }, + onError: resolved.delivery.onError, + }, + replyOptions: resolved.replyOptions, + replyResolver: resolved.replyResolver, + }); + return { + admission: { kind: "dispatch" as const }, + dispatched: true, + ctxPayload: resolved.ctxPayload, + routeSessionKey: resolved.routeSessionKey, + dispatchResult, + }; + }, + ) as unknown as PluginRuntime["channel"]["turn"]["runResolved"], + buildContext: + buildChannelTurnContextMock as unknown as PluginRuntime["channel"]["turn"]["buildContext"], dispatchAssembled: vi.fn( async (turn: Parameters[0]) => { await turn.recordInboundSession({ diff --git a/extensions/zalouser/src/monitor.group-gating.test.ts b/extensions/zalouser/src/monitor.group-gating.test.ts index e0e831a17e9..6f91ed0b3a4 100644 --- a/extensions/zalouser/src/monitor.group-gating.test.ts +++ b/extensions/zalouser/src/monitor.group-gating.test.ts @@ -122,6 +122,68 @@ function installRuntime(params: { }; }, ); + const runTurn = vi.fn(async (params: Parameters[0]) => { + const input = await params.adapter.ingest(params.raw); + if (!input) { + return { admission: { kind: "drop" as const, reason: "ingest-null" }, dispatched: false }; + } + const resolved = await params.adapter.resolveTurn( + input, + { + kind: "message", + canStartAgentTurn: true, + }, + {}, + ); + return await dispatchAssembled(resolved); + }); + const runResolvedTurn = vi.fn( + async (params: Parameters[0]) => { + const input = + typeof params.input === "function" ? await params.input(params.raw) : params.input; + if (!input) { + return { + admission: { kind: "drop" as const, reason: "ingest-null" }, + dispatched: false, + }; + } + const resolved = await params.resolveTurn( + input, + { + kind: "message", + canStartAgentTurn: true, + }, + {}, + ); + return await dispatchAssembled(resolved); + }, + ); + const buildContext = vi.fn( + (params: Parameters[0]) => + ({ + Body: params.message.body ?? params.message.rawBody, + BodyForAgent: params.message.bodyForAgent ?? params.message.rawBody, + InboundHistory: params.message.inboundHistory, + RawBody: params.message.rawBody, + CommandBody: params.message.commandBody ?? params.message.rawBody, + BodyForCommands: params.message.commandBody ?? params.message.rawBody, + From: params.from, + To: params.reply.to, + SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey, + AccountId: params.route.accountId ?? params.accountId, + ChatType: params.conversation.kind, + ConversationLabel: params.conversation.label, + SenderName: params.sender.name, + SenderId: params.sender.id, + Provider: params.provider ?? params.channel, + Surface: params.surface ?? params.provider ?? params.channel, + MessageSid: params.messageId, + MessageSidFull: params.messageIdFull, + OriginatingChannel: params.channel, + OriginatingTo: params.reply.originatingTo, + ...params.extra, + }) as ReturnType, + ); const buildAgentSessionKey = vi.fn( (input: { agentId: string; @@ -201,6 +263,9 @@ function installRuntime(params: { dispatchReplyWithBufferedBlockDispatcher, }, turn: { + run: runTurn as unknown as PluginRuntime["channel"]["turn"]["run"], + runResolved: runResolvedTurn as unknown as PluginRuntime["channel"]["turn"]["runResolved"], + buildContext: buildContext as unknown as PluginRuntime["channel"]["turn"]["buildContext"], dispatchAssembled: dispatchAssembled as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"], }, diff --git a/extensions/zalouser/src/monitor.ts b/extensions/zalouser/src/monitor.ts index 89eb10accc1..3c83d1195bf 100644 --- a/extensions/zalouser/src/monitor.ts +++ b/extensions/zalouser/src/monitor.ts @@ -592,40 +592,62 @@ async function processMessage( : undefined; const normalizedTo = isGroup ? `zalouser:group:${chatId}` : `zalouser:${chatId}`; + const messageSid = resolveZalouserMessageSid({ + msgId: message.msgId, + cliMsgId: message.cliMsgId, + fallback: `${message.timestampMs}`, + }); + const messageSidFull = formatZalouserMessageSidFull({ + msgId: message.msgId, + cliMsgId: message.cliMsgId, + }); - const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: combinedBody, - BodyForAgent: rawBody, - InboundHistory: inboundHistory, - RawBody: rawBody, - CommandBody: commandBody, - BodyForCommands: commandBody, - From: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`, - To: normalizedTo, - SessionKey: inboundSessionKey, - AccountId: route.accountId, - ChatType: isGroup ? "group" : "direct", - ConversationLabel: fromLabel, - GroupSubject: isGroup ? groupName || undefined : undefined, - GroupChannel: isGroup ? groupName || undefined : undefined, - GroupMembers: isGroup ? groupMembers : undefined, - SenderName: senderName || undefined, - SenderId: senderId, - WasMentioned: isGroup ? mentionDecision.effectiveWasMentioned : undefined, - CommandAuthorized: commandAuthorized, - Provider: "zalouser", - Surface: "zalouser", - MessageSid: resolveZalouserMessageSid({ - msgId: message.msgId, - cliMsgId: message.cliMsgId, - fallback: `${message.timestampMs}`, - }), - MessageSidFull: formatZalouserMessageSidFull({ - msgId: message.msgId, - cliMsgId: message.cliMsgId, - }), - OriginatingChannel: "zalouser", - OriginatingTo: normalizedTo, + const ctxPayload = core.channel.turn.buildContext({ + channel: "zalouser", + accountId: route.accountId, + messageId: messageSid, + messageIdFull: messageSidFull, + timestamp: message.timestampMs, + from: isGroup ? `zalouser:group:${chatId}` : `zalouser:${senderId}`, + sender: { + id: senderId, + name: senderName || undefined, + }, + conversation: { + kind: isGroup ? "group" : "direct", + id: chatId, + label: fromLabel, + routePeer: { + kind: isGroup ? "group" : "direct", + id: chatId, + }, + }, + route: { + agentId: route.agentId, + accountId: route.accountId, + routeSessionKey: route.sessionKey, + dispatchSessionKey: inboundSessionKey, + }, + reply: { + to: normalizedTo, + originatingTo: normalizedTo, + }, + message: { + body: combinedBody, + bodyForAgent: rawBody, + rawBody, + commandBody, + inboundHistory, + envelopeFrom: fromLabel, + }, + extra: { + BodyForCommands: commandBody, + GroupSubject: isGroup ? groupName || undefined : undefined, + GroupChannel: isGroup ? groupName || undefined : undefined, + GroupMembers: isGroup ? groupMembers : undefined, + WasMentioned: isGroup ? mentionDecision.effectiveWasMentioned : undefined, + CommandAuthorized: commandAuthorized, + }, }); const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({ @@ -649,49 +671,64 @@ async function processMessage( }, }); - await core.channel.turn.dispatchAssembled({ - cfg: config, + await core.channel.turn.runResolved({ channel: "zalouser", accountId: account.accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverZalouserReply({ - payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string }, - profile: account.profile, - chatId, - isGroup, - runtime, - core, - config, - accountId: account.accountId, - statusSink, - tableMode: core.channel.text.resolveMarkdownTableMode({ - cfg: config, - channel: "zalouser", + raw: message, + input: { + id: messageSid ?? `${message.timestampMs}`, + timestamp: message.timestampMs, + rawText: rawBody, + textForAgent: rawBody, + textForCommands: commandBody, + raw: message, + }, + resolveTurn: () => ({ + cfg: config, + channel: "zalouser", + accountId: account.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverZalouserReply({ + payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string }, + profile: account.profile, + chatId, + isGroup, + runtime, + core, + config, accountId: account.accountId, - }), - }); + statusSink, + tableMode: core.channel.text.resolveMarkdownTableMode({ + cfg: config, + channel: "zalouser", + accountId: account.accountId, + }), + }); + }, + onError: (err, info) => { + runtime.error( + `[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`, + ); + }, }, - onError: (err, info) => { - runtime.error(`[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`); + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, }, - }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); + record: { + onRecordError: (err) => { + runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); + }, }, - }, + }), }); if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ diff --git a/src/channels/turn/context.ts b/src/channels/turn/context.ts index 230928d807f..306a32be257 100644 --- a/src/channels/turn/context.ts +++ b/src/channels/turn/context.ts @@ -1,5 +1,5 @@ import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; -import type { FinalizedMsgContext, MsgContext } from "../../auto-reply/templating.js"; +import type { FinalizedMsgContext } from "../../auto-reply/templating.js"; import type { AccessFacts, ConversationFacts, @@ -28,7 +28,7 @@ export type BuildChannelTurnContextParams = { access?: AccessFacts; media?: InboundMediaFacts[]; supplemental?: SupplementalContextFacts; - extra?: MsgContext; + extra?: Record; }; function compactStrings(values: Array): string[] | undefined { diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index 6ed51bd78c6..4abda92bf2f 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -12,6 +12,7 @@ import type { PreparedChannelTurn, PreflightFacts, RunChannelTurnParams, + RunResolvedChannelTurnParams, } from "./types.js"; export type { AccessFacts, @@ -37,6 +38,7 @@ export type { ReplyPlanFacts, RouteFacts, RunChannelTurnParams, + RunResolvedChannelTurnParams, SenderFacts, SupplementalContextFacts, } from "./types.js"; @@ -307,3 +309,18 @@ export async function runChannelTurn( return result; } + +export async function runResolvedChannelTurn( + params: RunResolvedChannelTurnParams, +): Promise { + return await runChannelTurn({ + channel: params.channel, + accountId: params.accountId, + raw: params.raw, + log: params.log, + adapter: { + ingest: (raw) => (typeof params.input === "function" ? params.input(raw) : params.input), + resolveTurn: params.resolveTurn, + }, + }); +} diff --git a/src/channels/turn/types.ts b/src/channels/turn/types.ts index fc914e94200..5c75849392a 100644 --- a/src/channels/turn/types.ts +++ b/src/channels/turn/types.ts @@ -293,3 +293,18 @@ export type RunChannelTurnParams = { adapter: ChannelTurnAdapter; log?: (event: ChannelTurnLogEvent) => void; }; + +export type RunResolvedChannelTurnParams = { + channel: string; + accountId?: string; + raw: TRaw; + input: + | NormalizedTurnInput + | ((raw: TRaw) => Promise | NormalizedTurnInput | null); + resolveTurn: ( + input: NormalizedTurnInput, + eventClass: ChannelEventClass, + preflight: PreflightFacts, + ) => Promise | ChannelTurnResolved; + log?: (event: ChannelTurnLogEvent) => void; +}; diff --git a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts index 3e0fdb028da..2dd1e856c82 100644 --- a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts +++ b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts @@ -199,11 +199,21 @@ export function createPluginRuntimeMock(overrides: DeepPartial = To: params.reply.to, SessionKey: params.route.dispatchSessionKey ?? params.route.routeSessionKey, AccountId: params.route.accountId ?? params.accountId, + MessageSid: params.messageId, + MessageSidFull: params.messageIdFull, + ReplyToId: params.reply.replyToId ?? params.supplemental?.quote?.id, + ReplyToIdFull: params.reply.replyToIdFull ?? params.supplemental?.quote?.fullId, + MediaPath: params.media?.[0]?.path, + MediaUrl: params.media?.[0]?.url ?? params.media?.[0]?.path, + MediaType: params.media?.[0]?.contentType ?? params.media?.[0]?.kind, ChatType: params.conversation.kind, ConversationLabel: params.conversation.label, SenderName: params.sender.name ?? params.sender.displayLabel, SenderId: params.sender.id, SenderUsername: params.sender.username, + Timestamp: params.timestamp, + WasMentioned: params.access?.mentions?.wasMentioned, + GroupSystemPrompt: params.supplemental?.groupSystemPrompt, Provider: params.provider ?? params.channel, Surface: params.surface ?? params.provider ?? params.channel, OriginatingChannel: params.channel, @@ -214,6 +224,28 @@ export function createPluginRuntimeMock(overrides: DeepPartial = ...params.extra, }) as ReturnType, ) as unknown as PluginRuntime["channel"]["turn"]["buildContext"]; + const runResolvedChannelTurnMock = vi.fn( + async (params: Parameters[0]) => { + const input = + typeof params.input === "function" ? await params.input(params.raw) : params.input; + if (!input) { + return { + admission: { kind: "drop" as const, reason: "ingest-null" }, + dispatched: false, + }; + } + return await runChannelTurnMock({ + channel: params.channel, + accountId: params.accountId, + raw: params.raw, + log: params.log, + adapter: { + ingest: () => input, + resolveTurn: params.resolveTurn, + }, + }); + }, + ) as unknown as PluginRuntime["channel"]["turn"]["runResolved"]; const base: PluginRuntime = { version: "1.0.0-test", config: { @@ -568,6 +600,7 @@ export function createPluginRuntimeMock(overrides: DeepPartial = }, turn: { run: runChannelTurnMock, + runResolved: runResolvedChannelTurnMock, buildContext: buildChannelTurnContextMock, runPrepared: runPreparedChannelTurnMock, dispatchAssembled: dispatchAssembledChannelTurnMock, diff --git a/src/plugins/runtime/runtime-channel.ts b/src/plugins/runtime/runtime-channel.ts index 3813877d2e3..8e25706dae6 100644 --- a/src/plugins/runtime/runtime-channel.ts +++ b/src/plugins/runtime/runtime-channel.ts @@ -55,6 +55,7 @@ import { dispatchAssembledChannelTurn, runChannelTurn, runPreparedChannelTurn, + runResolvedChannelTurn, } from "../../channels/turn/kernel.js"; import { resolveChannelGroupPolicy, @@ -173,6 +174,7 @@ export function createRuntimeChannel(): PluginRuntime["channel"] { }, turn: { run: runChannelTurn, + runResolved: runResolvedChannelTurn, buildContext: buildChannelTurnContext, runPrepared: runPreparedChannelTurn, dispatchAssembled: dispatchAssembledChannelTurn, diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index e7341d932da..8db2c944497 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -153,6 +153,7 @@ export type PluginRuntimeChannel = { }; turn: { run: typeof import("../../channels/turn/kernel.js").runChannelTurn; + runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn; buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext; runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn; dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn;