diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index 886f9d72e4c..b1273431fe0 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { beforeEach, describe, expect, it } from "vitest"; import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js"; import { createTestRegistry } from "../../test-utils/channel-plugins.js"; import { @@ -31,6 +31,10 @@ async function expectSameTargetRepliesDelivered(params: { provider: string; to: } describe("buildReplyPayloads media filter integration", () => { + beforeEach(() => { + resetPluginRuntimeStateForTest(); + }); + it("strips legacy bracket tool blocks from heartbeat replies", async () => { const { replyPayloads } = await buildReplyPayloads({ ...baseParams, diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index 9fad32e668e..61641ee6366 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -29,7 +29,7 @@ type ReplyDispatchSkipHandler = ( type ReplyDispatchDeliverer = ( payload: ReplyPayload, info: { kind: ReplyDispatchKind }, -) => Promise; +) => Promise; export type ReplyDispatchBeforeDeliver = ( payload: ReplyPayload, diff --git a/src/auto-reply/reply/reply-payloads-dedupe.ts b/src/auto-reply/reply/reply-payloads-dedupe.ts index ce2506b862c..cf76e25a187 100644 --- a/src/auto-reply/reply/reply-payloads-dedupe.ts +++ b/src/auto-reply/reply/reply-payloads-dedupe.ts @@ -1,8 +1,8 @@ import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js"; import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js"; import { getChannelPlugin } from "../../channels/plugins/index.js"; +import { getLoadedChannelPluginForRead } from "../../channels/plugins/registry-loaded-read.js"; import { normalizeAnyChannelId } from "../../channels/registry.js"; -import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js"; import { channelRouteTargetsMatchExact, stringifyRouteThreadId, @@ -91,6 +91,18 @@ function normalizeThreadIdForComparison(value?: string): string | undefined { return stringifyRouteThreadId(value); } +function normalizeTargetForDedupe(provider: string, rawTarget?: string): string | undefined { + const fallback = normalizeOptionalString(rawTarget); + if (!fallback) { + return undefined; + } + const providerId = normalizeProviderForComparison(provider); + const normalizer = providerId + ? getLoadedChannelPluginForRead(providerId)?.messaging?.normalizeTarget + : undefined; + return normalizeOptionalString(normalizer?.(rawTarget ?? "") ?? fallback); +} + function resolveTargetProviderForComparison(params: { currentProvider: string; targetProvider?: string; @@ -113,7 +125,7 @@ function normalizeRouteTargetForDedupe(params: { accountId?: string; threadId?: string; }): MessagingToolDedupeRouteTarget | null { - const to = normalizeTargetForProvider(params.provider, params.rawTarget); + const to = normalizeTargetForDedupe(params.provider, params.rawTarget); if (!to) { return null; } diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index b9f7d5a8627..b72c19838fd 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -33,7 +33,7 @@ import { type SavedMedia, saveMediaBuffer, } from "../../media/store.js"; -import { createChannelReplyPipeline } from "../../plugin-sdk/channel-reply-pipeline.js"; +import { createChannelMessageReplyPipeline } from "../../plugin-sdk/channel-message.js"; import { isPluginOwnedSessionBindingRecord } from "../../plugins/conversation-binding.js"; import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; @@ -2247,7 +2247,7 @@ export const chatHandlers: GatewayRequestHandlers = { ctx.MediaStaged = true; } - const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({ + const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({ cfg, agentId, channel: INTERNAL_MESSAGE_CHANNEL, diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index 1e596003dc2..cb6d887e5f7 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -3,7 +3,7 @@ import type { ChannelPlugin } from "../channels/plugins/types.plugin.js"; type LoadedSessionEntry = ReturnType; type RecordInboundSessionAndDispatchReplyParams = Parameters< - typeof import("../plugin-sdk/inbound-reply-dispatch.js").recordInboundSessionAndDispatchReply + typeof import("../plugin-sdk/channel-message.js").recordChannelMessageReplyDispatch >[0]; const mocks = vi.hoisted(() => { @@ -194,8 +194,8 @@ vi.mock("../infra/system-events.js", () => ({ enqueueSystemEvent: mocks.enqueueSystemEvent, })); -vi.mock("../plugin-sdk/inbound-reply-dispatch.js", () => ({ - recordInboundSessionAndDispatchReply: mocks.recordInboundSessionAndDispatchReply, +vi.mock("../plugin-sdk/channel-message.js", () => ({ + recordChannelMessageReplyDispatch: mocks.recordInboundSessionAndDispatchReply, })); vi.mock("../infra/heartbeat-wake.js", async () => { diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index 08a3ae8f2fc..cd9a9da0c0c 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -34,8 +34,8 @@ import { } from "../infra/session-delivery-queue.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { recordChannelMessageReplyDispatch } from "../plugin-sdk/channel-message.js"; import { stringifyRouteThreadId } from "../plugin-sdk/channel-route.js"; -import { recordInboundSessionAndDispatchReply } from "../plugin-sdk/inbound-reply-dispatch.js"; import type { OutboundReplyPayload } from "../plugin-sdk/reply-payload.js"; import { deliveryContextFromSession, @@ -272,7 +272,7 @@ async function deliverQueuedSessionDelivery(params: { config: cfg, }); let dispatchError: unknown; - await recordInboundSessionAndDispatchReply({ + await recordChannelMessageReplyDispatch({ cfg, channel: route.channel, accountId: route.accountId, diff --git a/src/infra/outbound/channel-bootstrap.runtime.ts b/src/infra/outbound/channel-bootstrap.runtime.ts index 8eb0dab460e..6c0c5aee934 100644 --- a/src/infra/outbound/channel-bootstrap.runtime.ts +++ b/src/infra/outbound/channel-bootstrap.runtime.ts @@ -1,13 +1,56 @@ +import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js"; +import { applyPluginAutoEnable } from "../../config/plugin-auto-enable.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import { resolveRuntimePluginRegistry } from "../../plugins/loader.js"; +import { + getActivePluginChannelRegistry, + getActivePluginChannelRegistryVersion, +} from "../../plugins/runtime.js"; import type { DeliverableMessageChannel } from "../../utils/message-channel.js"; +const bootstrapAttempts = new Set(); + export function resetOutboundChannelBootstrapStateForTests(): void { - // Runtime channel plugins are loaded during Gateway startup now. + bootstrapAttempts.clear(); } export function bootstrapOutboundChannelPlugin(params: { channel: DeliverableMessageChannel; cfg?: OpenClawConfig; }): void { - void params; + const cfg = params.cfg; + if (!cfg) { + return; + } + + const activeChannelRegistry = getActivePluginChannelRegistry(); + const activeHasRequestedChannel = activeChannelRegistry?.channels?.some( + (entry) => entry?.plugin?.id === params.channel, + ); + if (activeHasRequestedChannel) { + return; + } + + const attemptKey = `${getActivePluginChannelRegistryVersion()}:${params.channel}`; + if (bootstrapAttempts.has(attemptKey)) { + return; + } + bootstrapAttempts.add(attemptKey); + + const autoEnabled = applyPluginAutoEnable({ config: cfg }); + const defaultAgentId = resolveDefaultAgentId(autoEnabled.config); + const workspaceDir = resolveAgentWorkspaceDir(autoEnabled.config, defaultAgentId); + try { + resolveRuntimePluginRegistry({ + config: autoEnabled.config, + activationSourceConfig: cfg, + autoEnabledReasons: autoEnabled.autoEnabledReasons, + workspaceDir, + runtimeOptions: { + allowGatewaySubagentBinding: true, + }, + }); + } catch { + bootstrapAttempts.delete(attemptKey); + } } diff --git a/src/infra/outbound/channel-resolution.test.ts b/src/infra/outbound/channel-resolution.test.ts index e02c5c0e08b..7063c5d9fe0 100644 --- a/src/infra/outbound/channel-resolution.test.ts +++ b/src/infra/outbound/channel-resolution.test.ts @@ -128,28 +128,33 @@ describe("outbound channel resolution", () => { ).toBe(plugin); }); - it("does not load registries while resolving outbound plugins", async () => { + it("bootstraps configured channel plugins when the active registry is missing the target", async () => { const plugin = { id: "alpha" }; getLoadedChannelPluginMock.mockReturnValueOnce(undefined).mockReturnValueOnce(plugin); - const channelResolution = await importChannelResolution("no-bootstrap"); + const channelResolution = await importChannelResolution("bootstrap-missing-target"); expect( channelResolution.resolveOutboundChannelPlugin({ channel: "alpha", cfg: { channels: {} } as never, + allowBootstrap: true, }), ).toBe(plugin); - expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled(); - - getChannelPluginMock.mockReturnValue(undefined); - channelResolution.resolveOutboundChannelPlugin({ - channel: "alpha", - cfg: { channels: {} } as never, - }); - expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled(); + expect(applyPluginAutoEnableMock).toHaveBeenCalledWith({ config: { channels: {} } }); + expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledWith( + expect.objectContaining({ + config: { autoEnabled: true }, + activationSourceConfig: { channels: {} }, + autoEnabledReasons: {}, + workspaceDir: "/tmp/workspace", + runtimeOptions: { + allowGatewaySubagentBinding: true, + }, + }), + ); }); - it("does not load when the active registry has other channels but not the requested one", async () => { + it("attempts activation when the active registry has other channels but not the requested one", async () => { getLoadedChannelPluginMock.mockReturnValue(undefined); getChannelPluginMock.mockReturnValue(undefined); getActivePluginRegistryMock.mockReturnValue({ @@ -164,47 +169,80 @@ describe("outbound channel resolution", () => { channelResolution.resolveOutboundChannelPlugin({ channel: "alpha", cfg: { channels: {} } as never, + allowBootstrap: true, }), ).toBeUndefined(); - expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled(); + expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1); }); it("does not retry registry loads after a missing outbound plugin", async () => { getChannelPluginMock.mockReturnValue(undefined); - resolveRuntimePluginRegistryMock.mockImplementationOnce(() => { - throw new Error("transient"); - }); const channelResolution = await importChannelResolution("bootstrap-retry"); expect( channelResolution.resolveOutboundChannelPlugin({ channel: "alpha", cfg: { channels: {} } as never, + allowBootstrap: true, }), ).toBeUndefined(); channelResolution.resolveOutboundChannelPlugin({ channel: "alpha", cfg: { channels: {} } as never, + allowBootstrap: true, }); - expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled(); + expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1); }); - it("does not load when the pinned channel registry version changes", async () => { + it("allows another activation attempt when the pinned channel registry version changes", async () => { getChannelPluginMock.mockReturnValue(undefined); const channelResolution = await importChannelResolution("channel-version-change"); channelResolution.resolveOutboundChannelPlugin({ channel: "alpha", cfg: { channels: {} } as never, + allowBootstrap: true, }); - expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled(); + expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1); getActivePluginChannelRegistryVersionMock.mockReturnValue(2); channelResolution.resolveOutboundChannelPlugin({ channel: "alpha", cfg: { channels: {} } as never, + allowBootstrap: true, }); + expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(2); + }); + + it("resolves message adapters through the activation-aware channel plugin path", async () => { + const message = { send: { text: vi.fn() } }; + const plugin = { id: "alpha", message }; + getLoadedChannelPluginMock.mockReturnValueOnce(undefined).mockReturnValueOnce(plugin); + const channelResolution = await importChannelResolution("message-adapter-bootstrap"); + + expect( + channelResolution.resolveOutboundChannelMessageAdapter({ + channel: "alpha", + cfg: { channels: {} } as never, + allowBootstrap: true, + }), + ).toBe(message); + expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1); + }); + + it("does not bootstrap by default for outbound hot-path resolution", async () => { + const plugin = { id: "alpha" }; + getLoadedChannelPluginMock.mockReturnValue(undefined); + getChannelPluginMock.mockReturnValue(plugin); + const channelResolution = await importChannelResolution("no-bootstrap-default"); + + expect( + channelResolution.resolveOutboundChannelPlugin({ + channel: "alpha", + cfg: { channels: {} } as never, + }), + ).toBe(plugin); expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled(); }); }); diff --git a/src/infra/outbound/channel-resolution.ts b/src/infra/outbound/channel-resolution.ts index 70149a404c7..2b377079f83 100644 --- a/src/infra/outbound/channel-resolution.ts +++ b/src/infra/outbound/channel-resolution.ts @@ -1,3 +1,4 @@ +import type { ChannelMessageAdapterShape } from "../../channels/message/types.js"; import { getChannelPlugin, getLoadedChannelPlugin } from "../../channels/plugins/index.js"; import type { ChannelPlugin } from "../../channels/plugins/types.plugin.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; @@ -52,6 +53,7 @@ function resolveDirectFromActiveRegistry( export function resolveOutboundChannelPlugin(params: { channel: string; cfg?: OpenClawConfig; + allowBootstrap?: boolean; }): ChannelPlugin | undefined { const normalized = normalizeDeliverableOutboundChannel(params.channel); if (!normalized) { @@ -69,6 +71,18 @@ export function resolveOutboundChannelPlugin(params: { return directCurrent; } + if (params.allowBootstrap !== true) { + return resolve(); + } + maybeBootstrapChannelPlugin({ channel: normalized, cfg: params.cfg }); return resolveLoaded() ?? resolveDirectFromActiveRegistry(normalized) ?? resolve(); } + +export function resolveOutboundChannelMessageAdapter(params: { + channel: string; + cfg?: OpenClawConfig; + allowBootstrap?: boolean; +}): ChannelMessageAdapterShape | undefined { + return resolveOutboundChannelPlugin(params)?.message; +} diff --git a/src/infra/outbound/deliver-types.ts b/src/infra/outbound/deliver-types.ts index 5d0a081c22a..f3d8efaba86 100644 --- a/src/infra/outbound/deliver-types.ts +++ b/src/infra/outbound/deliver-types.ts @@ -1,3 +1,4 @@ +import type { MessageReceipt } from "../../channels/message/types.js"; import type { ChannelId } from "../../channels/plugins/channel-id.types.js"; export type OutboundDeliveryResult = { @@ -10,6 +11,7 @@ export type OutboundDeliveryResult = { timestamp?: number; toJid?: string; pollId?: string; + receipt?: MessageReceipt; // Channel docking: stash channel-specific fields here to avoid core type churn. meta?: Record; }; diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 18e5c539c28..774e6d8906b 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -1,6 +1,7 @@ import path from "node:path"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { chunkText } from "../../auto-reply/chunk.js"; +import { createMessageReceiptFromOutboundResults } from "../../channels/message/receipt.js"; import type { ChannelOutboundAdapter } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; import * as mediaCapabilityModule from "../../media/read-capability.js"; @@ -41,6 +42,8 @@ const queueMocks = vi.hoisted(() => ({ enqueueDelivery: vi.fn(async () => "mock-queue-id"), ackDelivery: vi.fn(async () => {}), failDelivery: vi.fn(async () => {}), + markDeliveryPlatformOutcomeUnknown: vi.fn(async () => {}), + markDeliveryPlatformSendAttemptStarted: vi.fn(async () => {}), withActiveDeliveryClaim: vi.fn< ( entryId: string, @@ -81,6 +84,8 @@ vi.mock("./delivery-queue.js", () => ({ enqueueDelivery: queueMocks.enqueueDelivery, ackDelivery: queueMocks.ackDelivery, failDelivery: queueMocks.failDelivery, + markDeliveryPlatformOutcomeUnknown: queueMocks.markDeliveryPlatformOutcomeUnknown, + markDeliveryPlatformSendAttemptStarted: queueMocks.markDeliveryPlatformSendAttemptStarted, withActiveDeliveryClaim: queueMocks.withActiveDeliveryClaim, })); vi.mock("../../logging/subsystem.js", () => ({ @@ -100,6 +105,7 @@ type DeliverModule = typeof import("./deliver.js"); let deliverOutboundPayloads: DeliverModule["deliverOutboundPayloads"]; let normalizeOutboundPayloads: DeliverModule["normalizeOutboundPayloads"]; +let resolveOutboundDurableFinalDeliverySupport: DeliverModule["resolveOutboundDurableFinalDeliverySupport"]; const matrixChunkConfig: OpenClawConfig = { channels: { matrix: { textChunkLimit: 4000 } } as OpenClawConfig["channels"], @@ -218,7 +224,7 @@ function flushDiagnosticEvents() { return new Promise((resolve) => setImmediate(resolve)); } -async function runBestEffortPartialFailureDelivery() { +async function runBestEffortPartialFailureDelivery(params?: { onError?: boolean }) { const sendMatrix = vi .fn() .mockRejectedValueOnce(new Error("fail")) @@ -232,7 +238,7 @@ async function runBestEffortPartialFailureDelivery() { payloads: [{ text: "a" }, { text: "b" }], deps: { matrix: sendMatrix }, bestEffort: true, - onError, + ...(params?.onError === false ? {} : { onError }), }); return { sendMatrix, onError, results }; } @@ -256,7 +262,11 @@ function expectSuccessfulMatrixInternalHookPayload( describe("deliverOutboundPayloads", () => { beforeAll(async () => { - ({ deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js")); + ({ + deliverOutboundPayloads, + normalizeOutboundPayloads, + resolveOutboundDurableFinalDeliverySupport, + } = await import("./deliver.js")); }); beforeEach(() => { @@ -279,6 +289,10 @@ describe("deliverOutboundPayloads", () => { queueMocks.ackDelivery.mockResolvedValue(undefined); queueMocks.failDelivery.mockClear(); queueMocks.failDelivery.mockResolvedValue(undefined); + queueMocks.markDeliveryPlatformOutcomeUnknown.mockClear(); + queueMocks.markDeliveryPlatformOutcomeUnknown.mockResolvedValue(undefined); + queueMocks.markDeliveryPlatformSendAttemptStarted.mockClear(); + queueMocks.markDeliveryPlatformSendAttemptStarted.mockResolvedValue(undefined); queueMocks.withActiveDeliveryClaim.mockClear(); queueMocks.withActiveDeliveryClaim.mockImplementation(async (_entryId, fn) => ({ status: "claimed", @@ -293,6 +307,733 @@ describe("deliverOutboundPayloads", () => { setActivePluginRegistry(emptyRegistry); }); + it("reports unsupported durable final delivery when required capabilities are missing", async () => { + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText: async () => ({ channel: "matrix", messageId: "m1" }), + deliveryCapabilities: { + durableFinal: { + text: true, + }, + }, + }, + }), + }, + ]), + ); + + await expect( + resolveOutboundDurableFinalDeliverySupport({ + cfg: {}, + channel: "matrix", + requirements: { + text: true, + silent: true, + }, + }), + ).resolves.toEqual({ + ok: false, + reason: "capability_mismatch", + capability: "silent", + }); + }); + + it("uses channel message adapter capabilities for durable final support", async () => { + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + ...createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText: async () => ({ channel: "matrix", messageId: "outbound" }), + deliveryCapabilities: { + durableFinal: { + text: true, + }, + }, + }, + }), + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + silent: true, + }, + }, + send: { + text: async () => ({ + messageId: "message", + receipt: createMessageReceiptFromOutboundResults({ + results: [{ channel: "matrix", messageId: "message" }], + kind: "text", + }), + }), + }, + }, + }, + }, + ]), + ); + + await expect( + resolveOutboundDurableFinalDeliverySupport({ + cfg: {}, + channel: "matrix", + requirements: { + text: true, + silent: true, + }, + }), + ).resolves.toEqual({ ok: true }); + }); + + it("requires a real reconciler for required unknown-send recovery support", async () => { + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + ...createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText: async () => ({ channel: "matrix", messageId: "outbound" }), + }, + }), + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + reconcileUnknownSend: true, + }, + }, + send: { + text: async () => ({ + messageId: "message", + receipt: createMessageReceiptFromOutboundResults({ + results: [{ channel: "matrix", messageId: "message" }], + kind: "text", + }), + }), + }, + }, + }, + }, + ]), + ); + + await expect( + resolveOutboundDurableFinalDeliverySupport({ + cfg: {}, + channel: "matrix", + requirements: { + text: true, + reconcileUnknownSend: true, + }, + }), + ).resolves.toEqual({ + ok: false, + reason: "capability_mismatch", + capability: "reconcileUnknownSend", + }); + }); + + it("accepts required unknown-send recovery only when the adapter declares and implements it", async () => { + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + ...createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText: async () => ({ channel: "matrix", messageId: "outbound" }), + }, + }), + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + reconcileUnknownSend: true, + }, + reconcileUnknownSend: async () => ({ status: "not_sent" }), + }, + send: { + text: async () => ({ + messageId: "message", + receipt: createMessageReceiptFromOutboundResults({ + results: [{ channel: "matrix", messageId: "message" }], + kind: "text", + }), + }), + }, + }, + }, + }, + ]), + ); + + await expect( + resolveOutboundDurableFinalDeliverySupport({ + cfg: {}, + channel: "matrix", + requirements: { + text: true, + reconcileUnknownSend: true, + }, + }), + ).resolves.toEqual({ ok: true }); + }); + + it("sends text through the channel message adapter when present", async () => { + const messageSendText = vi.fn(async () => ({ + messageId: "message-adapter-1", + receipt: createMessageReceiptFromOutboundResults({ + results: [{ channel: "matrix", messageId: "message-adapter-1" }], + kind: "text", + }), + })); + const outboundSendText = vi.fn(async () => ({ + channel: "matrix" as const, + messageId: "outbound-1", + })); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + ...createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + chunker: chunkText, + sendText: outboundSendText, + }, + }), + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + }, + }, + send: { + text: messageSendText, + }, + }, + }, + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }], + }); + + expect(messageSendText).toHaveBeenCalledWith( + expect.objectContaining({ + to: "!room:example", + text: "hello", + }), + ); + expect(outboundSendText).not.toHaveBeenCalled(); + expect(results).toMatchObject([ + { + channel: "matrix", + messageId: "message-adapter-1", + }, + ]); + expect(results[0]?.receipt?.platformMessageIds).toEqual(["message-adapter-1"]); + }); + + it("runs message adapter send lifecycle after durable intent and before platform send", async () => { + const order: string[] = []; + queueMocks.enqueueDelivery.mockImplementationOnce(async () => { + order.push("queue"); + return "queue-1"; + }); + queueMocks.ackDelivery.mockImplementationOnce(async () => { + order.push("ack"); + }); + queueMocks.markDeliveryPlatformSendAttemptStarted.mockImplementationOnce(async () => { + order.push("mark-started"); + }); + queueMocks.markDeliveryPlatformOutcomeUnknown.mockImplementationOnce(async () => { + order.push("mark-unknown"); + }); + const messageSendText = vi.fn(async () => { + order.push("send"); + return { + messageId: "message-adapter-1", + receipt: createMessageReceiptFromOutboundResults({ + results: [{ channel: "matrix", messageId: "message-adapter-1" }], + kind: "text", + }), + }; + }); + const beforeSendAttempt = vi.fn(() => { + order.push("before"); + return "pending-1"; + }); + const afterSendSuccess = vi.fn( + (ctx: { attemptToken?: unknown; result: { messageId?: string } }) => { + order.push(`after:${String(ctx.attemptToken)}:${ctx.result.messageId ?? ""}`); + }, + ); + const afterCommit = vi.fn((ctx: { attemptToken?: unknown; result: { messageId?: string } }) => { + order.push(`commit:${String(ctx.attemptToken)}:${ctx.result.messageId ?? ""}`); + }); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + id: "matrix", + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + afterSendSuccess: true, + afterCommit: true, + }, + }, + send: { + lifecycle: { + beforeSendAttempt, + afterSendSuccess, + afterCommit, + }, + text: messageSendText, + }, + }, + }, + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }], + queuePolicy: "required", + }); + + expect(order).toEqual([ + "queue", + "before", + "mark-started", + "send", + "after:pending-1:message-adapter-1", + "mark-unknown", + "ack", + "commit:pending-1:message-adapter-1", + ]); + expect(beforeSendAttempt).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + to: "!room:example", + text: "hello", + }), + ); + expect(afterSendSuccess).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + attemptToken: "pending-1", + result: expect.objectContaining({ messageId: "message-adapter-1" }), + }), + ); + expect(afterCommit).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + attemptToken: "pending-1", + result: expect.objectContaining({ messageId: "message-adapter-1" }), + }), + ); + expect(results).toMatchObject([{ channel: "matrix", messageId: "message-adapter-1" }]); + }); + + it("does not mark queued delivery as unknown when hooks cancel before platform send", async () => { + hookMocks.runner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "message_sending", + ); + hookMocks.runner.runMessageSending.mockResolvedValueOnce({ + cancel: true, + content: "blocked", + }); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" }); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }], + deps: { matrix: sendMatrix }, + queuePolicy: "required", + }); + + expect(results).toEqual([]); + expect(sendMatrix).not.toHaveBeenCalled(); + expect(queueMocks.markDeliveryPlatformSendAttemptStarted).not.toHaveBeenCalled(); + expect(queueMocks.markDeliveryPlatformOutcomeUnknown).not.toHaveBeenCalled(); + expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id"); + }); + + it("runs message adapter failure cleanup for failed sends with pending attempt tokens", async () => { + const messageSendText = vi.fn(async () => { + throw new Error("native send failed"); + }); + const afterSendFailure = vi.fn(); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + id: "matrix", + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + afterSendSuccess: true, + }, + }, + send: { + lifecycle: { + beforeSendAttempt: () => "pending-2", + afterSendFailure, + }, + text: messageSendText, + }, + }, + }, + }, + ]), + ); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }], + queuePolicy: "required", + }), + ).rejects.toThrow("native send failed"); + + expect(afterSendFailure).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + attemptToken: "pending-2", + error: expect.any(Error), + }), + ); + expect(queueMocks.failDelivery).toHaveBeenCalledWith( + "mock-queue-id", + expect.stringContaining("native send failed"), + ); + }); + + it("preserves native send errors when failure cleanup throws", async () => { + const messageSendText = vi.fn(async () => { + throw new Error("native send failed"); + }); + const afterSendFailure = vi.fn(async () => { + throw new Error("cleanup failed"); + }); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + id: "matrix", + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + afterSendSuccess: true, + }, + }, + send: { + lifecycle: { + beforeSendAttempt: () => "pending-2", + afterSendFailure, + }, + text: messageSendText, + }, + }, + }, + }, + ]), + ); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }], + queuePolicy: "required", + }), + ).rejects.toThrow("native send failed"); + + expect(afterSendFailure).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + attemptToken: "pending-2", + error: expect.any(Error), + }), + ); + expect(queueMocks.failDelivery).toHaveBeenCalledWith( + "mock-queue-id", + expect.stringContaining("native send failed"), + ); + }); + + it("preserves successful sends when the success hook throws", async () => { + const afterSendFailure = vi.fn(); + const afterCommit = vi.fn(); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + id: "matrix", + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + afterSendSuccess: true, + afterCommit: true, + }, + }, + send: { + lifecycle: { + afterSendSuccess: async () => { + throw new Error("success hook failed"); + }, + afterSendFailure, + afterCommit, + }, + text: async () => ({ + messageId: "message-adapter-1", + receipt: createMessageReceiptFromOutboundResults({ + results: [{ channel: "matrix", messageId: "message-adapter-1" }], + kind: "text", + }), + }), + }, + }, + }, + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }], + queuePolicy: "required", + }); + + expect(results).toMatchObject([{ channel: "matrix", messageId: "message-adapter-1" }]); + expect(afterSendFailure).not.toHaveBeenCalled(); + expect(afterCommit).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + result: expect.objectContaining({ messageId: "message-adapter-1" }), + }), + ); + expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id"); + expect(queueMocks.failDelivery).not.toHaveBeenCalled(); + }); + + it("requires durable queue writes when requested", async () => { + queueMocks.enqueueDelivery.mockRejectedValueOnce(new Error("queue offline")); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" }); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hi" }], + deps: { matrix: sendMatrix }, + queuePolicy: "required", + }), + ).rejects.toThrow("queue offline"); + + expect(sendMatrix).not.toHaveBeenCalled(); + }); + + it("falls back to direct send when best-effort queue writes fail", async () => { + queueMocks.enqueueDelivery.mockRejectedValueOnce(new Error("queue offline")); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" }); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hi" }], + deps: { matrix: sendMatrix }, + queuePolicy: "best_effort", + }), + ).resolves.toEqual([expect.objectContaining({ messageId: "m1" })]); + + expect(sendMatrix).toHaveBeenCalled(); + }); + + it("runs afterCommit hooks after best-effort queue fallback direct sends", async () => { + queueMocks.enqueueDelivery.mockRejectedValueOnce(new Error("queue offline")); + const afterCommit = vi.fn(); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: { + id: "matrix", + message: { + id: "matrix", + durableFinal: { + capabilities: { + text: true, + afterCommit: true, + }, + }, + send: { + lifecycle: { + afterCommit, + }, + text: async () => ({ + messageId: "message-adapter-1", + receipt: createMessageReceiptFromOutboundResults({ + results: [{ channel: "matrix", messageId: "message-adapter-1" }], + kind: "text", + }), + }), + }, + }, + }, + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }], + queuePolicy: "best_effort", + }); + + expect(afterCommit).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + result: expect.objectContaining({ messageId: "message-adapter-1" }), + }), + ); + expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + }); + + it("requires the platform-send attempt marker before required durable platform I/O", async () => { + queueMocks.markDeliveryPlatformSendAttemptStarted.mockRejectedValueOnce( + new Error("marker offline"), + ); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" }); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hi" }], + deps: { matrix: sendMatrix }, + queuePolicy: "required", + }), + ).rejects.toThrow("marker offline"); + + expect(queueMocks.markDeliveryPlatformSendAttemptStarted).toHaveBeenCalledWith("mock-queue-id"); + expect(sendMatrix).not.toHaveBeenCalled(); + expect(queueMocks.failDelivery).toHaveBeenCalledWith( + "mock-queue-id", + expect.stringContaining("marker offline"), + ); + expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + }); + + it("fails required delivery when the post-send unknown marker cannot be written", async () => { + queueMocks.markDeliveryPlatformOutcomeUnknown.mockRejectedValueOnce( + new Error("unknown marker offline"), + ); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" }); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hi" }], + deps: { matrix: sendMatrix }, + queuePolicy: "required", + }), + ).rejects.toThrow("unknown marker offline"); + + expect(sendMatrix).toHaveBeenCalled(); + expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + expect(queueMocks.failDelivery).not.toHaveBeenCalled(); + }); + + it("fails required delivery when queue ack fails after platform send", async () => { + queueMocks.ackDelivery.mockRejectedValueOnce(new Error("ack offline")); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" }); + + await expect( + deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hi" }], + deps: { matrix: sendMatrix }, + queuePolicy: "required", + }), + ).rejects.toThrow("ack offline"); + + expect(sendMatrix).toHaveBeenCalled(); + expect(queueMocks.markDeliveryPlatformOutcomeUnknown).toHaveBeenCalledWith("mock-queue-id"); + expect(queueMocks.failDelivery).not.toHaveBeenCalled(); + }); + it("emits bounded delivery diagnostics for successful outbound sends", async () => { const events: DiagnosticEventPayload[] = []; const unsubscribe = onInternalDiagnosticEvent((event) => events.push(event)); @@ -1551,6 +2292,16 @@ describe("deliverOutboundPayloads", () => { ); }); + it("calls failDelivery instead of ackDelivery on bestEffort partial failure without onError", async () => { + await runBestEffortPartialFailureDelivery({ onError: false }); + + expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + expect(queueMocks.failDelivery).toHaveBeenCalledWith( + "mock-queue-id", + "partial delivery failure (bestEffort)", + ); + }); + it("writes raw payloads to the queue before normalization", async () => { const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m-raw", roomId: "!room:example" }); const rawPayloads: DeliverOutboundPayload[] = [ @@ -1577,6 +2328,51 @@ describe("deliverOutboundPayloads", () => { { text: "caption\nMEDIA:https://x.test/a.png" }, { text: "NO_REPLY", mediaUrl: " https://x.test/b.png " }, ], + renderedBatchPlan: expect.objectContaining({ + payloadCount: 4, + textCount: 4, + mediaCount: 1, + items: expect.arrayContaining([ + expect.objectContaining({ + index: 3, + kinds: ["text", "media"], + text: "NO_REPLY", + mediaUrls: ["https://x.test/b.png"], + }), + ]), + }), + }), + ); + }); + + it("persists rendered batch plans with queued deliveries", async () => { + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m-plan", roomId: "!room:example" }); + const renderedBatchPlan = { + payloadCount: 2, + textCount: 1, + mediaCount: 1, + voiceCount: 0, + presentationCount: 0, + interactiveCount: 0, + channelDataCount: 0, + items: [ + { index: 0, kinds: ["text"] as const, text: "hello", mediaUrls: [] }, + { index: 1, kinds: ["media"] as const, mediaUrls: ["file:///tmp/a.png"] }, + ], + }; + + await deliverOutboundPayloads({ + cfg: matrixChunkConfig, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "hello" }, { mediaUrl: "file:///tmp/a.png" }], + deps: { matrix: sendMatrix }, + renderedBatchPlan, + }); + + expect(queueMocks.enqueueDelivery).toHaveBeenCalledWith( + expect.objectContaining({ + renderedBatchPlan, }), ); }); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index b5976da781d..398739c2bbe 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -1,7 +1,16 @@ import { resolveChunkMode, resolveTextChunkLimit } from "../../auto-reply/chunk.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; +import { createRenderedMessageBatchPlan } from "../../channels/message/rendered-batch.js"; +import type { + ChannelMessageAdapterShape, + ChannelMessageSendAttemptContext, + ChannelMessageSendAttemptKind, + ChannelMessageSendLifecycleAdapter, + ChannelMessageSendResult, +} from "../../channels/message/types.js"; import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js"; import type { + ChannelDeliveryCapabilities, ChannelOutboundAdapter, ChannelOutboundContext, ChannelOutboundPayloadContext, @@ -32,11 +41,20 @@ import { diagnosticErrorCategory } from "../diagnostic-error-metadata.js"; import { emitDiagnosticEvent, type DiagnosticMessageDeliveryKind } from "../diagnostic-events.js"; import { formatErrorMessage } from "../errors.js"; import { throwIfAborted } from "./abort.js"; +import { resolveOutboundChannelMessageAdapter } from "./channel-resolution.js"; import type { OutboundDeliveryResult } from "./deliver-types.js"; +import { + attachOutboundDeliveryCommitHook, + runOutboundDeliveryCommitHooks, + type OutboundDeliveryCommitHook, +} from "./delivery-commit-hooks.js"; import { ackDelivery, enqueueDelivery, failDelivery, + markDeliveryPlatformOutcomeUnknown, + markDeliveryPlatformSendAttemptStarted, + type QueuedRenderedMessageBatchPlan, withActiveDeliveryClaim, } from "./delivery-queue.js"; import type { OutboundDeliveryFormattingOptions } from "./formatting.js"; @@ -65,6 +83,32 @@ export type { NormalizedOutboundPayload } from "./payloads.js"; export { normalizeOutboundPayloads } from "./payloads.js"; export { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js"; +export type OutboundDeliveryQueuePolicy = "required" | "best_effort"; + +export type OutboundDeliveryIntent = { + id: string; + channel: Exclude; + to: string; + accountId?: string; + queuePolicy: OutboundDeliveryQueuePolicy; +}; + +export type DurableFinalDeliveryRequirement = keyof NonNullable< + ChannelDeliveryCapabilities["durableFinal"] +>; + +export type DurableFinalDeliveryRequirements = Partial< + Record +>; + +export type OutboundDurableDeliverySupport = + | { ok: true } + | { + ok: false; + reason: "missing_outbound_handler" | "capability_mismatch"; + capability?: DurableFinalDeliveryRequirement; + }; + const log = createSubsystemLogger("outbound/deliver"); let transcriptRuntimePromise: | Promise @@ -130,6 +174,8 @@ type ChannelHandler = { ) => Promise; }; +type ChannelMessageLifecycleContext = ChannelMessageSendAttemptContext; + type ChannelHandlerParams = { cfg: OpenClawConfig; channel: Exclude; @@ -146,6 +192,7 @@ type ChannelHandlerParams = { silent?: boolean; mediaAccess?: OutboundMediaAccess; gatewayClientScopes?: readonly string[]; + onPlatformSendStart?: () => Promise; }; // Channel docking: outbound delivery delegates to plugin.outbound adapters. @@ -153,21 +200,26 @@ async function resolveChannelOutboundDirectiveOptions(params: { cfg: OpenClawConfig; channel: Exclude; }): Promise<{ extractMarkdownImages?: boolean }> { - let outbound = await loadChannelOutboundAdapter(params.channel); - if (!outbound) { - const { bootstrapOutboundChannelPlugin } = await loadChannelBootstrapRuntime(); - bootstrapOutboundChannelPlugin({ - channel: params.channel, - cfg: params.cfg, - }); - outbound = await loadChannelOutboundAdapter(params.channel); - } + const outbound = await loadBootstrappedOutboundAdapter(params); return { extractMarkdownImages: outbound?.extractMarkdownImages === true ? true : undefined, }; } async function createChannelHandler(params: ChannelHandlerParams): Promise { + const outbound = await loadBootstrappedOutboundAdapter(params); + const message = resolveOutboundChannelMessageAdapter(params); + const handler = createPluginHandler({ ...params, outbound, message }); + if (!handler) { + throw new Error(`Outbound not configured for channel: ${params.channel}`); + } + return handler; +} + +async function loadBootstrappedOutboundAdapter(params: { + cfg: OpenClawConfig; + channel: Exclude; +}): Promise { let outbound = await loadChannelOutboundAdapter(params.channel); if (!outbound) { const { bootstrapOutboundChannelPlugin } = await loadChannelBootstrapRuntime(); @@ -177,25 +229,112 @@ async function createChannelHandler(params: ChannelHandlerParams): Promise(params: { + lifecycle?: ChannelMessageSendLifecycleAdapter; + ctx: ChannelMessageLifecycleContext; + send: () => Promise; +}): Promise<{ result: TResult; afterCommit?: OutboundDeliveryCommitHook }> { + if (!params.lifecycle) { + return { result: await params.send() }; } - return handler; + let attemptToken: unknown; + try { + attemptToken = await params.lifecycle.beforeSendAttempt?.(params.ctx); + const result = await params.send(); + const successCtx = { + ...params.ctx, + result, + ...(attemptToken !== undefined ? { attemptToken } : {}), + }; + try { + await params.lifecycle.afterSendSuccess?.(successCtx); + } catch (successHookError: unknown) { + log.warn( + `channel message send success hook failed after platform send; preserving send result: ${formatErrorMessage(successHookError)}`, + ); + } + return { + result, + ...(params.lifecycle.afterCommit + ? { + afterCommit: async () => { + await params.lifecycle?.afterCommit?.(successCtx); + }, + } + : {}), + }; + } catch (error: unknown) { + try { + await params.lifecycle.afterSendFailure?.({ + ...params.ctx, + error, + ...(attemptToken !== undefined ? { attemptToken } : {}), + }); + } catch (cleanupError: unknown) { + log.warn( + `channel message send failure cleanup failed; preserving original send error: ${formatErrorMessage(cleanupError)}`, + ); + } + throw error; + } +} + +export async function resolveOutboundDurableFinalDeliverySupport(params: { + cfg: OpenClawConfig; + channel: Exclude; + requirements?: DurableFinalDeliveryRequirements; +}): Promise { + const outbound = await loadBootstrappedOutboundAdapter(params); + const message = resolveOutboundChannelMessageAdapter(params); + if (!message?.send?.text && !outbound?.sendText) { + return { ok: false, reason: "missing_outbound_handler" }; + } + + const messageDurableFinal = message?.durableFinal; + const durableFinal = + messageDurableFinal?.capabilities ?? outbound?.deliveryCapabilities?.durableFinal; + for (const [capability, required] of Object.entries(params.requirements ?? {}) as Array< + [DurableFinalDeliveryRequirement, boolean | undefined] + >) { + if (required === true && durableFinal?.[capability] !== true) { + return { ok: false, reason: "capability_mismatch", capability }; + } + if ( + required === true && + capability === "reconcileUnknownSend" && + typeof messageDurableFinal?.reconcileUnknownSend !== "function" + ) { + return { ok: false, reason: "capability_mismatch", capability }; + } + } + + return { ok: true }; } function createPluginHandler( - params: ChannelHandlerParams & { outbound?: ChannelOutboundAdapter }, + params: ChannelHandlerParams & { + outbound?: ChannelOutboundAdapter; + message?: ChannelMessageAdapterShape; + }, ): ChannelHandler | null { const outbound = params.outbound; - if (!outbound?.sendText) { + const messageText = params.message?.send?.text; + const messageMedia = params.message?.send?.media; + const messagePayload = params.message?.send?.payload; + const messageLifecycle = params.message?.send?.lifecycle; + if (!messageText && !outbound?.sendText) { return null; } const baseCtx = createChannelOutboundContextBase(params); - const sendText = outbound.sendText; - const sendMedia = outbound.sendMedia; - const chunker = outbound.chunker ?? null; - const chunkerMode = outbound.chunkerMode; + const sendText = outbound?.sendText; + const sendMedia = outbound?.sendMedia; + const chunker = outbound?.chunker ?? null; + const chunkerMode = outbound?.chunkerMode; const resolveCtx = (overrides?: { replyToId?: string | null; replyToIdSource?: "explicit" | "implicit"; @@ -222,16 +361,16 @@ function createPluginHandler( return { chunker, chunkerMode, - textChunkLimit: outbound.textChunkLimit, - supportsMedia: Boolean(sendMedia), - sanitizeText: outbound.sanitizeText + textChunkLimit: outbound?.textChunkLimit, + supportsMedia: Boolean(messageMedia ?? sendMedia), + sanitizeText: outbound?.sanitizeText ? (payload) => outbound.sanitizeText!({ text: payload.text ?? "", payload }) : undefined, - normalizePayload: outbound.normalizePayload + normalizePayload: outbound?.normalizePayload ? (payload) => outbound.normalizePayload!({ payload }) : undefined, - sendTextOnlyErrorPayloads: outbound.sendTextOnlyErrorPayloads === true, - renderPresentation: outbound.renderPresentation + sendTextOnlyErrorPayloads: outbound?.sendTextOnlyErrorPayloads === true, + renderPresentation: outbound?.renderPresentation ? async (payload) => { const presentation = normalizeMessagePresentation(payload.presentation); if (!presentation) { @@ -250,7 +389,7 @@ function createPluginHandler( return await outbound.renderPresentation!({ payload, presentation, ctx }); } : undefined, - pinDeliveredMessage: outbound.pinDeliveredMessage + pinDeliveredMessage: outbound?.pinDeliveredMessage ? async ({ target, messageId, pin }) => outbound.pinDeliveredMessage!({ cfg: params.cfg, @@ -259,7 +398,7 @@ function createPluginHandler( pin, }) : undefined, - afterDeliverPayload: outbound.afterDeliverPayload + afterDeliverPayload: outbound?.afterDeliverPayload ? async ({ target, payload, results }) => outbound.afterDeliverPayload!({ cfg: params.cfg, @@ -268,10 +407,10 @@ function createPluginHandler( results, }) : undefined, - shouldSkipPlainTextSanitization: outbound.shouldSkipPlainTextSanitization + shouldSkipPlainTextSanitization: outbound?.shouldSkipPlainTextSanitization ? (payload) => outbound.shouldSkipPlainTextSanitization!({ payload }) : undefined, - resolveEffectiveTextChunkLimit: outbound.resolveEffectiveTextChunkLimit + resolveEffectiveTextChunkLimit: outbound?.resolveEffectiveTextChunkLimit ? (fallbackLimit) => outbound.resolveEffectiveTextChunkLimit!({ cfg: params.cfg, @@ -279,52 +418,125 @@ function createPluginHandler( fallbackLimit, }) : undefined, - sendPayload: outbound.sendPayload - ? async (payload, overrides) => - outbound.sendPayload!({ - ...resolveCtx(overrides), - text: payload.text ?? "", - mediaUrl: payload.mediaUrl, - payload, - }) - : undefined, - sendFormattedText: outbound.sendFormattedText - ? async (text, overrides) => - outbound.sendFormattedText!({ + sendPayload: + messagePayload || outbound?.sendPayload + ? async (payload, overrides) => { + const payloadCtx = { + ...resolveCtx(overrides), + kind: "payload" as const satisfies ChannelMessageSendAttemptKind, + text: payload.text ?? "", + mediaUrl: payload.mediaUrl, + payload, + }; + if (messagePayload) { + const sent = await runChannelMessageSendWithLifecycle({ + lifecycle: messageLifecycle, + ctx: payloadCtx, + send: async () => { + await params.onPlatformSendStart?.(); + return await messagePayload(payloadCtx); + }, + }); + return attachOutboundDeliveryCommitHook( + normalizeChannelMessageSendResult(params.channel, sent.result), + sent.afterCommit, + ); + } + await params.onPlatformSendStart?.(); + return outbound!.sendPayload!(payloadCtx); + } + : undefined, + sendFormattedText: outbound?.sendFormattedText + ? async (text, overrides) => { + await params.onPlatformSendStart?.(); + return await outbound.sendFormattedText!({ ...resolveCtx(overrides), text, - }) + }); + } : undefined, - sendFormattedMedia: outbound.sendFormattedMedia - ? async (caption, mediaUrl, overrides) => - outbound.sendFormattedMedia!({ + sendFormattedMedia: outbound?.sendFormattedMedia + ? async (caption, mediaUrl, overrides) => { + await params.onPlatformSendStart?.(); + return await outbound.sendFormattedMedia!({ ...resolveCtx(overrides), text: caption, mediaUrl, - }) + }); + } : undefined, - sendText: async (text, overrides) => - sendText({ + sendText: async (text, overrides) => { + const textCtx = { ...resolveCtx(overrides), + kind: "text" as const satisfies ChannelMessageSendAttemptKind, text, - }), + }; + if (messageText) { + const sent = await runChannelMessageSendWithLifecycle({ + lifecycle: messageLifecycle, + ctx: textCtx, + send: async () => { + await params.onPlatformSendStart?.(); + return await messageText(textCtx); + }, + }); + return attachOutboundDeliveryCommitHook( + normalizeChannelMessageSendResult(params.channel, sent.result), + sent.afterCommit, + ); + } + await params.onPlatformSendStart?.(); + return sendText!(textCtx); + }, buildTargetRef, sendMedia: async (caption, mediaUrl, overrides) => { - if (sendMedia) { - return sendMedia({ - ...resolveCtx(overrides), - text: caption, - mediaUrl, - }); - } - return sendText({ + const mediaCtx = { ...resolveCtx(overrides), + kind: "media" as const satisfies ChannelMessageSendAttemptKind, text: caption, - }); + mediaUrl, + }; + if (messageMedia) { + const sent = await runChannelMessageSendWithLifecycle({ + lifecycle: messageLifecycle, + ctx: mediaCtx, + send: async () => { + await params.onPlatformSendStart?.(); + return await messageMedia(mediaCtx); + }, + }); + return attachOutboundDeliveryCommitHook( + normalizeChannelMessageSendResult(params.channel, sent.result), + sent.afterCommit, + ); + } + if (sendMedia) { + await params.onPlatformSendStart?.(); + return sendMedia(mediaCtx); + } + await params.onPlatformSendStart?.(); + return sendText!(mediaCtx); }, }; } +function normalizeChannelMessageSendResult( + channel: Exclude, + result: ChannelMessageSendResult, +): OutboundDeliveryResult { + const source = result as ChannelMessageSendResult & Partial; + return { + ...source, + channel, + messageId: + source.messageId ?? + source.receipt.primaryPlatformMessageId ?? + source.receipt.platformMessageIds[0] ?? + "", + receipt: source.receipt, + }; +} + function createChannelOutboundContextBase( params: ChannelHandlerParams, ): Omit { @@ -350,6 +562,40 @@ function createChannelOutboundContextBase( const isAbortError = (err: unknown): boolean => err instanceof Error && err.name === "AbortError"; +async function markQueuedPlatformSendAttemptStarted(params: { + queueId: string; + queuePolicy: OutboundDeliveryQueuePolicy; +}): Promise { + try { + await markDeliveryPlatformSendAttemptStarted(params.queueId); + return true; + } catch (err: unknown) { + if (params.queuePolicy === "required") { + throw err; + } + log.warn( + `failed to mark queued delivery ${params.queueId} as platform-send-attempt-started; continuing best-effort delivery: ${formatErrorMessage(err)}`, + ); + return false; + } +} + +async function markQueuedPlatformOutcomeUnknown(params: { + queueId: string; + queuePolicy: OutboundDeliveryQueuePolicy; +}): Promise { + try { + await markDeliveryPlatformOutcomeUnknown(params.queueId); + } catch (err: unknown) { + if (params.queuePolicy === "required") { + throw err; + } + log.warn( + `failed to mark queued delivery ${params.queueId} as platform-outcome-unknown; continuing best-effort delivery: ${formatErrorMessage(err)}`, + ); + } +} + type DeliverOutboundPayloadsCoreParams = { cfg: OpenClawConfig; channel: Exclude; @@ -376,6 +622,10 @@ type DeliverOutboundPayloadsCoreParams = { gatewayClientScopes?: readonly string[]; }; +type DeliverOutboundPayloadsCoreRuntimeParams = DeliverOutboundPayloadsCoreParams & { + onPlatformSendStart?: () => Promise; +}; + function collectPayloadMediaSources(plan: readonly OutboundPayloadPlan[]): string[] { return plan.flatMap((entry) => entry.parts.mediaUrls); } @@ -383,6 +633,11 @@ function collectPayloadMediaSources(plan: readonly OutboundPayloadPlan[]): strin export type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & { /** @internal Skip write-ahead queue (used by crash-recovery to avoid re-enqueueing). */ skipQueue?: boolean; + /** @internal Let recovery run commit hooks after it has acked the recovered queue entry. */ + deferCommitHooks?: boolean; + queuePolicy?: OutboundDeliveryQueuePolicy; + renderedBatchPlan?: QueuedRenderedMessageBatchPlan; + onDeliveryIntent?: (intent: OutboundDeliveryIntent) => void; }; type MessageSentEvent = { @@ -830,6 +1085,9 @@ export async function deliverOutboundPayloads( params: DeliverOutboundPayloadsParams, ): Promise { const { channel, to, payloads } = params; + const queuePolicy = params.queuePolicy ?? "best_effort"; + const renderedBatchPlan = + params.renderedBatchPlan ?? createRenderedMessageBatchPlan(params.payloads); // Write-ahead delivery queue: persist before sending, remove after success. const queueId = params.skipQueue @@ -839,10 +1097,12 @@ export async function deliverOutboundPayloads( to, accountId: params.accountId, payloads, + renderedBatchPlan, threadId: params.threadId, replyToId: params.replyToId, replyToMode: params.replyToMode, formatting: params.formatting, + identity: params.identity, bestEffort: params.bestEffort, gifPlayback: params.gifPlayback, forceDocument: params.forceDocument, @@ -850,7 +1110,22 @@ export async function deliverOutboundPayloads( mirror: params.mirror, session: params.session, gatewayClientScopes: params.gatewayClientScopes, - }).catch(() => null); // Best-effort — don't block delivery if queue write fails. + }).catch((err: unknown) => { + if (queuePolicy === "required") { + throw err; + } + return null; + }); // Best-effort delivery falls back to direct send if the queue write fails. + + if (queueId) { + params.onDeliveryIntent?.({ + id: queueId, + channel, + to, + ...(params.accountId ? { accountId: params.accountId } : {}), + queuePolicy, + }); + } if (!queueId) { return await deliverOutboundPayloadsWithQueueCleanup(params, null); @@ -876,23 +1151,65 @@ async function deliverOutboundPayloadsWithQueueCleanup( // without throwing — so the outer try/catch never fires. We track whether any // payload failed so we can call failDelivery instead of ackDelivery. let hadPartialFailure = false; - const wrappedParams = params.onError - ? { - ...params, - onError: (err: unknown, payload: NormalizedOutboundPayload) => { - hadPartialFailure = true; - params.onError!(err, payload); - }, - } - : params; + const wrappedParams = { + ...params, + onError: (err: unknown, payload: NormalizedOutboundPayload) => { + hadPartialFailure = true; + params.onError?.(err, payload); + }, + }; + const queuePolicy = params.queuePolicy ?? "best_effort"; + let platformResultsReturned = false; try { - const results = await deliverOutboundPayloadsCore(wrappedParams); + let platformSendStarted = false; + const results = await deliverOutboundPayloadsCore({ + ...wrappedParams, + ...(queueId + ? { + onPlatformSendStart: async () => { + if (platformSendStarted) { + return; + } + platformSendStarted = await markQueuedPlatformSendAttemptStarted({ + queueId, + queuePolicy, + }); + }, + } + : {}), + }); + platformResultsReturned = true; + if (!queueId) { + if (!params.deferCommitHooks) { + await runOutboundDeliveryCommitHooks(results); + } + return results; + } if (queueId) { if (hadPartialFailure) { await failDelivery(queueId, "partial delivery failure (bestEffort)").catch(() => {}); } else { - await ackDelivery(queueId).catch(() => {}); // Best-effort cleanup. + if (platformSendStarted) { + await markQueuedPlatformOutcomeUnknown({ + queueId, + queuePolicy, + }); + } + const acked = await ackDelivery(queueId) + .then(() => true) + .catch((err: unknown) => { + if (queuePolicy === "required") { + throw err; + } + log.warn( + `failed to ack queued delivery ${queueId}; continuing best-effort delivery: ${formatErrorMessage(err)}`, + ); + return false; + }); + if (acked) { + await runOutboundDeliveryCommitHooks(results); + } } } return results; @@ -900,7 +1217,7 @@ async function deliverOutboundPayloadsWithQueueCleanup( if (queueId) { if (isAbortError(err)) { await ackDelivery(queueId).catch(() => {}); - } else { + } else if (!platformResultsReturned) { await failDelivery(queueId, formatErrorMessage(err)).catch(() => {}); } } @@ -910,7 +1227,7 @@ async function deliverOutboundPayloadsWithQueueCleanup( /** Core delivery logic (extracted for queue wrapper). */ async function deliverOutboundPayloadsCore( - params: DeliverOutboundPayloadsCoreParams, + params: DeliverOutboundPayloadsCoreRuntimeParams, ): Promise { const { cfg, channel, to, payloads } = params; const directiveOptions = await resolveChannelOutboundDirectiveOptions({ cfg, channel }); @@ -958,6 +1275,7 @@ async function deliverOutboundPayloadsCore( silent: params.silent, mediaAccess, gatewayClientScopes: params.gatewayClientScopes, + ...(params.onPlatformSendStart ? { onPlatformSendStart: params.onPlatformSendStart } : {}), }); const configuredTextLimit = handler.chunker ? resolveTextChunkLimit(cfg, channel, accountId, { diff --git a/src/infra/outbound/delivery-commit-hooks.ts b/src/infra/outbound/delivery-commit-hooks.ts new file mode 100644 index 00000000000..f39077ae833 --- /dev/null +++ b/src/infra/outbound/delivery-commit-hooks.ts @@ -0,0 +1,46 @@ +import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { formatErrorMessage } from "../errors.js"; +import type { OutboundDeliveryResult } from "./deliver-types.js"; + +export type OutboundDeliveryCommitHook = () => Promise; + +const log = createSubsystemLogger("outbound/deliver"); +const outboundDeliveryCommitHooks = new WeakMap< + OutboundDeliveryResult, + OutboundDeliveryCommitHook[] +>(); + +export function attachOutboundDeliveryCommitHook( + result: T, + hook?: OutboundDeliveryCommitHook, +): T { + if (!hook) { + return result; + } + const hooks = outboundDeliveryCommitHooks.get(result) ?? []; + hooks.push(hook); + outboundDeliveryCommitHooks.set(result, hooks); + return result; +} + +export async function runOutboundDeliveryCommitHooks( + results: readonly OutboundDeliveryResult[], +): Promise { + for (const result of results) { + for (const hook of outboundDeliveryCommitHooks.get(result) ?? []) { + try { + await hook(); + } catch (err) { + log.warn("Plugin message adapter after-commit hook failed.", { + channel: result.channel, + messageId: result.messageId, + error: formatErrorMessage(err), + }); + } + } + } +} + +export function isOutboundDeliveryResultArray(value: unknown): value is OutboundDeliveryResult[] { + return Array.isArray(value); +} diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts index 1c99408ab03..85421cbad58 100644 --- a/src/infra/outbound/delivery-queue-recovery.ts +++ b/src/infra/outbound/delivery-queue-recovery.ts @@ -1,5 +1,15 @@ +import type { + ChannelMessageSendCommitContext, + ChannelMessageUnknownSendReconciliationResult, +} from "../../channels/message/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { formatErrorMessage } from "../errors.js"; +import { resolveOutboundChannelMessageAdapter } from "./channel-resolution.js"; +import type { OutboundDeliveryResult } from "./deliver-types.js"; +import { + isOutboundDeliveryResultArray, + runOutboundDeliveryCommitHooks, +} from "./delivery-commit-hooks.js"; import { ackDelivery, failDelivery, @@ -22,6 +32,7 @@ export type DeliverFn = ( cfg: OpenClawConfig; } & QueuedDeliveryPayload & { skipQueue?: boolean; + deferCommitHooks?: boolean; }, ) => Promise; @@ -116,10 +127,12 @@ function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) to: entry.to, accountId: entry.accountId, payloads: entry.payloads, + renderedBatchPlan: entry.renderedBatchPlan, threadId: entry.threadId, replyToId: entry.replyToId, replyToMode: entry.replyToMode, formatting: entry.formatting, + identity: entry.identity, bestEffort: entry.bestEffort, gifPlayback: entry.gifPlayback, forceDocument: entry.forceDocument, @@ -128,9 +141,157 @@ function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) session: entry.session, gatewayClientScopes: entry.gatewayClientScopes, skipQueue: true, // Prevent re-enqueueing during recovery. + deferCommitHooks: true, } satisfies Parameters[0]; } +async function reconcileUnknownQueuedDelivery(opts: { + entry: QueuedDelivery; + cfg: OpenClawConfig; + log: RecoveryLogger; +}): Promise { + const adapter = resolveOutboundChannelMessageAdapter({ + channel: opts.entry.channel, + cfg: opts.cfg, + allowBootstrap: true, + }); + if (adapter?.durableFinal?.capabilities?.reconcileUnknownSend !== true) { + return null; + } + const reconcileUnknownSend = adapter?.durableFinal?.reconcileUnknownSend; + if (!reconcileUnknownSend) { + return null; + } + const { entry } = opts; + try { + return await reconcileUnknownSend({ + cfg: opts.cfg, + queueId: entry.id, + channel: entry.channel, + to: entry.to, + ...(entry.accountId !== undefined ? { accountId: entry.accountId } : {}), + enqueuedAt: entry.enqueuedAt, + retryCount: entry.retryCount, + ...(entry.platformSendStartedAt !== undefined + ? { platformSendStartedAt: entry.platformSendStartedAt } + : {}), + payloads: entry.payloads, + ...(entry.renderedBatchPlan ? { renderedBatchPlan: entry.renderedBatchPlan } : {}), + ...(entry.replyToId !== undefined ? { replyToId: entry.replyToId } : {}), + ...(entry.replyToMode !== undefined ? { replyToMode: entry.replyToMode } : {}), + ...(entry.threadId !== undefined ? { threadId: entry.threadId } : {}), + ...(entry.silent !== undefined ? { silent: entry.silent } : {}), + }); + } catch (err) { + const error = formatErrorMessage(err); + opts.log.warn(`Delivery entry ${opts.entry.id} unknown-send reconciliation failed: ${error}`); + return { status: "unresolved", error, retryable: true }; + } +} + +function buildReconciledSentResult( + entry: QueuedDelivery, + reconciliation: Extract, +): OutboundDeliveryResult { + return { + channel: entry.channel, + messageId: + reconciliation.messageId ?? + reconciliation.receipt.primaryPlatformMessageId ?? + reconciliation.receipt.platformMessageIds[0] ?? + "", + receipt: reconciliation.receipt, + }; +} + +function buildReconciledCommitContext(params: { + entry: QueuedDelivery; + cfg: OpenClawConfig; + result: OutboundDeliveryResult; +}): ChannelMessageSendCommitContext { + const payload = params.entry.payloads[0] ?? {}; + const result = { + messageId: params.result.messageId, + receipt: params.result.receipt ?? { + platformMessageIds: [params.result.messageId].filter(Boolean), + parts: [], + sentAt: Date.now(), + }, + }; + const base = { + cfg: params.cfg, + to: params.entry.to, + accountId: params.entry.accountId, + replyToId: params.entry.replyToId, + replyToMode: params.entry.replyToMode, + threadId: params.entry.threadId, + silent: params.entry.silent, + result, + }; + if ( + payload.presentation !== undefined || + payload.delivery !== undefined || + payload.interactive !== undefined || + (payload.channelData !== undefined && Object.keys(payload.channelData).length > 0) + ) { + return { + ...base, + kind: "payload", + text: payload.text ?? "", + mediaUrl: payload.mediaUrl, + payload, + }; + } + const mediaUrl = payload.mediaUrl ?? payload.mediaUrls?.find((url) => url); + if (mediaUrl) { + return { + ...base, + kind: "media", + text: payload.text ?? "", + mediaUrl, + audioAsVoice: payload.audioAsVoice, + gifPlayback: params.entry.gifPlayback, + forceDocument: params.entry.forceDocument, + }; + } + return { + ...base, + kind: "text", + text: payload.text ?? "", + }; +} + +async function runReconciledSentCommitHooks(params: { + entry: QueuedDelivery; + cfg: OpenClawConfig; + reconciliation: Extract; + log: RecoveryLogger; +}): Promise { + const adapter = resolveOutboundChannelMessageAdapter({ + channel: params.entry.channel, + cfg: params.cfg, + allowBootstrap: true, + }); + const afterCommit = adapter?.send?.lifecycle?.afterCommit; + if (!afterCommit) { + return; + } + const result = buildReconciledSentResult(params.entry, params.reconciliation); + try { + await afterCommit( + buildReconciledCommitContext({ + entry: params.entry, + cfg: params.cfg, + result, + }), + ); + } catch (err) { + params.log.warn( + `Delivery entry ${params.entry.id} reconciled sent afterCommit hook failed: ${formatErrorMessage(err)}`, + ); + } +} + async function moveEntryToFailedWithLogging( entryId: string, log: RecoveryLogger, @@ -196,14 +357,90 @@ async function drainQueuedEntry(opts: { entry: QueuedDelivery; cfg: OpenClawConfig; deliver: DeliverFn; + log: RecoveryLogger; stateDir?: string; onRecovered?: (entry: QueuedDelivery) => void; onFailed?: (entry: QueuedDelivery, errMsg: string) => void; }): Promise<"recovered" | "failed" | "moved-to-failed" | "already-gone"> { const { entry } = opts; + if ( + entry.recoveryState === "send_attempt_started" || + entry.recoveryState === "unknown_after_send" + ) { + const reconciliation = await reconcileUnknownQueuedDelivery({ + entry, + cfg: opts.cfg, + log: opts.log, + }); + if (reconciliation?.status === "sent") { + try { + await ackDelivery(entry.id, opts.stateDir); + await runReconciledSentCommitHooks({ + entry, + cfg: opts.cfg, + reconciliation, + log: opts.log, + }); + opts.onRecovered?.(entry); + opts.log.info(`Delivery entry ${entry.id} reconciled unknown_after_send as already sent`); + return "recovered"; + } catch (ackErr) { + if (getErrnoCode(ackErr) === "ENOENT") { + return "already-gone"; + } + const errMsg = `failed to ack reconciled sent delivery: ${formatErrorMessage(ackErr)}`; + opts.log.warn(`Delivery entry ${entry.id} ${errMsg}`); + opts.onFailed?.(entry, errMsg); + try { + await failDelivery(entry.id, errMsg, opts.stateDir); + return "failed"; + } catch (failErr) { + if (getErrnoCode(failErr) === "ENOENT") { + return "already-gone"; + } + } + return "failed"; + } + } + if (reconciliation?.status === "not_sent") { + opts.log.info( + `Delivery entry ${entry.id} reconciled ${entry.recoveryState} as not sent; replaying`, + ); + } else { + const errMsg = + reconciliation?.status === "unresolved" && reconciliation.error + ? `delivery state is ${entry.recoveryState} and reconciliation is unresolved: ${reconciliation.error}` + : `delivery state is ${entry.recoveryState}; refusing blind replay without adapter reconciliation`; + opts.log.warn(`Delivery entry ${entry.id} ${errMsg}`); + opts.onFailed?.(entry, errMsg); + if (reconciliation === null || reconciliation.retryable === true) { + try { + await failDelivery(entry.id, errMsg, opts.stateDir); + return "failed"; + } catch (failErr) { + if (getErrnoCode(failErr) === "ENOENT") { + return "already-gone"; + } + } + return "failed"; + } + try { + await moveToFailed(entry.id, opts.stateDir); + return "moved-to-failed"; + } catch (moveErr) { + if (getErrnoCode(moveErr) === "ENOENT") { + return "already-gone"; + } + } + return "failed"; + } + } try { - await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg)); + const result = await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg)); await ackDelivery(entry.id, opts.stateDir); + if (isOutboundDeliveryResultArray(result)) { + await runOutboundDeliveryCommitHooks(result); + } opts.onRecovered?.(entry); return "recovered"; } catch (err) { @@ -314,6 +551,7 @@ export async function drainPendingDeliveries(opts: { entry: currentEntry, cfg: opts.cfg, deliver, + log: opts.log, stateDir: opts.stateDir, onFailed: (failedEntry, errMsg) => { if (isPermanentDeliveryError(errMsg)) { @@ -405,6 +643,7 @@ export async function recoverPendingDeliveries(opts: { entry: currentEntry, cfg: opts.cfg, deliver: opts.deliver, + log: opts.log, stateDir: opts.stateDir, onRecovered: (recoveredEntry) => { summary.recovered += 1; diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts index 1e1a9a68021..806f4fc3c34 100644 --- a/src/infra/outbound/delivery-queue-storage.ts +++ b/src/infra/outbound/delivery-queue-storage.ts @@ -1,10 +1,12 @@ import fs from "node:fs"; import path from "node:path"; import type { ReplyPayload } from "../../auto-reply/types.js"; +import type { RenderedMessageBatchPlanItem } from "../../channels/message/types.js"; import { resolveStateDir } from "../../config/paths.js"; import type { ReplyToMode } from "../../config/types.js"; import { generateSecureUuid } from "../secure-random.js"; import type { OutboundDeliveryFormattingOptions } from "./formatting.js"; +import type { OutboundIdentity } from "./identity.js"; import type { OutboundMirror } from "./mirror.js"; import type { OutboundSessionContext } from "./session-context.js"; import type { OutboundChannel } from "./targets.js"; @@ -12,6 +14,17 @@ import type { OutboundChannel } from "./targets.js"; const QUEUE_DIRNAME = "delivery-queue"; const FAILED_DIRNAME = "failed"; +export type QueuedRenderedMessageBatchPlan = { + payloadCount: number; + textCount: number; + mediaCount: number; + voiceCount: number; + presentationCount: number; + interactiveCount: number; + channelDataCount: number; + items: readonly RenderedMessageBatchPlanItem[]; +}; + export type QueuedDeliveryPayload = { channel: Exclude; to: string; @@ -22,10 +35,13 @@ export type QueuedDeliveryPayload = { * should produce the same result on replay. */ payloads: ReplyPayload[]; + /** Replayable projection summary captured when the durable send intent is created. */ + renderedBatchPlan?: QueuedRenderedMessageBatchPlan; threadId?: string | number | null; replyToId?: string | null; replyToMode?: ReplyToMode; formatting?: OutboundDeliveryFormattingOptions; + identity?: OutboundIdentity; bestEffort?: boolean; gifPlayback?: boolean; forceDocument?: boolean; @@ -43,6 +59,8 @@ export interface QueuedDelivery extends QueuedDeliveryPayload { retryCount: number; lastAttemptAt?: number; lastError?: string; + platformSendStartedAt?: number; + recoveryState?: "send_attempt_started" | "unknown_after_send"; } export function resolveQueueDir(stateDir?: string): string { @@ -144,10 +162,12 @@ export async function enqueueDelivery( to: params.to, accountId: params.accountId, payloads: params.payloads, + renderedBatchPlan: params.renderedBatchPlan, threadId: params.threadId, replyToId: params.replyToId, replyToMode: params.replyToMode, formatting: params.formatting, + identity: params.identity, bestEffort: params.bestEffort, gifPlayback: params.gifPlayback, forceDocument: params.forceDocument, @@ -198,6 +218,28 @@ export async function failDelivery(id: string, error: string, stateDir?: string) await writeQueueEntry(filePath, entry); } +export async function markDeliveryPlatformSendAttemptStarted( + id: string, + stateDir?: string, +): Promise { + const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`); + const entry = await readQueueEntry(filePath); + entry.platformSendStartedAt = entry.platformSendStartedAt ?? Date.now(); + entry.recoveryState = "send_attempt_started"; + await writeQueueEntry(filePath, entry); +} + +export async function markDeliveryPlatformOutcomeUnknown( + id: string, + stateDir?: string, +): Promise { + const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`); + const entry = await readQueueEntry(filePath); + entry.platformSendStartedAt = entry.platformSendStartedAt ?? Date.now(); + entry.recoveryState = "unknown_after_send"; + await writeQueueEntry(filePath, entry); +} + /** Load a single pending delivery entry by ID from the queue directory. */ export async function loadPendingDelivery( id: string, diff --git a/src/infra/outbound/delivery-queue.recovery.test.ts b/src/infra/outbound/delivery-queue.recovery.test.ts index 21e15736e40..10970aee3f5 100644 --- a/src/infra/outbound/delivery-queue.recovery.test.ts +++ b/src/infra/outbound/delivery-queue.recovery.test.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { attachOutboundDeliveryCommitHook } from "./delivery-commit-hooks.js"; import { enqueueDelivery, loadPendingDeliveries, @@ -14,10 +15,20 @@ import { setQueuedEntryState, } from "./delivery-queue.test-helpers.js"; +const resolveOutboundChannelMessageAdapterMock = vi.hoisted(() => vi.fn()); + +vi.mock("./channel-resolution.js", () => ({ + resolveOutboundChannelMessageAdapter: resolveOutboundChannelMessageAdapterMock, +})); + describe("delivery-queue recovery", () => { const { tmpDir } = installDeliveryQueueTmpDirHooks(); const baseCfg = {}; + beforeEach(() => { + resolveOutboundChannelMessageAdapterMock.mockReset(); + }); + const enqueueCrashRecoveryEntries = async () => { await enqueueDelivery( { channel: "demo-channel-a", to: "+1", payloads: [{ text: "a" }] }, @@ -98,6 +109,353 @@ describe("delivery-queue recovery", () => { expect(entries[0]?.lastError).toBe("network down"); }); + it("retains entries abandoned after platform send may have started without reconciliation", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "maybe sent" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "unknown_after_send", + }); + + const deliver = vi.fn().mockResolvedValue([]); + const log = createRecoveryLog(); + const { result } = await runRecovery({ deliver, log }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result).toEqual({ + recovered: 0, + failed: 1, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + const entries = await loadPendingDeliveries(tmpDir()); + expect(entries).toHaveLength(1); + expect(entries[0]?.id).toBe(id); + expect(entries[0]?.retryCount).toBe(1); + expect(entries[0]?.lastError).toContain("unknown_after_send"); + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("unknown_after_send")); + }); + + it("retains started entries without reconciliation instead of blindly replaying", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "not yet sent" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "send_attempt_started", + }); + + const deliver = vi.fn().mockResolvedValue([]); + const log = createRecoveryLog(); + const { result } = await runRecovery({ deliver, log }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result).toEqual({ + recovered: 0, + failed: 1, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + const entries = await loadPendingDeliveries(tmpDir()); + expect(entries).toHaveLength(1); + expect(entries[0]?.id).toBe(id); + expect(entries[0]?.retryCount).toBe(1); + expect(entries[0]?.lastError).toContain("send_attempt_started"); + expect(log.warn).toHaveBeenCalledWith( + expect.stringContaining("refusing blind replay without adapter reconciliation"), + ); + }); + + it("replays started entries only after adapter proves they were not sent", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "not yet sent" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "send_attempt_started", + }); + resolveOutboundChannelMessageAdapterMock.mockReturnValue({ + durableFinal: { + capabilities: { reconcileUnknownSend: true }, + reconcileUnknownSend: vi.fn().mockResolvedValue({ status: "not_sent" }), + }, + }); + + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver }); + + expect(resolveOutboundChannelMessageAdapterMock).toHaveBeenCalledWith({ + channel: "demo-channel-a", + cfg: baseCfg, + allowBootstrap: true, + }); + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ channel: "demo-channel-a", to: "+1", skipQueue: true }), + ); + expect(result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0); + }); + + it("acks unknown-after-send entries reconciled as already sent before commit hooks", async () => { + const id = await enqueueDelivery( + { + channel: "demo-channel-a", + to: "+1", + accountId: "acct-1", + payloads: [{ text: "maybe sent" }], + replyToId: "root-message", + threadId: "thread-1", + silent: true, + }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "unknown_after_send", + }); + const order: string[] = []; + const afterCommit = vi.fn(() => { + order.push("afterCommit"); + }); + const reconcileUnknownSend = vi.fn().mockResolvedValue({ + status: "sent", + messageId: "platform-1", + receipt: { + primaryPlatformMessageId: "platform-1", + platformMessageIds: ["platform-1"], + parts: [{ platformMessageId: "platform-1", kind: "text", index: 0 }], + sentAt: 1, + }, + }); + resolveOutboundChannelMessageAdapterMock.mockReturnValue({ + durableFinal: { + capabilities: { reconcileUnknownSend: true }, + reconcileUnknownSend, + }, + send: { + lifecycle: { + afterCommit, + }, + }, + }); + + const rename = fs.promises.rename.bind(fs.promises); + const renameSpy = vi.spyOn(fs.promises, "rename").mockImplementation(async (...args) => { + order.push("ack"); + return await rename(...args); + }); + + try { + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + expect(reconcileUnknownSend).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: baseCfg, + queueId: id, + channel: "demo-channel-a", + to: "+1", + accountId: "acct-1", + payloads: [{ text: "maybe sent" }], + replyToId: "root-message", + threadId: "thread-1", + silent: true, + retryCount: 0, + }), + ); + expect(afterCommit).toHaveBeenCalledWith( + expect.objectContaining({ + kind: "text", + to: "+1", + accountId: "acct-1", + replyToId: "root-message", + threadId: "thread-1", + silent: true, + result: expect.objectContaining({ messageId: "platform-1" }), + }), + ); + expect(order).toEqual(["ack", "afterCommit"]); + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0); + } finally { + renameSpy.mockRestore(); + } + }); + + it("records retry state when acking a reconciled sent entry fails", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "maybe sent" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "unknown_after_send", + }); + resolveOutboundChannelMessageAdapterMock.mockReturnValue({ + durableFinal: { + capabilities: { reconcileUnknownSend: true }, + reconcileUnknownSend: vi.fn().mockResolvedValue({ + status: "sent", + messageId: "platform-1", + receipt: { + primaryPlatformMessageId: "platform-1", + platformMessageIds: ["platform-1"], + parts: [{ platformMessageId: "platform-1", kind: "text", index: 0 }], + sentAt: 1, + }, + }), + }, + }); + const renameSpy = vi + .spyOn(fs.promises, "rename") + .mockRejectedValueOnce(Object.assign(new Error("ack denied"), { code: "EACCES" })); + + try { + const deliver = vi.fn().mockResolvedValue([]); + const log = createRecoveryLog(); + const { result } = await runRecovery({ deliver, log }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result).toEqual({ + recovered: 0, + failed: 1, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + const entries = await loadPendingDeliveries(tmpDir()); + expect(entries).toHaveLength(1); + expect(entries[0]?.id).toBe(id); + expect(entries[0]?.retryCount).toBe(1); + expect(entries[0]?.lastError).toContain("failed to ack reconciled sent delivery"); + expect(entries[0]?.lastError).toContain("ack denied"); + expect(log.warn).toHaveBeenCalledWith( + expect.stringContaining("failed to ack reconciled sent delivery"), + ); + } finally { + renameSpy.mockRestore(); + } + }); + + it("replays unknown-after-send entries only after adapter proves they were not sent", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "not sent" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "unknown_after_send", + }); + resolveOutboundChannelMessageAdapterMock.mockReturnValue({ + durableFinal: { + capabilities: { reconcileUnknownSend: true }, + reconcileUnknownSend: vi.fn().mockResolvedValue({ status: "not_sent" }), + }, + }); + + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver }); + + expect(deliver).toHaveBeenCalledTimes(1); + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ channel: "demo-channel-a", to: "+1", skipQueue: true }), + ); + expect(result).toEqual({ + recovered: 1, + failed: 0, + skippedMaxRetries: 0, + deferredBackoff: 0, + }); + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0); + }); + + it("keeps retryable unresolved unknown-after-send entries on the queue without replaying", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "unknown" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "unknown_after_send", + }); + resolveOutboundChannelMessageAdapterMock.mockReturnValue({ + durableFinal: { + capabilities: { reconcileUnknownSend: true }, + reconcileUnknownSend: vi.fn().mockResolvedValue({ + status: "unresolved", + error: "provider lookup timed out", + retryable: true, + }), + }, + }); + + const deliver = vi.fn().mockResolvedValue([]); + const { result } = await runRecovery({ deliver }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result.failed).toBe(1); + const entries = await loadPendingDeliveries(tmpDir()); + expect(entries).toHaveLength(1); + expect(entries[0]?.id).toBe(id); + expect(entries[0]?.retryCount).toBe(1); + expect(entries[0]?.recoveryState).toBe("unknown_after_send"); + expect(entries[0]?.lastError).toContain("provider lookup timed out"); + }); + + it("does not reconcile unknown-after-send entries unless the adapter declares the capability", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "hidden method" }] }, + tmpDir(), + ); + setQueuedEntryState(tmpDir(), id, { + retryCount: 0, + platformSendStartedAt: Date.now(), + recoveryState: "unknown_after_send", + }); + const reconcileUnknownSend = vi.fn().mockResolvedValue({ status: "not_sent" }); + resolveOutboundChannelMessageAdapterMock.mockReturnValue({ + durableFinal: { + reconcileUnknownSend, + }, + }); + + const deliver = vi.fn().mockResolvedValue([]); + const log = createRecoveryLog(); + const { result } = await runRecovery({ deliver, log }); + + expect(reconcileUnknownSend).not.toHaveBeenCalled(); + expect(deliver).not.toHaveBeenCalled(); + expect(result.failed).toBe(1); + const entries = await loadPendingDeliveries(tmpDir()); + expect(entries).toHaveLength(1); + expect(entries[0]?.id).toBe(id); + expect(entries[0]?.retryCount).toBe(1); + expect(log.warn).toHaveBeenCalledWith( + expect.stringContaining("refusing blind replay without adapter reconciliation"), + ); + }); + it("moves entries to failed/ immediately on permanent delivery errors", async () => { const id = await enqueueDelivery( { channel: "demo-channel", to: "user:abc", payloads: [{ text: "hi" }] }, @@ -150,6 +508,36 @@ describe("delivery-queue recovery", () => { expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true })); }); + it("runs recovered send commit hooks only after the queue entry is acked", async () => { + const id = await enqueueDelivery( + { channel: "demo-channel-a", to: "+1", payloads: [{ text: "a" }] }, + tmpDir(), + ); + const order: string[] = []; + const result = attachOutboundDeliveryCommitHook( + { channel: "demo-channel-a", messageId: "m1" }, + async () => { + order.push( + fs.existsSync(path.join(tmpDir(), "delivery-queue", "pending", `${id}.json`)) + ? "commit-before-ack" + : "commit-after-ack", + ); + }, + ); + const deliver = vi.fn(async () => { + order.push("deliver"); + return [result]; + }); + + await runRecovery({ deliver }); + + expect(order).toEqual(["deliver", "commit-after-ack"]); + expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0); + expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "pending", `${id}.json`))).toBe( + false, + ); + }); + it("replays stored delivery options during recovery", async () => { await enqueueDelivery( { diff --git a/src/infra/outbound/delivery-queue.storage.test.ts b/src/infra/outbound/delivery-queue.storage.test.ts index 5cbd2895179..1538c394938 100644 --- a/src/infra/outbound/delivery-queue.storage.test.ts +++ b/src/infra/outbound/delivery-queue.storage.test.ts @@ -6,6 +6,8 @@ import { enqueueDelivery, failDelivery, loadPendingDeliveries, + markDeliveryPlatformOutcomeUnknown, + markDeliveryPlatformSendAttemptStarted, moveToFailed, } from "./delivery-queue.js"; import { installDeliveryQueueTmpDirHooks, readQueuedEntry } from "./delivery-queue.test-helpers.js"; @@ -24,6 +26,16 @@ describe("delivery-queue storage", () => { channel: "directchat", to: "+1555", payloads: [{ text: "hello" }], + renderedBatchPlan: { + payloadCount: 1, + textCount: 1, + mediaCount: 0, + voiceCount: 0, + presentationCount: 0, + interactiveCount: 0, + channelDataCount: 0, + items: [{ index: 0, kinds: ["text"] as const, text: "hello", mediaUrls: [] }], + }, bestEffort: true, gifPlayback: true, silent: true, @@ -50,6 +62,16 @@ describe("delivery-queue storage", () => { id, channel: "directchat", to: "+1555", + renderedBatchPlan: { + payloadCount: 1, + textCount: 1, + mediaCount: 0, + voiceCount: 0, + presentationCount: 0, + interactiveCount: 0, + channelDataCount: 0, + items: [{ index: 0, kinds: ["text"] as const, text: "hello", mediaUrls: [] }], + }, bestEffort: true, gifPlayback: true, silent: true, @@ -115,6 +137,45 @@ describe("delivery-queue storage", () => { }); describe("failDelivery", () => { + it("marks entries as send-attempt-started before platform I/O", async () => { + const id = await enqueueTextDelivery( + { + channel: "forum", + to: "123", + payloads: [{ text: "test" }], + }, + tmpDir(), + ); + + await markDeliveryPlatformSendAttemptStarted(id, tmpDir()); + + const entry = readQueuedEntry(tmpDir(), id); + expect(typeof entry.platformSendStartedAt).toBe("number"); + expect((entry.platformSendStartedAt as number) > 0).toBe(true); + expect(entry.recoveryState).toBe("send_attempt_started"); + expect(entry.retryCount).toBe(0); + }); + + it("marks entries as unknown-after-send after platform I/O returns", async () => { + const id = await enqueueTextDelivery( + { + channel: "forum", + to: "123", + payloads: [{ text: "test" }], + }, + tmpDir(), + ); + + await markDeliveryPlatformSendAttemptStarted(id, tmpDir()); + await markDeliveryPlatformOutcomeUnknown(id, tmpDir()); + + const entry = readQueuedEntry(tmpDir(), id); + expect(typeof entry.platformSendStartedAt).toBe("number"); + expect((entry.platformSendStartedAt as number) > 0).toBe(true); + expect(entry.recoveryState).toBe("unknown_after_send"); + expect(entry.retryCount).toBe(0); + }); + it("increments retryCount, records attempt time, and sets lastError", async () => { const id = await enqueueTextDelivery( { diff --git a/src/infra/outbound/delivery-queue.test-helpers.ts b/src/infra/outbound/delivery-queue.test-helpers.ts index 1d48f4a5a72..e202d799526 100644 --- a/src/infra/outbound/delivery-queue.test-helpers.ts +++ b/src/infra/outbound/delivery-queue.test-helpers.ts @@ -40,7 +40,13 @@ export function readQueuedEntry(tmpDir: string, id: string): Record 0 ? mergedMediaUrls : mediaUrl ? [mediaUrl] : undefined; + const rawDelivery = params.delivery; + const delivery = + rawDelivery && typeof rawDelivery === "object" && !Array.isArray(rawDelivery) + ? (rawDelivery as ReplyPayloadDelivery) + : undefined; + const rawChannelData = params.channelData; + const channelData = + rawChannelData && typeof rawChannelData === "object" && !Array.isArray(rawChannelData) + ? (rawChannelData as Record) + : undefined; + const presentation = normalizeMessagePresentation(params.presentation); + const interactive = normalizeInteractiveReply(params.interactive); + const payload: ReplyPayload = { + text: message, + ...(mediaUrl ? { mediaUrl } : {}), + ...(mergedMediaUrls.length ? { mediaUrls: mergedMediaUrls } : {}), + ...(asVoice ? { audioAsVoice: true } : {}), + ...(presentation ? { presentation } : {}), + ...(interactive ? { interactive } : {}), + ...(delivery ? { delivery } : {}), + ...(channelData ? { channelData } : {}), + }; throwIfAborted(abortSignal); const gatewayPluginAction = await runGatewayPluginMessageActionOrNull({ @@ -779,6 +804,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise ({ getChannelPlugin: vi.fn(), resolveOutboundTarget: vi.fn(), deliverOutboundPayloads: vi.fn(), + resolveOutboundDurableFinalDeliverySupport: vi.fn(), resolveRuntimePluginRegistry: vi.fn(), })); @@ -43,6 +44,7 @@ vi.mock("./targets.js", () => ({ vi.mock("./deliver.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + resolveOutboundDurableFinalDeliverySupport: mocks.resolveOutboundDurableFinalDeliverySupport, })); vi.mock("../../utils/message-channel.js", async () => { @@ -78,6 +80,7 @@ describe("sendMessage", () => { mocks.getChannelPlugin.mockClear(); mocks.resolveOutboundTarget.mockClear(); mocks.deliverOutboundPayloads.mockClear(); + mocks.resolveOutboundDurableFinalDeliverySupport.mockClear(); mocks.resolveRuntimePluginRegistry.mockClear(); mocks.getChannelPlugin.mockReturnValue({ @@ -85,6 +88,7 @@ describe("sendMessage", () => { }); mocks.resolveOutboundTarget.mockImplementation(({ to }: { to: string }) => ({ ok: true, to })); mocks.deliverOutboundPayloads.mockResolvedValue([{ channel: "forum", messageId: "m1" }]); + mocks.resolveOutboundDurableFinalDeliverySupport.mockResolvedValue({ ok: true }); }); it("passes explicit agentId to outbound delivery for scoped media roots", async () => { @@ -227,6 +231,66 @@ describe("sendMessage", () => { ); }); + it("forwards prepared payloads and required queue policy into outbound delivery", async () => { + const mediaAccess = { + localRoots: ["/tmp/media"], + readFile: vi.fn(async () => Buffer.from("media")), + }; + + await sendMessage({ + cfg: {}, + channel: "forum", + to: "123456", + content: "fallback text", + payloads: [{ text: "prepared", channelData: { forum: { card: true } } }], + queuePolicy: "required", + mediaAccess, + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + payloads: [ + expect.objectContaining({ + text: "prepared", + channelData: { forum: { card: true } }, + }), + ], + queuePolicy: "required", + mediaAccess, + }), + ); + expect(mocks.resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "forum", + requirements: expect.objectContaining({ + payload: true, + reconcileUnknownSend: true, + }), + }), + ); + }); + + it("rejects required durable sends before enqueue when replay safety is unsupported", async () => { + mocks.resolveOutboundDurableFinalDeliverySupport.mockResolvedValueOnce({ + ok: false, + reason: "capability_mismatch", + capability: "reconcileUnknownSend", + }); + + await expect( + sendMessage({ + cfg: {}, + channel: "forum", + to: "123456", + content: "fallback text", + payloads: [{ text: "prepared", channelData: { forum: { card: true } } }], + queuePolicy: "required", + }), + ).rejects.toThrow("missing reconcileUnknownSend"); + + expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled(); + }); + it("applies mirror matrix semantics for MEDIA and silent token variants", async () => { const matrix: Array<{ name: string; diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index becfcd9c4c1..cca21b01a5e 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -1,4 +1,7 @@ +import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; +import { deriveDurableFinalDeliveryRequirements } from "../../channels/message/capabilities.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import type { OutboundMediaAccess } from "../../media/load-options.js"; import type { PollInput } from "../../polls.js"; import { normalizePollInput } from "../../polls.js"; import { @@ -11,7 +14,10 @@ import { resolveOutboundChannelPlugin } from "./channel-resolution.js"; import { resolveMessageChannelSelection } from "./channel-selection.js"; import { deliverOutboundPayloads, + resolveOutboundDurableFinalDeliverySupport, + type DurableFinalDeliveryRequirements, type OutboundDeliveryResult, + type OutboundDeliveryQueuePolicy, type OutboundSendDeps, } from "./deliver.js"; import type { OutboundMirror } from "./mirror.js"; @@ -75,6 +81,9 @@ type MessageSendParams = { threadId?: string | number; dryRun?: boolean; bestEffort?: boolean; + queuePolicy?: OutboundDeliveryQueuePolicy; + payloads?: ReplyPayload[]; + mediaAccess?: OutboundMediaAccess; deps?: OutboundSendDeps; cfg?: OpenClawConfig; gateway?: MessageGatewayOptions; @@ -177,6 +186,76 @@ function resolveRequiredPlugin(channel: string, cfg: OpenClawConfig) { return plugin; } +function payloadRequiresDurablePayloadTransport(payload: ReplyPayload): boolean { + return ( + payload.presentation !== undefined || + payload.delivery !== undefined || + payload.interactive !== undefined || + (payload.channelData !== undefined && Object.keys(payload.channelData).length > 0) + ); +} + +function mergeDurableRequirements( + target: DurableFinalDeliveryRequirements, + source: DurableFinalDeliveryRequirements, +): DurableFinalDeliveryRequirements { + for (const [capability, required] of Object.entries(source) as Array< + [keyof DurableFinalDeliveryRequirements, boolean | undefined] + >) { + if (required === true) { + target[capability] = true; + } + } + return target; +} + +function deriveRequiredMessageSendCapabilities(params: { + payloads: ReplyPayload[]; + replyToId?: string | null; + threadId?: string | number | null; + silent?: boolean; +}): DurableFinalDeliveryRequirements { + const requirements: DurableFinalDeliveryRequirements = { reconcileUnknownSend: true }; + for (const payload of params.payloads) { + mergeDurableRequirements( + requirements, + deriveDurableFinalDeliveryRequirements({ + payload, + replyToId: params.replyToId, + threadId: params.threadId, + silent: params.silent, + payloadTransport: payloadRequiresDurablePayloadTransport(payload), + batch: params.payloads.length > 1, + reconcileUnknownSend: true, + }), + ); + } + return requirements; +} + +async function assertRequiredMessageSendDurability(params: { + cfg: OpenClawConfig; + channel: Exclude; + payloads: ReplyPayload[]; + replyToId?: string | null; + threadId?: string | number | null; + silent?: boolean; +}): Promise { + const support = await resolveOutboundDurableFinalDeliverySupport({ + cfg: params.cfg, + channel: params.channel, + requirements: deriveRequiredMessageSendCapabilities(params), + }); + if (support.ok) { + return; + } + const suffix = + support.reason === "capability_mismatch" && support.capability + ? `missing ${support.capability}` + : support.reason; + throw new Error(`Required durable message send is unsupported for ${params.channel}: ${suffix}`); +} + function resolveGatewayOptions(opts?: MessageGatewayOptions) { // Security: backend callers (tools/agents) must not accept user-controlled gateway URLs. // Use config-derived gateway target only. @@ -238,14 +317,18 @@ export async function sendMessage(params: MessageSendParams): Promise 0 + ? params.payloads + : [ + { + text: params.content, + mediaUrl: params.mediaUrl, + mediaUrls: params.mediaUrls, + audioAsVoice: params.asVoice === true, + }, + ]; + const outboundPlan = createOutboundPayloadPlan(outboundPayloads); const normalizedPayloads = projectOutboundPayloadPlanForDelivery(outboundPlan); const mirrorProjection = projectOutboundPayloadPlanForMirror(outboundPlan); const mirrorText = mirrorProjection.text; @@ -286,6 +369,16 @@ export async function sendMessage(params: MessageSendParams): Promise vi.fn(() => [])); @@ -175,6 +181,7 @@ describe("executeSendAction", () => { }); beforeEach(() => { + setActivePluginRegistry(createTestRegistry([])); mocks.dispatchChannelMessageAction.mockClear(); mocks.sendMessage.mockClear(); mocks.sendPoll.mockClear(); @@ -589,6 +596,92 @@ describe("executeSendAction", () => { ); }); + it("routes prepared plugin send payloads through core best-effort delivery by default", async () => { + const prepareSendPayload = vi.fn(({ payload }) => ({ + ...payload, + channelData: { prepared: true }, + })); + const plugin: ChannelPlugin = { + ...createChannelTestPluginBase({ id: "discord" }), + actions: { + describeMessageTool: () => ({ actions: ["send"] }), + prepareSendPayload, + handleAction: async () => ({ content: [], details: { ok: true } }), + }, + outbound: { deliveryMode: "direct" }, + }; + setActivePluginRegistry(createTestRegistry([{ pluginId: "discord", plugin, source: "test" }])); + mocks.sendMessage.mockResolvedValue({ + channel: "discord", + to: "channel:123", + via: "direct", + mediaUrl: null, + }); + + await executeSendAction({ + ctx: { + cfg: {}, + channel: "discord", + params: { to: "channel:123", message: "hello" }, + dryRun: false, + }, + to: "channel:123", + message: "hello", + }); + + expect(prepareSendPayload).toHaveBeenCalled(); + expect(mocks.dispatchChannelMessageAction).not.toHaveBeenCalled(); + expect(mocks.sendMessage).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + queuePolicy: "best_effort", + payloads: [expect.objectContaining({ channelData: { prepared: true } })], + }), + ); + }); + + it("uses required core delivery only when the send action opts out of best-effort", async () => { + const prepareSendPayload = vi.fn(({ payload }) => ({ + ...payload, + channelData: { prepared: true }, + })); + const plugin: ChannelPlugin = { + ...createChannelTestPluginBase({ id: "discord" }), + actions: { + describeMessageTool: () => ({ actions: ["send"] }), + prepareSendPayload, + handleAction: async () => ({ content: [], details: { ok: true } }), + }, + outbound: { deliveryMode: "direct" }, + }; + setActivePluginRegistry(createTestRegistry([{ pluginId: "discord", plugin, source: "test" }])); + mocks.sendMessage.mockResolvedValue({ + channel: "discord", + to: "channel:123", + via: "direct", + mediaUrl: null, + }); + + await executeSendAction({ + ctx: { + cfg: {}, + channel: "discord", + params: { to: "channel:123", message: "hello" }, + dryRun: false, + }, + to: "channel:123", + message: "hello", + bestEffort: false, + }); + + expect(mocks.sendMessage).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + queuePolicy: "required", + }), + ); + }); + it("forwards poll args to sendPoll on core outbound path", async () => { mocks.dispatchChannelMessageAction.mockResolvedValue(null); mocks.sendPoll.mockResolvedValue({ diff --git a/src/infra/outbound/outbound-send-service.ts b/src/infra/outbound/outbound-send-service.ts index ad254b8dc9b..076e1bd4d64 100644 --- a/src/infra/outbound/outbound-send-service.ts +++ b/src/infra/outbound/outbound-send-service.ts @@ -1,7 +1,9 @@ import type { AgentToolResult } from "@mariozechner/pi-agent-core"; +import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js"; import type { ChannelId, + ChannelMessageActionContext, ChannelThreadingToolContext, } from "../../channels/plugins/types.public.js"; import { appendAssistantMessageToSessionTranscript } from "../../config/sessions.js"; @@ -10,6 +12,7 @@ import type { OutboundMediaAccess, OutboundMediaReadFile } from "../../media/loa import { resolveAgentScopedOutboundMediaAccess } from "../../media/read-capability.js"; import type { GatewayClientMode, GatewayClientName } from "../../utils/message-channel.js"; import { throwIfAborted } from "./abort.js"; +import { resolveOutboundChannelPlugin } from "./channel-resolution.js"; import type { OutboundSendDeps } from "./deliver.js"; import type { MessagePollResult, MessageSendResult } from "./message.js"; import { sendMessage, sendPoll } from "./message.js"; @@ -122,10 +125,66 @@ async function tryHandleWithPluginAction(params: { }; } +function createChannelActionContext(params: { + ctx: OutboundSendContext; + action: "send" | "poll"; + mediaAccess?: ReturnType; +}): ChannelMessageActionContext { + const mediaAccess = params.mediaAccess ?? params.ctx.mediaAccess; + return { + channel: params.ctx.channel, + action: params.action, + cfg: params.ctx.cfg, + params: params.ctx.params, + ...(mediaAccess ? { mediaAccess } : {}), + mediaLocalRoots: mediaAccess?.localRoots ?? params.ctx.mediaAccess?.localRoots, + mediaReadFile: mediaAccess?.readFile ?? params.ctx.mediaReadFile, + accountId: params.ctx.accountId ?? undefined, + requesterSenderId: params.ctx.requesterSenderId, + senderIsOwner: params.ctx.senderIsOwner, + sessionKey: params.ctx.sessionKey, + sessionId: params.ctx.sessionId, + agentId: params.ctx.agentId, + gateway: params.ctx.gateway, + toolContext: params.ctx.toolContext, + dryRun: params.ctx.dryRun, + }; +} + +async function tryPreparePluginSendPayload(params: { + ctx: OutboundSendContext; + to: string; + payload: ReplyPayload; + replyToId?: string; + threadId?: string | number; +}): Promise { + const plugin = resolveOutboundChannelPlugin({ + channel: params.ctx.channel, + cfg: params.ctx.cfg, + }); + if (!plugin?.outbound) { + return null; + } + const prepareSendPayload = plugin?.actions?.prepareSendPayload; + if (!prepareSendPayload) { + return null; + } + return ( + (await prepareSendPayload({ + ctx: createChannelActionContext({ ctx: params.ctx, action: "send" }), + to: params.to, + payload: params.payload, + replyToId: params.replyToId, + threadId: params.threadId, + })) ?? null + ); +} + export async function executeSendAction(params: { ctx: OutboundSendContext; to: string; message: string; + payload?: ReplyPayload; mediaUrl?: string; mediaUrls?: string[]; asVoice?: boolean; @@ -141,6 +200,61 @@ export async function executeSendAction(params: { sendResult?: MessageSendResult; }> { throwIfAborted(params.ctx.abortSignal); + const defaultPayload: ReplyPayload = params.payload ?? { + text: params.message, + mediaUrl: params.mediaUrl, + mediaUrls: params.mediaUrls, + audioAsVoice: params.asVoice === true, + }; + const queuePolicy = params.bestEffort === false ? "required" : "best_effort"; + const preparedPayload = await tryPreparePluginSendPayload({ + ctx: params.ctx, + to: params.to, + payload: defaultPayload, + replyToId: params.replyToId, + threadId: params.threadId, + }); + if (preparedPayload) { + throwIfAborted(params.ctx.abortSignal); + const result: MessageSendResult = await sendMessage({ + cfg: params.ctx.cfg, + to: params.to, + content: params.message, + payloads: [preparedPayload], + agentId: params.ctx.agentId, + requesterSessionKey: params.ctx.sessionKey, + requesterAccountId: params.ctx.requesterAccountId ?? params.ctx.accountId ?? undefined, + requesterSenderId: params.ctx.requesterSenderId, + requesterSenderName: params.ctx.requesterSenderName, + requesterSenderUsername: params.ctx.requesterSenderUsername, + requesterSenderE164: params.ctx.requesterSenderE164, + mediaUrl: params.mediaUrl || undefined, + mediaUrls: params.mediaUrls, + asVoice: params.asVoice, + channel: params.ctx.channel || undefined, + accountId: params.ctx.accountId ?? undefined, + replyToId: params.replyToId, + threadId: params.threadId, + gifPlayback: params.gifPlayback, + forceDocument: params.forceDocument, + dryRun: params.ctx.dryRun, + bestEffort: params.bestEffort ?? undefined, + queuePolicy, + deps: params.ctx.deps, + gateway: params.ctx.gateway, + mirror: params.ctx.mirror, + abortSignal: params.ctx.abortSignal, + silent: params.ctx.silent, + mediaAccess: params.ctx.mediaAccess, + }); + + return { + handledBy: "core", + payload: result, + sendResult: result, + }; + } + const pluginHandled = await tryHandleWithPluginAction({ ctx: params.ctx, action: "send", @@ -190,11 +304,13 @@ export async function executeSendAction(params: { forceDocument: params.forceDocument, dryRun: params.ctx.dryRun, bestEffort: params.bestEffort ?? undefined, + queuePolicy, deps: params.ctx.deps, gateway: params.ctx.gateway, mirror: params.ctx.mirror, abortSignal: params.ctx.abortSignal, silent: params.ctx.silent, + mediaAccess: params.ctx.mediaAccess, }); return { diff --git a/src/scripts/test-projects.test.ts b/src/scripts/test-projects.test.ts index 1b41a5f4263..0715ed6a84e 100644 --- a/src/scripts/test-projects.test.ts +++ b/src/scripts/test-projects.test.ts @@ -956,6 +956,7 @@ describe("test-projects args", () => { includePatterns: [ "extensions/discord/src/api-barrel.test.ts", "extensions/discord/src/channel-actions.contract.test.ts", + "extensions/discord/src/channel.message-adapter.test.ts", "extensions/discord/src/channel.test.ts", "extensions/discord/src/monitor/message-handler.bot-self-filter.test.ts", "extensions/discord/src/monitor/message-handler.queue.test.ts", diff --git a/src/utils/usage-format.test.ts b/src/utils/usage-format.test.ts index 5eb8346db31..fc2d9b5616e 100644 --- a/src/utils/usage-format.test.ts +++ b/src/utils/usage-format.test.ts @@ -19,8 +19,8 @@ import { describe("usage-format", () => { const originalAgentDir = process.env.OPENCLAW_AGENT_DIR; const originalStateDir = process.env.OPENCLAW_STATE_DIR; - let stateDir: string; let agentDir: string; + let stateDir: string; beforeEach(async () => { stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-format-"));