From 0df9f297b60cd774cc5f81b4b4143327dda79e0d Mon Sep 17 00:00:00 2001 From: Firas Alswihry Date: Fri, 22 May 2026 03:08:00 +0300 Subject: [PATCH] fix(gateway): mirror source message sends into transcript (#84837) Co-authored-by: Firas Alswihry --- CHANGELOG.md | 1 + docs/install/installer.md | 24 +- src/gateway/server-methods/send.test.ts | 462 ++++++++++++++++++++++ src/gateway/server-methods/send.ts | 34 ++ src/infra/outbound/payloads.ts | 84 +++- src/infra/outbound/source-reply-mirror.ts | 165 ++++++++ 6 files changed, 757 insertions(+), 13 deletions(-) create mode 100644 src/infra/outbound/source-reply-mirror.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ad53ba8605..210fafe4d09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Agents/subagents: surface blocked child-run completions as errors instead of successful subagent finishes. (#80886) Thanks @TurboTheTurtle. - WhatsApp: update Baileys to `7.0.0-rc13` and drop the obsolete logger type patch. - Install/update: reject OpenClaw GitHub source package targets early and point moving-main users at the dev/git install path instead of the broken npm source-install flow. +- Gateway: mirror successful same-source message-tool sends into session transcripts so delivered replies stay in later history/context. (#84837) Thanks @iFiras-Max1. - Infra/json: retry transient `File changed during read` races while loading JSON state so config and state reads recover instead of failing the turn. (#84285) - Providers/Ollama: resolve configured Ollama Cloud `OLLAMA_API_KEY` markers to the real discovery key so cloud provider entries keep authenticated model catalog access. (#85037) - Discord: keep persistent component registry fallback warnings actionable by forwarding structured error and cause metadata through the runtime logger. Fixes #84185. (#84190) Thanks @100menotu001. diff --git a/docs/install/installer.md b/docs/install/installer.md index dcea5726d22..e823206e09c 100644 --- a/docs/install/installer.md +++ b/docs/install/installer.md @@ -154,19 +154,19 @@ The script exits with code `2` for invalid method selection or invalid `--instal -| Variable | Description | -| ------------------------------------------------------- | --------------------------------------------- | -| `OPENCLAW_INSTALL_METHOD=git\|npm` | Install method | +| Variable | Description | +| ------------------------------------------------- | --------------------------------------------- | +| `OPENCLAW_INSTALL_METHOD=git\|npm` | Install method | | `OPENCLAW_VERSION=latest\|next\|\|` | npm version, dist-tag, or package spec | -| `OPENCLAW_BETA=0\|1` | Use beta if available | -| `OPENCLAW_GIT_DIR=` | Checkout directory | -| `OPENCLAW_GIT_UPDATE=0\|1` | Toggle git updates | -| `OPENCLAW_NO_PROMPT=1` | Disable prompts | -| `OPENCLAW_NO_ONBOARD=1` | Skip onboarding | -| `OPENCLAW_DRY_RUN=1` | Dry run mode | -| `OPENCLAW_VERBOSE=1` | Debug mode | -| `OPENCLAW_NPM_LOGLEVEL=error\|warn\|notice` | npm log level | -| `SHARP_IGNORE_GLOBAL_LIBVIPS=0\|1` | Control sharp/libvips behavior (default: `1`) | +| `OPENCLAW_BETA=0\|1` | Use beta if available | +| `OPENCLAW_GIT_DIR=` | Checkout directory | +| `OPENCLAW_GIT_UPDATE=0\|1` | Toggle git updates | +| `OPENCLAW_NO_PROMPT=1` | Disable prompts | +| `OPENCLAW_NO_ONBOARD=1` | Skip onboarding | +| `OPENCLAW_DRY_RUN=1` | Dry run mode | +| `OPENCLAW_VERBOSE=1` | Debug mode | +| `OPENCLAW_NPM_LOGLEVEL=error\|warn\|notice` | npm log level | +| `SHARP_IGNORE_GLOBAL_LIBVIPS=0\|1` | Control sharp/libvips behavior (default: `1`) | diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts index 07dfad87e26..66de9fdcd9e 100644 --- a/src/gateway/server-methods/send.test.ts +++ b/src/gateway/server-methods/send.test.ts @@ -1297,6 +1297,468 @@ describe("gateway send mirroring", () => { ); }); + it("mirrors successful source-conversation message.action sends into the assistant transcript", async () => { + const telegramPlugin: ChannelPlugin = { + id: "telegram", + meta: { + id: "telegram", + label: "Telegram", + selectionLabel: "Telegram", + docsPath: "/channels/telegram", + blurb: "Telegram source send transcript mirror test plugin.", + }, + capabilities: { chatTypes: ["direct"] }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({ enabled: true }), + isConfigured: () => true, + }, + actions: { + describeMessageTool: () => ({ actions: ["send"] }), + supportsAction: ({ action }) => action === "send", + handleAction: async () => jsonResult({ ok: true, messageId: "tg-1" }), + }, + threading: { + resolveCurrentChannelId: ({ to, threadId }) => + threadId == null ? to : `${to}:topic:${threadId}`, + }, + }; + mocks.getChannelPlugin.mockReturnValue(telegramPlugin); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]), + "send-test-source-message-action-mirror", + ); + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-1" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + message: "visible source reply", + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-source-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith({ + agentId: "main", + sessionKey: "agent:main:telegram:direct:chat-123", + text: "visible source reply", + mediaUrls: undefined, + idempotencyKey: "idem-source-message-action", + config: {}, + }); + }); + + it("mirrors accepted source send text aliases", async () => { + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-content-1" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + content: "visible content alias reply", + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-content-source-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith({ + agentId: "main", + sessionKey: "agent:main:telegram:direct:chat-123", + text: "visible content alias reply", + mediaUrls: undefined, + idempotencyKey: "idem-content-source-message-action", + config: {}, + }); + }); + + it("keeps delivered source sends successful when transcript mirroring fails", async () => { + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-mirror-failed" }), + ); + mocks.appendAssistantMessageToSessionTranscript.mockRejectedValueOnce( + new Error("transcript unavailable"), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + message: "visible source reply", + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-source-message-action-mirror-failed", + }); + + const call = firstRespondCall(respond); + expect(call[0]).toBe(true); + expect(call[1]).toEqual({ ok: true, messageId: "tg-mirror-failed" }); + expect(call[2]).toBeUndefined(); + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledOnce(); + }); + + it("mirrors caption-only source sends with media", async () => { + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-caption-1" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + mediaUrl: "https://example.com/image.png", + caption: "visible media caption", + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-caption-source-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith({ + agentId: "main", + sessionKey: "agent:main:telegram:direct:chat-123", + text: "visible media caption", + mediaUrls: ["https://example.com/image.png"], + idempotencyKey: "idem-caption-source-message-action", + config: {}, + }); + }); + + it("mirrors presentation-only source-conversation message.action sends", async () => { + const telegramPlugin: ChannelPlugin = { + id: "telegram", + meta: { + id: "telegram", + label: "Telegram", + selectionLabel: "Telegram", + docsPath: "/channels/telegram", + blurb: "Telegram source send rich transcript mirror test plugin.", + }, + capabilities: { chatTypes: ["direct"] }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({ enabled: true }), + isConfigured: () => true, + }, + actions: { + describeMessageTool: () => ({ actions: ["send"] }), + supportsAction: ({ action }) => action === "send", + handleAction: async () => jsonResult({ ok: true, messageId: "tg-rich-1" }), + }, + threading: { + resolveCurrentChannelId: ({ to, threadId }) => + threadId == null ? to : `${to}:topic:${threadId}`, + }, + }; + mocks.getChannelPlugin.mockReturnValue(telegramPlugin); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]), + "send-test-rich-source-message-action-mirror", + ); + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-rich-1" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + presentation: { + title: "Approval needed", + blocks: [ + { type: "text", text: "Review the deployment request" }, + { + type: "buttons", + buttons: [ + { label: "Approve", value: "approve" }, + { label: "Reject", value: "reject" }, + ], + }, + ], + }, + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-rich-source-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith({ + agentId: "main", + sessionKey: "agent:main:telegram:direct:chat-123", + text: "Approval needed\nReview the deployment request\nApprove\nReject", + mediaUrls: undefined, + idempotencyKey: "idem-rich-source-message-action", + config: {}, + }); + }); + + it("mirrors title-only source-conversation presentation sends", async () => { + const telegramPlugin: ChannelPlugin = { + id: "telegram", + meta: { + id: "telegram", + label: "Telegram", + selectionLabel: "Telegram", + docsPath: "/channels/telegram", + blurb: "Telegram source send title-only transcript mirror test plugin.", + }, + capabilities: { chatTypes: ["direct"] }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({ enabled: true }), + isConfigured: () => true, + }, + actions: { + describeMessageTool: () => ({ actions: ["send"] }), + supportsAction: ({ action }) => action === "send", + handleAction: async () => jsonResult({ ok: true, messageId: "tg-title-1" }), + }, + }; + mocks.getChannelPlugin.mockReturnValue(telegramPlugin); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]), + "send-test-title-only-source-message-action-mirror", + ); + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-title-1" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + presentation: { + title: "Title-only approval", + }, + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-title-only-source-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith({ + agentId: "main", + sessionKey: "agent:main:telegram:direct:chat-123", + text: "Title-only approval", + mediaUrls: undefined, + idempotencyKey: "idem-title-only-source-message-action", + config: {}, + }); + }); + + it("mirrors auto-threaded Telegram source sends into the topic transcript", async () => { + const telegramTopicPlugin: ChannelPlugin = { + id: "telegram", + meta: { + id: "telegram", + label: "Telegram", + selectionLabel: "Telegram", + docsPath: "/channels/telegram", + blurb: "Telegram topic source send transcript mirror test plugin.", + }, + capabilities: { chatTypes: ["group"] }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({ enabled: true }), + isConfigured: () => true, + }, + actions: { + describeMessageTool: () => ({ actions: ["send"] }), + supportsAction: ({ action }) => action === "send", + handleAction: async () => jsonResult({ ok: true, messageId: "tg-topic-1" }), + }, + threading: { + resolveCurrentChannelId: ({ to, threadId }) => + threadId == null ? to : `${to}:topic:${threadId}`, + }, + }; + mocks.getChannelPlugin.mockReturnValue(telegramTopicPlugin); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramTopicPlugin }]), + "send-test-topic-source-message-action-mirror", + ); + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-topic-1" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + message: "visible topic source reply", + messageThreadId: "77", + }, + sessionKey: "agent:main:telegram:group:chat-123:topic:77", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123:topic:77", + currentThreadTs: "77", + }, + idempotencyKey: "idem-topic-source-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith({ + agentId: "main", + sessionKey: "agent:main:telegram:group:chat-123:topic:77", + text: "visible topic source reply", + mediaUrls: undefined, + idempotencyKey: "idem-topic-source-message-action", + config: {}, + }); + }); + + it("does not mirror topic context when delivery params target the parent chat", async () => { + const telegramTopicPlugin: ChannelPlugin = { + id: "telegram", + meta: { + id: "telegram", + label: "Telegram", + selectionLabel: "Telegram", + docsPath: "/channels/telegram", + blurb: "Telegram parent send transcript mirror test plugin.", + }, + capabilities: { chatTypes: ["group"] }, + config: { + listAccountIds: () => ["default"], + resolveAccount: () => ({ enabled: true }), + isConfigured: () => true, + }, + actions: { + describeMessageTool: () => ({ actions: ["send"] }), + supportsAction: ({ action }) => action === "send", + handleAction: async () => jsonResult({ ok: true, messageId: "tg-parent-1" }), + }, + threading: { + resolveCurrentChannelId: ({ to, threadId }) => + threadId == null ? to : `${to}:topic:${threadId}`, + }, + }; + mocks.getChannelPlugin.mockReturnValue(telegramTopicPlugin); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramTopicPlugin }]), + "send-test-topic-context-parent-message-action-mirror", + ); + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-parent-1" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + message: "visible parent source reply", + }, + sessionKey: "agent:main:telegram:group:chat-123:topic:77", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123:topic:77", + currentThreadTs: "77", + }, + idempotencyKey: "idem-topic-context-parent-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).not.toHaveBeenCalled(); + }); + + it("does not mirror message.action sends to a different target", async () => { + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: true, messageId: "tg-external" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "other-chat", + message: "external visible reply", + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-external-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).not.toHaveBeenCalled(); + }); + + it("does not mirror explicitly failed message.action sends", async () => { + mocks.dispatchChannelMessageAction.mockResolvedValueOnce( + jsonResult({ ok: false, error: "delivery failed" }), + ); + + const { respond } = await runMessageActionRequest({ + channel: "telegram", + action: "send", + params: { + to: "chat-123", + message: "failed source reply", + }, + sessionKey: "agent:main:telegram:direct:chat-123", + agentId: "main", + toolContext: { + currentChannelProvider: "telegram", + currentChannelId: "chat-123", + }, + idempotencyKey: "idem-failed-message-action", + }); + + expect(firstRespondCall(respond)[0]).toBe(true); + expect(mocks.appendAssistantMessageToSessionTranscript).not.toHaveBeenCalled(); + }); + it("passes agent-scoped media roots to gateway message actions", async () => { const mediaActionPlugin: ChannelPlugin = { id: "telegram", diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 99d9475b469..bced1b4a096 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -16,6 +16,7 @@ import { projectOutboundPayloadPlanForMirror, } from "../../infra/outbound/payloads.js"; import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; +import { mirrorDeliveredSourceReplyToTranscript } from "../../infra/outbound/source-reply-mirror.js"; import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js"; import { resolveOutboundTarget } from "../../infra/outbound/targets.js"; import { extractToolPayload } from "../../infra/outbound/tool-payload.js"; @@ -264,6 +265,21 @@ function createGatewayInflightUnavailableFailure(params: { }; } +async function mirrorDeliveredSourceReplyToTranscriptBestEffort(params: { + context: GatewayRequestContext; + mirror: Parameters[0]; +}) { + try { + await mirrorDeliveredSourceReplyToTranscript(params.mirror); + } catch (err) { + params.context.logGateway?.warn?.("Source reply transcript mirror failed after delivery.", { + error: formatForLog(err), + channel: params.mirror.channel, + sessionKey: params.mirror.sessionKey, + }); + } +} + export const sendHandlers: GatewayRequestHandlers = { "message.action": async ({ params, respond, context, client }) => { const p = params; @@ -371,6 +387,24 @@ export const sendHandlers: GatewayRequestHandlers = { return { ok: false, error, meta: { channel } }; } const payload = extractToolPayload(handled); + const sessionKey = normalizeOptionalString(request.sessionKey) ?? undefined; + const agentId = + normalizeOptionalString(request.agentId) ?? + (sessionKey ? resolveSessionAgentId({ sessionKey, config: cfg }) : undefined); + await mirrorDeliveredSourceReplyToTranscriptBestEffort({ + context, + mirror: { + action: request.action, + channel, + actionParams: request.params, + cfg, + sessionKey, + agentId, + toolContext: request.toolContext, + idempotencyKey: request.idempotencyKey, + deliveredPayload: payload, + }, + }); return createGatewayInflightSuccess({ context, dedupeKey, payload, channel }); } catch (err) { return createGatewayInflightUnavailableFailure({ context, dedupeKey, channel, err }); diff --git a/src/infra/outbound/payloads.ts b/src/infra/outbound/payloads.ts index 4cb509eb083..b5d0ec0c87d 100644 --- a/src/infra/outbound/payloads.ts +++ b/src/infra/outbound/payloads.ts @@ -12,6 +12,8 @@ import { hasMessagePresentationBlocks, hasReplyChannelData, hasReplyPayloadContent, + normalizeInteractiveReply, + normalizeMessagePresentation, type InteractiveReply, type MessagePresentation, type ReplyPayloadDelivery, @@ -63,6 +65,86 @@ export type OutboundPayloadMirror = { mediaUrls: string[]; }; +function collectPresentationMirrorText(presentation: MessagePresentation | undefined): string[] { + if (!presentation) { + return []; + } + const lines: string[] = []; + if (presentation.title?.trim()) { + lines.push(presentation.title.trim()); + } + for (const block of presentation.blocks) { + if ((block.type === "text" || block.type === "context") && block.text.trim()) { + lines.push(block.text.trim()); + continue; + } + if (block.type === "buttons") { + for (const button of block.buttons) { + if (button.label.trim()) { + lines.push(button.label.trim()); + } + } + continue; + } + if (block.type === "select") { + if (block.placeholder?.trim()) { + lines.push(block.placeholder.trim()); + } + for (const option of block.options) { + if (option.label.trim()) { + lines.push(option.label.trim()); + } + } + } + } + return lines; +} + +function collectInteractiveMirrorText(interactive: InteractiveReply | undefined): string[] { + if (!interactive) { + return []; + } + const lines: string[] = []; + for (const block of interactive.blocks) { + if (block.type === "text" && block.text.trim()) { + lines.push(block.text.trim()); + continue; + } + if (block.type === "buttons") { + for (const button of block.buttons) { + if (button.label.trim()) { + lines.push(button.label.trim()); + } + } + continue; + } + if (block.type === "select") { + if (block.placeholder?.trim()) { + lines.push(block.placeholder.trim()); + } + for (const option of block.options) { + if (option.label.trim()) { + lines.push(option.label.trim()); + } + } + } + } + return lines; +} + +function resolveOutboundMirrorText(entry: OutboundPayloadPlan): string { + const text = entry.parts.text.trim() ? entry.parts.text : entry.payload.text; + if (text?.trim()) { + return text; + } + const presentation = normalizeMessagePresentation(entry.payload.presentation); + const interactive = normalizeInteractiveReply(entry.payload.interactive); + return [ + ...collectPresentationMirrorText(presentation), + ...collectInteractiveMirrorText(interactive), + ].join("\n"); +} + function isSuppressedRelayStatusText(text: string): boolean { const normalized = text.trim(); if (!normalized) { @@ -265,7 +347,7 @@ export function projectOutboundPayloadPlanForMirror( ): OutboundPayloadMirror { return { text: plan - .map((entry) => entry.payload.text) + .map(resolveOutboundMirrorText) .filter((text): text is string => Boolean(text)) .join("\n"), mediaUrls: plan.flatMap((entry) => entry.parts.mediaUrls), diff --git a/src/infra/outbound/source-reply-mirror.ts b/src/infra/outbound/source-reply-mirror.ts new file mode 100644 index 00000000000..966029714b9 --- /dev/null +++ b/src/infra/outbound/source-reply-mirror.ts @@ -0,0 +1,165 @@ +import type { ReplyPayload } from "../../auto-reply/types.js"; +import { getChannelPlugin } from "../../channels/plugins/index.js"; +import type { + ChannelId, + ChannelThreadingToolContext, +} from "../../channels/plugins/types.public.js"; +import { appendAssistantMessageToSessionTranscript } from "../../config/sessions.js"; +import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import { + normalizeOptionalLowercaseString, + normalizeOptionalString, +} from "../../shared/string-coerce.js"; +import { createOutboundPayloadPlan, projectOutboundPayloadPlanForMirror } from "./payloads.js"; + +type SourceReplyTranscriptMirrorParams = { + action: string; + channel: string; + actionParams: Record; + cfg: OpenClawConfig; + sessionKey?: string; + agentId?: string; + toolContext?: ChannelThreadingToolContext; + idempotencyKey?: string; + deliveredPayload?: unknown; +}; + +type MirrorableSourceReplyTranscriptParams = SourceReplyTranscriptMirrorParams & { + sessionKey: string; +}; + +function readStringArray(value: unknown): string[] | undefined { + if (!Array.isArray(value)) { + return undefined; + } + const normalized = value + .map((entry) => normalizeOptionalString(entry)) + .filter((entry): entry is string => Boolean(entry)); + return normalized.length ? normalized : undefined; +} + +function readFirstString( + params: Record, + keys: readonly string[], +): string | undefined { + for (const key of keys) { + const value = normalizeOptionalString(params[key]); + if (value) { + return value; + } + } + return undefined; +} + +function resolveSourceReplyTarget(params: Record): string | undefined { + return readFirstString(params, ["target", "to", "channelId", "chatId"]); +} + +function resolveSourceReplyThreadId(params: SourceReplyTranscriptMirrorParams): string | undefined { + return readFirstString(params.actionParams, ["threadId", "messageThreadId"]); +} + +function resolveThreadedSourceTarget( + params: SourceReplyTranscriptMirrorParams, + requestedTarget: string, +): string { + const threadId = resolveSourceReplyThreadId(params); + if (!threadId) { + return requestedTarget; + } + return ( + normalizeOptionalString( + getChannelPlugin(params.channel as ChannelId)?.threading?.resolveCurrentChannelId?.({ + to: requestedTarget, + threadId, + }), + ) ?? requestedTarget + ); +} + +function hasExplicitDeliveryFailure(payload: unknown): boolean { + if (!payload || typeof payload !== "object" || Array.isArray(payload)) { + return false; + } + const record = payload as Record; + if (record.ok === false) { + return true; + } + const status = normalizeOptionalLowercaseString(record.status); + if (status === "failed" || status === "error") { + return true; + } + const deliveryStatus = normalizeOptionalLowercaseString(record.deliveryStatus); + return deliveryStatus === "failed" || deliveryStatus === "error"; +} + +function isCurrentSourceConversation( + params: SourceReplyTranscriptMirrorParams, +): params is MirrorableSourceReplyTranscriptParams { + if (params.action !== "send") { + return false; + } + if (!params.sessionKey?.trim()) { + return false; + } + const currentChannel = normalizeOptionalLowercaseString( + params.toolContext?.currentChannelProvider, + ); + if (!currentChannel || currentChannel !== normalizeOptionalLowercaseString(params.channel)) { + return false; + } + const currentTarget = normalizeOptionalString(params.toolContext?.currentChannelId); + if (!currentTarget) { + return false; + } + const requestedTarget = resolveSourceReplyTarget(params.actionParams); + if (!requestedTarget) { + return false; + } + return ( + requestedTarget === currentTarget || + resolveThreadedSourceTarget(params, requestedTarget) === currentTarget + ); +} + +export async function mirrorDeliveredSourceReplyToTranscript( + params: SourceReplyTranscriptMirrorParams, +): Promise { + if (hasExplicitDeliveryFailure(params.deliveredPayload)) { + return false; + } + if (!isCurrentSourceConversation(params)) { + return false; + } + + const plan = createOutboundPayloadPlan([ + { + text: readFirstString(params.actionParams, ["message", "content", "text", "caption"]) ?? "", + mediaUrl: readFirstString(params.actionParams, [ + "mediaUrl", + "media", + "path", + "filePath", + "fileUrl", + ]), + mediaUrls: readStringArray(params.actionParams.mediaUrls), + presentation: params.actionParams.presentation as ReplyPayload["presentation"], + interactive: params.actionParams.interactive as ReplyPayload["interactive"], + channelData: params.actionParams.channelData as ReplyPayload["channelData"], + }, + ]); + const mirror = projectOutboundPayloadPlanForMirror(plan); + if (!mirror.text && mirror.mediaUrls.length === 0) { + return false; + } + + await appendAssistantMessageToSessionTranscript({ + agentId: params.agentId, + sessionKey: params.sessionKey, + text: mirror.text, + mediaUrls: mirror.mediaUrls.length ? mirror.mediaUrls : undefined, + idempotencyKey: params.idempotencyKey, + config: params.cfg, + }); + return true; +}