feat: route outbound sends through durable lifecycle

This commit is contained in:
Peter Steinberger
2026-05-06 01:40:29 +01:00
parent 8bfabd6bb1
commit 2ead1502c9
26 changed files with 2535 additions and 119 deletions

View File

@@ -1,4 +1,4 @@
import { describe, expect, it } from "vitest";
import { beforeEach, describe, expect, it } from "vitest";
import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js";
import { createTestRegistry } from "../../test-utils/channel-plugins.js";
import {
@@ -31,6 +31,10 @@ async function expectSameTargetRepliesDelivered(params: { provider: string; to:
}
describe("buildReplyPayloads media filter integration", () => {
beforeEach(() => {
resetPluginRuntimeStateForTest();
});
it("strips legacy bracket tool blocks from heartbeat replies", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,

View File

@@ -29,7 +29,7 @@ type ReplyDispatchSkipHandler = (
type ReplyDispatchDeliverer = (
payload: ReplyPayload,
info: { kind: ReplyDispatchKind },
) => Promise<void>;
) => Promise<unknown>;
export type ReplyDispatchBeforeDeliver = (
payload: ReplyPayload,

View File

@@ -1,8 +1,8 @@
import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js";
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js";
import { getChannelPlugin } from "../../channels/plugins/index.js";
import { getLoadedChannelPluginForRead } from "../../channels/plugins/registry-loaded-read.js";
import { normalizeAnyChannelId } from "../../channels/registry.js";
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
import {
channelRouteTargetsMatchExact,
stringifyRouteThreadId,
@@ -91,6 +91,18 @@ function normalizeThreadIdForComparison(value?: string): string | undefined {
return stringifyRouteThreadId(value);
}
function normalizeTargetForDedupe(provider: string, rawTarget?: string): string | undefined {
const fallback = normalizeOptionalString(rawTarget);
if (!fallback) {
return undefined;
}
const providerId = normalizeProviderForComparison(provider);
const normalizer = providerId
? getLoadedChannelPluginForRead(providerId)?.messaging?.normalizeTarget
: undefined;
return normalizeOptionalString(normalizer?.(rawTarget ?? "") ?? fallback);
}
function resolveTargetProviderForComparison(params: {
currentProvider: string;
targetProvider?: string;
@@ -113,7 +125,7 @@ function normalizeRouteTargetForDedupe(params: {
accountId?: string;
threadId?: string;
}): MessagingToolDedupeRouteTarget | null {
const to = normalizeTargetForProvider(params.provider, params.rawTarget);
const to = normalizeTargetForDedupe(params.provider, params.rawTarget);
if (!to) {
return null;
}

View File

@@ -33,7 +33,7 @@ import {
type SavedMedia,
saveMediaBuffer,
} from "../../media/store.js";
import { createChannelReplyPipeline } from "../../plugin-sdk/channel-reply-pipeline.js";
import { createChannelMessageReplyPipeline } from "../../plugin-sdk/channel-message.js";
import { isPluginOwnedSessionBindingRecord } from "../../plugins/conversation-binding.js";
import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
@@ -2247,7 +2247,7 @@ export const chatHandlers: GatewayRequestHandlers = {
ctx.MediaStaged = true;
}
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({
cfg,
agentId,
channel: INTERNAL_MESSAGE_CHANNEL,

View File

@@ -3,7 +3,7 @@ import type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
type LoadedSessionEntry = ReturnType<typeof import("./session-utils.js").loadSessionEntry>;
type RecordInboundSessionAndDispatchReplyParams = Parameters<
typeof import("../plugin-sdk/inbound-reply-dispatch.js").recordInboundSessionAndDispatchReply
typeof import("../plugin-sdk/channel-message.js").recordChannelMessageReplyDispatch
>[0];
const mocks = vi.hoisted(() => {
@@ -194,8 +194,8 @@ vi.mock("../infra/system-events.js", () => ({
enqueueSystemEvent: mocks.enqueueSystemEvent,
}));
vi.mock("../plugin-sdk/inbound-reply-dispatch.js", () => ({
recordInboundSessionAndDispatchReply: mocks.recordInboundSessionAndDispatchReply,
vi.mock("../plugin-sdk/channel-message.js", () => ({
recordChannelMessageReplyDispatch: mocks.recordInboundSessionAndDispatchReply,
}));
vi.mock("../infra/heartbeat-wake.js", async () => {

View File

@@ -34,8 +34,8 @@ import {
} from "../infra/session-delivery-queue.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { recordChannelMessageReplyDispatch } from "../plugin-sdk/channel-message.js";
import { stringifyRouteThreadId } from "../plugin-sdk/channel-route.js";
import { recordInboundSessionAndDispatchReply } from "../plugin-sdk/inbound-reply-dispatch.js";
import type { OutboundReplyPayload } from "../plugin-sdk/reply-payload.js";
import {
deliveryContextFromSession,
@@ -272,7 +272,7 @@ async function deliverQueuedSessionDelivery(params: {
config: cfg,
});
let dispatchError: unknown;
await recordInboundSessionAndDispatchReply({
await recordChannelMessageReplyDispatch({
cfg,
channel: route.channel,
accountId: route.accountId,

View File

@@ -1,13 +1,56 @@
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js";
import { applyPluginAutoEnable } from "../../config/plugin-auto-enable.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveRuntimePluginRegistry } from "../../plugins/loader.js";
import {
getActivePluginChannelRegistry,
getActivePluginChannelRegistryVersion,
} from "../../plugins/runtime.js";
import type { DeliverableMessageChannel } from "../../utils/message-channel.js";
const bootstrapAttempts = new Set<string>();
export function resetOutboundChannelBootstrapStateForTests(): void {
// Runtime channel plugins are loaded during Gateway startup now.
bootstrapAttempts.clear();
}
export function bootstrapOutboundChannelPlugin(params: {
channel: DeliverableMessageChannel;
cfg?: OpenClawConfig;
}): void {
void params;
const cfg = params.cfg;
if (!cfg) {
return;
}
const activeChannelRegistry = getActivePluginChannelRegistry();
const activeHasRequestedChannel = activeChannelRegistry?.channels?.some(
(entry) => entry?.plugin?.id === params.channel,
);
if (activeHasRequestedChannel) {
return;
}
const attemptKey = `${getActivePluginChannelRegistryVersion()}:${params.channel}`;
if (bootstrapAttempts.has(attemptKey)) {
return;
}
bootstrapAttempts.add(attemptKey);
const autoEnabled = applyPluginAutoEnable({ config: cfg });
const defaultAgentId = resolveDefaultAgentId(autoEnabled.config);
const workspaceDir = resolveAgentWorkspaceDir(autoEnabled.config, defaultAgentId);
try {
resolveRuntimePluginRegistry({
config: autoEnabled.config,
activationSourceConfig: cfg,
autoEnabledReasons: autoEnabled.autoEnabledReasons,
workspaceDir,
runtimeOptions: {
allowGatewaySubagentBinding: true,
},
});
} catch {
bootstrapAttempts.delete(attemptKey);
}
}

View File

@@ -128,28 +128,33 @@ describe("outbound channel resolution", () => {
).toBe(plugin);
});
it("does not load registries while resolving outbound plugins", async () => {
it("bootstraps configured channel plugins when the active registry is missing the target", async () => {
const plugin = { id: "alpha" };
getLoadedChannelPluginMock.mockReturnValueOnce(undefined).mockReturnValueOnce(plugin);
const channelResolution = await importChannelResolution("no-bootstrap");
const channelResolution = await importChannelResolution("bootstrap-missing-target");
expect(
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
allowBootstrap: true,
}),
).toBe(plugin);
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
getChannelPluginMock.mockReturnValue(undefined);
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
});
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
expect(applyPluginAutoEnableMock).toHaveBeenCalledWith({ config: { channels: {} } });
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledWith(
expect.objectContaining({
config: { autoEnabled: true },
activationSourceConfig: { channels: {} },
autoEnabledReasons: {},
workspaceDir: "/tmp/workspace",
runtimeOptions: {
allowGatewaySubagentBinding: true,
},
}),
);
});
it("does not load when the active registry has other channels but not the requested one", async () => {
it("attempts activation when the active registry has other channels but not the requested one", async () => {
getLoadedChannelPluginMock.mockReturnValue(undefined);
getChannelPluginMock.mockReturnValue(undefined);
getActivePluginRegistryMock.mockReturnValue({
@@ -164,47 +169,80 @@ describe("outbound channel resolution", () => {
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
allowBootstrap: true,
}),
).toBeUndefined();
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
});
it("does not retry registry loads after a missing outbound plugin", async () => {
getChannelPluginMock.mockReturnValue(undefined);
resolveRuntimePluginRegistryMock.mockImplementationOnce(() => {
throw new Error("transient");
});
const channelResolution = await importChannelResolution("bootstrap-retry");
expect(
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
allowBootstrap: true,
}),
).toBeUndefined();
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
allowBootstrap: true,
});
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
});
it("does not load when the pinned channel registry version changes", async () => {
it("allows another activation attempt when the pinned channel registry version changes", async () => {
getChannelPluginMock.mockReturnValue(undefined);
const channelResolution = await importChannelResolution("channel-version-change");
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
allowBootstrap: true,
});
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
getActivePluginChannelRegistryVersionMock.mockReturnValue(2);
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
allowBootstrap: true,
});
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(2);
});
it("resolves message adapters through the activation-aware channel plugin path", async () => {
const message = { send: { text: vi.fn() } };
const plugin = { id: "alpha", message };
getLoadedChannelPluginMock.mockReturnValueOnce(undefined).mockReturnValueOnce(plugin);
const channelResolution = await importChannelResolution("message-adapter-bootstrap");
expect(
channelResolution.resolveOutboundChannelMessageAdapter({
channel: "alpha",
cfg: { channels: {} } as never,
allowBootstrap: true,
}),
).toBe(message);
expect(resolveRuntimePluginRegistryMock).toHaveBeenCalledTimes(1);
});
it("does not bootstrap by default for outbound hot-path resolution", async () => {
const plugin = { id: "alpha" };
getLoadedChannelPluginMock.mockReturnValue(undefined);
getChannelPluginMock.mockReturnValue(plugin);
const channelResolution = await importChannelResolution("no-bootstrap-default");
expect(
channelResolution.resolveOutboundChannelPlugin({
channel: "alpha",
cfg: { channels: {} } as never,
}),
).toBe(plugin);
expect(resolveRuntimePluginRegistryMock).not.toHaveBeenCalled();
});
});

View File

@@ -1,3 +1,4 @@
import type { ChannelMessageAdapterShape } from "../../channels/message/types.js";
import { getChannelPlugin, getLoadedChannelPlugin } from "../../channels/plugins/index.js";
import type { ChannelPlugin } from "../../channels/plugins/types.plugin.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
@@ -52,6 +53,7 @@ function resolveDirectFromActiveRegistry(
export function resolveOutboundChannelPlugin(params: {
channel: string;
cfg?: OpenClawConfig;
allowBootstrap?: boolean;
}): ChannelPlugin | undefined {
const normalized = normalizeDeliverableOutboundChannel(params.channel);
if (!normalized) {
@@ -69,6 +71,18 @@ export function resolveOutboundChannelPlugin(params: {
return directCurrent;
}
if (params.allowBootstrap !== true) {
return resolve();
}
maybeBootstrapChannelPlugin({ channel: normalized, cfg: params.cfg });
return resolveLoaded() ?? resolveDirectFromActiveRegistry(normalized) ?? resolve();
}
export function resolveOutboundChannelMessageAdapter(params: {
channel: string;
cfg?: OpenClawConfig;
allowBootstrap?: boolean;
}): ChannelMessageAdapterShape | undefined {
return resolveOutboundChannelPlugin(params)?.message;
}

View File

@@ -1,3 +1,4 @@
import type { MessageReceipt } from "../../channels/message/types.js";
import type { ChannelId } from "../../channels/plugins/channel-id.types.js";
export type OutboundDeliveryResult = {
@@ -10,6 +11,7 @@ export type OutboundDeliveryResult = {
timestamp?: number;
toJid?: string;
pollId?: string;
receipt?: MessageReceipt;
// Channel docking: stash channel-specific fields here to avoid core type churn.
meta?: Record<string, unknown>;
};

View File

@@ -1,6 +1,7 @@
import path from "node:path";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { chunkText } from "../../auto-reply/chunk.js";
import { createMessageReceiptFromOutboundResults } from "../../channels/message/receipt.js";
import type { ChannelOutboundAdapter } from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import * as mediaCapabilityModule from "../../media/read-capability.js";
@@ -41,6 +42,8 @@ const queueMocks = vi.hoisted(() => ({
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
ackDelivery: vi.fn(async () => {}),
failDelivery: vi.fn(async () => {}),
markDeliveryPlatformOutcomeUnknown: vi.fn(async () => {}),
markDeliveryPlatformSendAttemptStarted: vi.fn(async () => {}),
withActiveDeliveryClaim: vi.fn<
(
entryId: string,
@@ -81,6 +84,8 @@ vi.mock("./delivery-queue.js", () => ({
enqueueDelivery: queueMocks.enqueueDelivery,
ackDelivery: queueMocks.ackDelivery,
failDelivery: queueMocks.failDelivery,
markDeliveryPlatformOutcomeUnknown: queueMocks.markDeliveryPlatformOutcomeUnknown,
markDeliveryPlatformSendAttemptStarted: queueMocks.markDeliveryPlatformSendAttemptStarted,
withActiveDeliveryClaim: queueMocks.withActiveDeliveryClaim,
}));
vi.mock("../../logging/subsystem.js", () => ({
@@ -100,6 +105,7 @@ type DeliverModule = typeof import("./deliver.js");
let deliverOutboundPayloads: DeliverModule["deliverOutboundPayloads"];
let normalizeOutboundPayloads: DeliverModule["normalizeOutboundPayloads"];
let resolveOutboundDurableFinalDeliverySupport: DeliverModule["resolveOutboundDurableFinalDeliverySupport"];
const matrixChunkConfig: OpenClawConfig = {
channels: { matrix: { textChunkLimit: 4000 } } as OpenClawConfig["channels"],
@@ -218,7 +224,7 @@ function flushDiagnosticEvents() {
return new Promise<void>((resolve) => setImmediate(resolve));
}
async function runBestEffortPartialFailureDelivery() {
async function runBestEffortPartialFailureDelivery(params?: { onError?: boolean }) {
const sendMatrix = vi
.fn()
.mockRejectedValueOnce(new Error("fail"))
@@ -232,7 +238,7 @@ async function runBestEffortPartialFailureDelivery() {
payloads: [{ text: "a" }, { text: "b" }],
deps: { matrix: sendMatrix },
bestEffort: true,
onError,
...(params?.onError === false ? {} : { onError }),
});
return { sendMatrix, onError, results };
}
@@ -256,7 +262,11 @@ function expectSuccessfulMatrixInternalHookPayload(
describe("deliverOutboundPayloads", () => {
beforeAll(async () => {
({ deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js"));
({
deliverOutboundPayloads,
normalizeOutboundPayloads,
resolveOutboundDurableFinalDeliverySupport,
} = await import("./deliver.js"));
});
beforeEach(() => {
@@ -279,6 +289,10 @@ describe("deliverOutboundPayloads", () => {
queueMocks.ackDelivery.mockResolvedValue(undefined);
queueMocks.failDelivery.mockClear();
queueMocks.failDelivery.mockResolvedValue(undefined);
queueMocks.markDeliveryPlatformOutcomeUnknown.mockClear();
queueMocks.markDeliveryPlatformOutcomeUnknown.mockResolvedValue(undefined);
queueMocks.markDeliveryPlatformSendAttemptStarted.mockClear();
queueMocks.markDeliveryPlatformSendAttemptStarted.mockResolvedValue(undefined);
queueMocks.withActiveDeliveryClaim.mockClear();
queueMocks.withActiveDeliveryClaim.mockImplementation(async (_entryId, fn) => ({
status: "claimed",
@@ -293,6 +307,733 @@ describe("deliverOutboundPayloads", () => {
setActivePluginRegistry(emptyRegistry);
});
it("reports unsupported durable final delivery when required capabilities are missing", async () => {
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText: async () => ({ channel: "matrix", messageId: "m1" }),
deliveryCapabilities: {
durableFinal: {
text: true,
},
},
},
}),
},
]),
);
await expect(
resolveOutboundDurableFinalDeliverySupport({
cfg: {},
channel: "matrix",
requirements: {
text: true,
silent: true,
},
}),
).resolves.toEqual({
ok: false,
reason: "capability_mismatch",
capability: "silent",
});
});
it("uses channel message adapter capabilities for durable final support", async () => {
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
...createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText: async () => ({ channel: "matrix", messageId: "outbound" }),
deliveryCapabilities: {
durableFinal: {
text: true,
},
},
},
}),
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
silent: true,
},
},
send: {
text: async () => ({
messageId: "message",
receipt: createMessageReceiptFromOutboundResults({
results: [{ channel: "matrix", messageId: "message" }],
kind: "text",
}),
}),
},
},
},
},
]),
);
await expect(
resolveOutboundDurableFinalDeliverySupport({
cfg: {},
channel: "matrix",
requirements: {
text: true,
silent: true,
},
}),
).resolves.toEqual({ ok: true });
});
it("requires a real reconciler for required unknown-send recovery support", async () => {
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
...createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText: async () => ({ channel: "matrix", messageId: "outbound" }),
},
}),
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
reconcileUnknownSend: true,
},
},
send: {
text: async () => ({
messageId: "message",
receipt: createMessageReceiptFromOutboundResults({
results: [{ channel: "matrix", messageId: "message" }],
kind: "text",
}),
}),
},
},
},
},
]),
);
await expect(
resolveOutboundDurableFinalDeliverySupport({
cfg: {},
channel: "matrix",
requirements: {
text: true,
reconcileUnknownSend: true,
},
}),
).resolves.toEqual({
ok: false,
reason: "capability_mismatch",
capability: "reconcileUnknownSend",
});
});
it("accepts required unknown-send recovery only when the adapter declares and implements it", async () => {
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
...createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText: async () => ({ channel: "matrix", messageId: "outbound" }),
},
}),
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
reconcileUnknownSend: true,
},
reconcileUnknownSend: async () => ({ status: "not_sent" }),
},
send: {
text: async () => ({
messageId: "message",
receipt: createMessageReceiptFromOutboundResults({
results: [{ channel: "matrix", messageId: "message" }],
kind: "text",
}),
}),
},
},
},
},
]),
);
await expect(
resolveOutboundDurableFinalDeliverySupport({
cfg: {},
channel: "matrix",
requirements: {
text: true,
reconcileUnknownSend: true,
},
}),
).resolves.toEqual({ ok: true });
});
it("sends text through the channel message adapter when present", async () => {
const messageSendText = vi.fn(async () => ({
messageId: "message-adapter-1",
receipt: createMessageReceiptFromOutboundResults({
results: [{ channel: "matrix", messageId: "message-adapter-1" }],
kind: "text",
}),
}));
const outboundSendText = vi.fn(async () => ({
channel: "matrix" as const,
messageId: "outbound-1",
}));
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
...createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
chunker: chunkText,
sendText: outboundSendText,
},
}),
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
},
},
send: {
text: messageSendText,
},
},
},
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
});
expect(messageSendText).toHaveBeenCalledWith(
expect.objectContaining({
to: "!room:example",
text: "hello",
}),
);
expect(outboundSendText).not.toHaveBeenCalled();
expect(results).toMatchObject([
{
channel: "matrix",
messageId: "message-adapter-1",
},
]);
expect(results[0]?.receipt?.platformMessageIds).toEqual(["message-adapter-1"]);
});
it("runs message adapter send lifecycle after durable intent and before platform send", async () => {
const order: string[] = [];
queueMocks.enqueueDelivery.mockImplementationOnce(async () => {
order.push("queue");
return "queue-1";
});
queueMocks.ackDelivery.mockImplementationOnce(async () => {
order.push("ack");
});
queueMocks.markDeliveryPlatformSendAttemptStarted.mockImplementationOnce(async () => {
order.push("mark-started");
});
queueMocks.markDeliveryPlatformOutcomeUnknown.mockImplementationOnce(async () => {
order.push("mark-unknown");
});
const messageSendText = vi.fn(async () => {
order.push("send");
return {
messageId: "message-adapter-1",
receipt: createMessageReceiptFromOutboundResults({
results: [{ channel: "matrix", messageId: "message-adapter-1" }],
kind: "text",
}),
};
});
const beforeSendAttempt = vi.fn(() => {
order.push("before");
return "pending-1";
});
const afterSendSuccess = vi.fn(
(ctx: { attemptToken?: unknown; result: { messageId?: string } }) => {
order.push(`after:${String(ctx.attemptToken)}:${ctx.result.messageId ?? ""}`);
},
);
const afterCommit = vi.fn((ctx: { attemptToken?: unknown; result: { messageId?: string } }) => {
order.push(`commit:${String(ctx.attemptToken)}:${ctx.result.messageId ?? ""}`);
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
id: "matrix",
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
afterSendSuccess: true,
afterCommit: true,
},
},
send: {
lifecycle: {
beforeSendAttempt,
afterSendSuccess,
afterCommit,
},
text: messageSendText,
},
},
},
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
queuePolicy: "required",
});
expect(order).toEqual([
"queue",
"before",
"mark-started",
"send",
"after:pending-1:message-adapter-1",
"mark-unknown",
"ack",
"commit:pending-1:message-adapter-1",
]);
expect(beforeSendAttempt).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
to: "!room:example",
text: "hello",
}),
);
expect(afterSendSuccess).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
attemptToken: "pending-1",
result: expect.objectContaining({ messageId: "message-adapter-1" }),
}),
);
expect(afterCommit).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
attemptToken: "pending-1",
result: expect.objectContaining({ messageId: "message-adapter-1" }),
}),
);
expect(results).toMatchObject([{ channel: "matrix", messageId: "message-adapter-1" }]);
});
it("does not mark queued delivery as unknown when hooks cancel before platform send", async () => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",
);
hookMocks.runner.runMessageSending.mockResolvedValueOnce({
cancel: true,
content: "blocked",
});
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" });
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
deps: { matrix: sendMatrix },
queuePolicy: "required",
});
expect(results).toEqual([]);
expect(sendMatrix).not.toHaveBeenCalled();
expect(queueMocks.markDeliveryPlatformSendAttemptStarted).not.toHaveBeenCalled();
expect(queueMocks.markDeliveryPlatformOutcomeUnknown).not.toHaveBeenCalled();
expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id");
});
it("runs message adapter failure cleanup for failed sends with pending attempt tokens", async () => {
const messageSendText = vi.fn(async () => {
throw new Error("native send failed");
});
const afterSendFailure = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
id: "matrix",
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
afterSendSuccess: true,
},
},
send: {
lifecycle: {
beforeSendAttempt: () => "pending-2",
afterSendFailure,
},
text: messageSendText,
},
},
},
},
]),
);
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
queuePolicy: "required",
}),
).rejects.toThrow("native send failed");
expect(afterSendFailure).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
attemptToken: "pending-2",
error: expect.any(Error),
}),
);
expect(queueMocks.failDelivery).toHaveBeenCalledWith(
"mock-queue-id",
expect.stringContaining("native send failed"),
);
});
it("preserves native send errors when failure cleanup throws", async () => {
const messageSendText = vi.fn(async () => {
throw new Error("native send failed");
});
const afterSendFailure = vi.fn(async () => {
throw new Error("cleanup failed");
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
id: "matrix",
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
afterSendSuccess: true,
},
},
send: {
lifecycle: {
beforeSendAttempt: () => "pending-2",
afterSendFailure,
},
text: messageSendText,
},
},
},
},
]),
);
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
queuePolicy: "required",
}),
).rejects.toThrow("native send failed");
expect(afterSendFailure).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
attemptToken: "pending-2",
error: expect.any(Error),
}),
);
expect(queueMocks.failDelivery).toHaveBeenCalledWith(
"mock-queue-id",
expect.stringContaining("native send failed"),
);
});
it("preserves successful sends when the success hook throws", async () => {
const afterSendFailure = vi.fn();
const afterCommit = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
id: "matrix",
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
afterSendSuccess: true,
afterCommit: true,
},
},
send: {
lifecycle: {
afterSendSuccess: async () => {
throw new Error("success hook failed");
},
afterSendFailure,
afterCommit,
},
text: async () => ({
messageId: "message-adapter-1",
receipt: createMessageReceiptFromOutboundResults({
results: [{ channel: "matrix", messageId: "message-adapter-1" }],
kind: "text",
}),
}),
},
},
},
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
queuePolicy: "required",
});
expect(results).toMatchObject([{ channel: "matrix", messageId: "message-adapter-1" }]);
expect(afterSendFailure).not.toHaveBeenCalled();
expect(afterCommit).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
result: expect.objectContaining({ messageId: "message-adapter-1" }),
}),
);
expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id");
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
});
it("requires durable queue writes when requested", async () => {
queueMocks.enqueueDelivery.mockRejectedValueOnce(new Error("queue offline"));
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" });
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hi" }],
deps: { matrix: sendMatrix },
queuePolicy: "required",
}),
).rejects.toThrow("queue offline");
expect(sendMatrix).not.toHaveBeenCalled();
});
it("falls back to direct send when best-effort queue writes fail", async () => {
queueMocks.enqueueDelivery.mockRejectedValueOnce(new Error("queue offline"));
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" });
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hi" }],
deps: { matrix: sendMatrix },
queuePolicy: "best_effort",
}),
).resolves.toEqual([expect.objectContaining({ messageId: "m1" })]);
expect(sendMatrix).toHaveBeenCalled();
});
it("runs afterCommit hooks after best-effort queue fallback direct sends", async () => {
queueMocks.enqueueDelivery.mockRejectedValueOnce(new Error("queue offline"));
const afterCommit = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: {
id: "matrix",
message: {
id: "matrix",
durableFinal: {
capabilities: {
text: true,
afterCommit: true,
},
},
send: {
lifecycle: {
afterCommit,
},
text: async () => ({
messageId: "message-adapter-1",
receipt: createMessageReceiptFromOutboundResults({
results: [{ channel: "matrix", messageId: "message-adapter-1" }],
kind: "text",
}),
}),
},
},
},
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }],
queuePolicy: "best_effort",
});
expect(afterCommit).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
result: expect.objectContaining({ messageId: "message-adapter-1" }),
}),
);
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
});
it("requires the platform-send attempt marker before required durable platform I/O", async () => {
queueMocks.markDeliveryPlatformSendAttemptStarted.mockRejectedValueOnce(
new Error("marker offline"),
);
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" });
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hi" }],
deps: { matrix: sendMatrix },
queuePolicy: "required",
}),
).rejects.toThrow("marker offline");
expect(queueMocks.markDeliveryPlatformSendAttemptStarted).toHaveBeenCalledWith("mock-queue-id");
expect(sendMatrix).not.toHaveBeenCalled();
expect(queueMocks.failDelivery).toHaveBeenCalledWith(
"mock-queue-id",
expect.stringContaining("marker offline"),
);
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
});
it("fails required delivery when the post-send unknown marker cannot be written", async () => {
queueMocks.markDeliveryPlatformOutcomeUnknown.mockRejectedValueOnce(
new Error("unknown marker offline"),
);
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" });
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hi" }],
deps: { matrix: sendMatrix },
queuePolicy: "required",
}),
).rejects.toThrow("unknown marker offline");
expect(sendMatrix).toHaveBeenCalled();
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
});
it("fails required delivery when queue ack fails after platform send", async () => {
queueMocks.ackDelivery.mockRejectedValueOnce(new Error("ack offline"));
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1" });
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hi" }],
deps: { matrix: sendMatrix },
queuePolicy: "required",
}),
).rejects.toThrow("ack offline");
expect(sendMatrix).toHaveBeenCalled();
expect(queueMocks.markDeliveryPlatformOutcomeUnknown).toHaveBeenCalledWith("mock-queue-id");
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
});
it("emits bounded delivery diagnostics for successful outbound sends", async () => {
const events: DiagnosticEventPayload[] = [];
const unsubscribe = onInternalDiagnosticEvent((event) => events.push(event));
@@ -1551,6 +2292,16 @@ describe("deliverOutboundPayloads", () => {
);
});
it("calls failDelivery instead of ackDelivery on bestEffort partial failure without onError", async () => {
await runBestEffortPartialFailureDelivery({ onError: false });
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
expect(queueMocks.failDelivery).toHaveBeenCalledWith(
"mock-queue-id",
"partial delivery failure (bestEffort)",
);
});
it("writes raw payloads to the queue before normalization", async () => {
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m-raw", roomId: "!room:example" });
const rawPayloads: DeliverOutboundPayload[] = [
@@ -1577,6 +2328,51 @@ describe("deliverOutboundPayloads", () => {
{ text: "caption\nMEDIA:https://x.test/a.png" },
{ text: "NO_REPLY", mediaUrl: " https://x.test/b.png " },
],
renderedBatchPlan: expect.objectContaining({
payloadCount: 4,
textCount: 4,
mediaCount: 1,
items: expect.arrayContaining([
expect.objectContaining({
index: 3,
kinds: ["text", "media"],
text: "NO_REPLY",
mediaUrls: ["https://x.test/b.png"],
}),
]),
}),
}),
);
});
it("persists rendered batch plans with queued deliveries", async () => {
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m-plan", roomId: "!room:example" });
const renderedBatchPlan = {
payloadCount: 2,
textCount: 1,
mediaCount: 1,
voiceCount: 0,
presentationCount: 0,
interactiveCount: 0,
channelDataCount: 0,
items: [
{ index: 0, kinds: ["text"] as const, text: "hello", mediaUrls: [] },
{ index: 1, kinds: ["media"] as const, mediaUrls: ["file:///tmp/a.png"] },
],
};
await deliverOutboundPayloads({
cfg: matrixChunkConfig,
channel: "matrix",
to: "!room:example",
payloads: [{ text: "hello" }, { mediaUrl: "file:///tmp/a.png" }],
deps: { matrix: sendMatrix },
renderedBatchPlan,
});
expect(queueMocks.enqueueDelivery).toHaveBeenCalledWith(
expect.objectContaining({
renderedBatchPlan,
}),
);
});

View File

@@ -1,7 +1,16 @@
import { resolveChunkMode, resolveTextChunkLimit } from "../../auto-reply/chunk.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { createRenderedMessageBatchPlan } from "../../channels/message/rendered-batch.js";
import type {
ChannelMessageAdapterShape,
ChannelMessageSendAttemptContext,
ChannelMessageSendAttemptKind,
ChannelMessageSendLifecycleAdapter,
ChannelMessageSendResult,
} from "../../channels/message/types.js";
import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js";
import type {
ChannelDeliveryCapabilities,
ChannelOutboundAdapter,
ChannelOutboundContext,
ChannelOutboundPayloadContext,
@@ -32,11 +41,20 @@ import { diagnosticErrorCategory } from "../diagnostic-error-metadata.js";
import { emitDiagnosticEvent, type DiagnosticMessageDeliveryKind } from "../diagnostic-events.js";
import { formatErrorMessage } from "../errors.js";
import { throwIfAborted } from "./abort.js";
import { resolveOutboundChannelMessageAdapter } from "./channel-resolution.js";
import type { OutboundDeliveryResult } from "./deliver-types.js";
import {
attachOutboundDeliveryCommitHook,
runOutboundDeliveryCommitHooks,
type OutboundDeliveryCommitHook,
} from "./delivery-commit-hooks.js";
import {
ackDelivery,
enqueueDelivery,
failDelivery,
markDeliveryPlatformOutcomeUnknown,
markDeliveryPlatformSendAttemptStarted,
type QueuedRenderedMessageBatchPlan,
withActiveDeliveryClaim,
} from "./delivery-queue.js";
import type { OutboundDeliveryFormattingOptions } from "./formatting.js";
@@ -65,6 +83,32 @@ export type { NormalizedOutboundPayload } from "./payloads.js";
export { normalizeOutboundPayloads } from "./payloads.js";
export { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js";
export type OutboundDeliveryQueuePolicy = "required" | "best_effort";
export type OutboundDeliveryIntent = {
id: string;
channel: Exclude<OutboundChannel, "none">;
to: string;
accountId?: string;
queuePolicy: OutboundDeliveryQueuePolicy;
};
export type DurableFinalDeliveryRequirement = keyof NonNullable<
ChannelDeliveryCapabilities["durableFinal"]
>;
export type DurableFinalDeliveryRequirements = Partial<
Record<DurableFinalDeliveryRequirement, boolean>
>;
export type OutboundDurableDeliverySupport =
| { ok: true }
| {
ok: false;
reason: "missing_outbound_handler" | "capability_mismatch";
capability?: DurableFinalDeliveryRequirement;
};
const log = createSubsystemLogger("outbound/deliver");
let transcriptRuntimePromise:
| Promise<typeof import("../../config/sessions/transcript.runtime.js")>
@@ -130,6 +174,8 @@ type ChannelHandler = {
) => Promise<OutboundDeliveryResult>;
};
type ChannelMessageLifecycleContext = ChannelMessageSendAttemptContext;
type ChannelHandlerParams = {
cfg: OpenClawConfig;
channel: Exclude<OutboundChannel, "none">;
@@ -146,6 +192,7 @@ type ChannelHandlerParams = {
silent?: boolean;
mediaAccess?: OutboundMediaAccess;
gatewayClientScopes?: readonly string[];
onPlatformSendStart?: () => Promise<void>;
};
// Channel docking: outbound delivery delegates to plugin.outbound adapters.
@@ -153,21 +200,26 @@ async function resolveChannelOutboundDirectiveOptions(params: {
cfg: OpenClawConfig;
channel: Exclude<OutboundChannel, "none">;
}): Promise<{ extractMarkdownImages?: boolean }> {
let outbound = await loadChannelOutboundAdapter(params.channel);
if (!outbound) {
const { bootstrapOutboundChannelPlugin } = await loadChannelBootstrapRuntime();
bootstrapOutboundChannelPlugin({
channel: params.channel,
cfg: params.cfg,
});
outbound = await loadChannelOutboundAdapter(params.channel);
}
const outbound = await loadBootstrappedOutboundAdapter(params);
return {
extractMarkdownImages: outbound?.extractMarkdownImages === true ? true : undefined,
};
}
async function createChannelHandler(params: ChannelHandlerParams): Promise<ChannelHandler> {
const outbound = await loadBootstrappedOutboundAdapter(params);
const message = resolveOutboundChannelMessageAdapter(params);
const handler = createPluginHandler({ ...params, outbound, message });
if (!handler) {
throw new Error(`Outbound not configured for channel: ${params.channel}`);
}
return handler;
}
async function loadBootstrappedOutboundAdapter(params: {
cfg: OpenClawConfig;
channel: Exclude<OutboundChannel, "none">;
}): Promise<ChannelOutboundAdapter | undefined> {
let outbound = await loadChannelOutboundAdapter(params.channel);
if (!outbound) {
const { bootstrapOutboundChannelPlugin } = await loadChannelBootstrapRuntime();
@@ -177,25 +229,112 @@ async function createChannelHandler(params: ChannelHandlerParams): Promise<Chann
});
outbound = await loadChannelOutboundAdapter(params.channel);
}
const handler = createPluginHandler({ ...params, outbound });
if (!handler) {
throw new Error(`Outbound not configured for channel: ${params.channel}`);
return outbound;
}
async function runChannelMessageSendWithLifecycle<
TResult extends ChannelMessageSendResult,
>(params: {
lifecycle?: ChannelMessageSendLifecycleAdapter;
ctx: ChannelMessageLifecycleContext;
send: () => Promise<TResult>;
}): Promise<{ result: TResult; afterCommit?: OutboundDeliveryCommitHook }> {
if (!params.lifecycle) {
return { result: await params.send() };
}
return handler;
let attemptToken: unknown;
try {
attemptToken = await params.lifecycle.beforeSendAttempt?.(params.ctx);
const result = await params.send();
const successCtx = {
...params.ctx,
result,
...(attemptToken !== undefined ? { attemptToken } : {}),
};
try {
await params.lifecycle.afterSendSuccess?.(successCtx);
} catch (successHookError: unknown) {
log.warn(
`channel message send success hook failed after platform send; preserving send result: ${formatErrorMessage(successHookError)}`,
);
}
return {
result,
...(params.lifecycle.afterCommit
? {
afterCommit: async () => {
await params.lifecycle?.afterCommit?.(successCtx);
},
}
: {}),
};
} catch (error: unknown) {
try {
await params.lifecycle.afterSendFailure?.({
...params.ctx,
error,
...(attemptToken !== undefined ? { attemptToken } : {}),
});
} catch (cleanupError: unknown) {
log.warn(
`channel message send failure cleanup failed; preserving original send error: ${formatErrorMessage(cleanupError)}`,
);
}
throw error;
}
}
export async function resolveOutboundDurableFinalDeliverySupport(params: {
cfg: OpenClawConfig;
channel: Exclude<OutboundChannel, "none">;
requirements?: DurableFinalDeliveryRequirements;
}): Promise<OutboundDurableDeliverySupport> {
const outbound = await loadBootstrappedOutboundAdapter(params);
const message = resolveOutboundChannelMessageAdapter(params);
if (!message?.send?.text && !outbound?.sendText) {
return { ok: false, reason: "missing_outbound_handler" };
}
const messageDurableFinal = message?.durableFinal;
const durableFinal =
messageDurableFinal?.capabilities ?? outbound?.deliveryCapabilities?.durableFinal;
for (const [capability, required] of Object.entries(params.requirements ?? {}) as Array<
[DurableFinalDeliveryRequirement, boolean | undefined]
>) {
if (required === true && durableFinal?.[capability] !== true) {
return { ok: false, reason: "capability_mismatch", capability };
}
if (
required === true &&
capability === "reconcileUnknownSend" &&
typeof messageDurableFinal?.reconcileUnknownSend !== "function"
) {
return { ok: false, reason: "capability_mismatch", capability };
}
}
return { ok: true };
}
function createPluginHandler(
params: ChannelHandlerParams & { outbound?: ChannelOutboundAdapter },
params: ChannelHandlerParams & {
outbound?: ChannelOutboundAdapter;
message?: ChannelMessageAdapterShape;
},
): ChannelHandler | null {
const outbound = params.outbound;
if (!outbound?.sendText) {
const messageText = params.message?.send?.text;
const messageMedia = params.message?.send?.media;
const messagePayload = params.message?.send?.payload;
const messageLifecycle = params.message?.send?.lifecycle;
if (!messageText && !outbound?.sendText) {
return null;
}
const baseCtx = createChannelOutboundContextBase(params);
const sendText = outbound.sendText;
const sendMedia = outbound.sendMedia;
const chunker = outbound.chunker ?? null;
const chunkerMode = outbound.chunkerMode;
const sendText = outbound?.sendText;
const sendMedia = outbound?.sendMedia;
const chunker = outbound?.chunker ?? null;
const chunkerMode = outbound?.chunkerMode;
const resolveCtx = (overrides?: {
replyToId?: string | null;
replyToIdSource?: "explicit" | "implicit";
@@ -222,16 +361,16 @@ function createPluginHandler(
return {
chunker,
chunkerMode,
textChunkLimit: outbound.textChunkLimit,
supportsMedia: Boolean(sendMedia),
sanitizeText: outbound.sanitizeText
textChunkLimit: outbound?.textChunkLimit,
supportsMedia: Boolean(messageMedia ?? sendMedia),
sanitizeText: outbound?.sanitizeText
? (payload) => outbound.sanitizeText!({ text: payload.text ?? "", payload })
: undefined,
normalizePayload: outbound.normalizePayload
normalizePayload: outbound?.normalizePayload
? (payload) => outbound.normalizePayload!({ payload })
: undefined,
sendTextOnlyErrorPayloads: outbound.sendTextOnlyErrorPayloads === true,
renderPresentation: outbound.renderPresentation
sendTextOnlyErrorPayloads: outbound?.sendTextOnlyErrorPayloads === true,
renderPresentation: outbound?.renderPresentation
? async (payload) => {
const presentation = normalizeMessagePresentation(payload.presentation);
if (!presentation) {
@@ -250,7 +389,7 @@ function createPluginHandler(
return await outbound.renderPresentation!({ payload, presentation, ctx });
}
: undefined,
pinDeliveredMessage: outbound.pinDeliveredMessage
pinDeliveredMessage: outbound?.pinDeliveredMessage
? async ({ target, messageId, pin }) =>
outbound.pinDeliveredMessage!({
cfg: params.cfg,
@@ -259,7 +398,7 @@ function createPluginHandler(
pin,
})
: undefined,
afterDeliverPayload: outbound.afterDeliverPayload
afterDeliverPayload: outbound?.afterDeliverPayload
? async ({ target, payload, results }) =>
outbound.afterDeliverPayload!({
cfg: params.cfg,
@@ -268,10 +407,10 @@ function createPluginHandler(
results,
})
: undefined,
shouldSkipPlainTextSanitization: outbound.shouldSkipPlainTextSanitization
shouldSkipPlainTextSanitization: outbound?.shouldSkipPlainTextSanitization
? (payload) => outbound.shouldSkipPlainTextSanitization!({ payload })
: undefined,
resolveEffectiveTextChunkLimit: outbound.resolveEffectiveTextChunkLimit
resolveEffectiveTextChunkLimit: outbound?.resolveEffectiveTextChunkLimit
? (fallbackLimit) =>
outbound.resolveEffectiveTextChunkLimit!({
cfg: params.cfg,
@@ -279,52 +418,125 @@ function createPluginHandler(
fallbackLimit,
})
: undefined,
sendPayload: outbound.sendPayload
? async (payload, overrides) =>
outbound.sendPayload!({
...resolveCtx(overrides),
text: payload.text ?? "",
mediaUrl: payload.mediaUrl,
payload,
})
: undefined,
sendFormattedText: outbound.sendFormattedText
? async (text, overrides) =>
outbound.sendFormattedText!({
sendPayload:
messagePayload || outbound?.sendPayload
? async (payload, overrides) => {
const payloadCtx = {
...resolveCtx(overrides),
kind: "payload" as const satisfies ChannelMessageSendAttemptKind,
text: payload.text ?? "",
mediaUrl: payload.mediaUrl,
payload,
};
if (messagePayload) {
const sent = await runChannelMessageSendWithLifecycle({
lifecycle: messageLifecycle,
ctx: payloadCtx,
send: async () => {
await params.onPlatformSendStart?.();
return await messagePayload(payloadCtx);
},
});
return attachOutboundDeliveryCommitHook(
normalizeChannelMessageSendResult(params.channel, sent.result),
sent.afterCommit,
);
}
await params.onPlatformSendStart?.();
return outbound!.sendPayload!(payloadCtx);
}
: undefined,
sendFormattedText: outbound?.sendFormattedText
? async (text, overrides) => {
await params.onPlatformSendStart?.();
return await outbound.sendFormattedText!({
...resolveCtx(overrides),
text,
})
});
}
: undefined,
sendFormattedMedia: outbound.sendFormattedMedia
? async (caption, mediaUrl, overrides) =>
outbound.sendFormattedMedia!({
sendFormattedMedia: outbound?.sendFormattedMedia
? async (caption, mediaUrl, overrides) => {
await params.onPlatformSendStart?.();
return await outbound.sendFormattedMedia!({
...resolveCtx(overrides),
text: caption,
mediaUrl,
})
});
}
: undefined,
sendText: async (text, overrides) =>
sendText({
sendText: async (text, overrides) => {
const textCtx = {
...resolveCtx(overrides),
kind: "text" as const satisfies ChannelMessageSendAttemptKind,
text,
}),
};
if (messageText) {
const sent = await runChannelMessageSendWithLifecycle({
lifecycle: messageLifecycle,
ctx: textCtx,
send: async () => {
await params.onPlatformSendStart?.();
return await messageText(textCtx);
},
});
return attachOutboundDeliveryCommitHook(
normalizeChannelMessageSendResult(params.channel, sent.result),
sent.afterCommit,
);
}
await params.onPlatformSendStart?.();
return sendText!(textCtx);
},
buildTargetRef,
sendMedia: async (caption, mediaUrl, overrides) => {
if (sendMedia) {
return sendMedia({
...resolveCtx(overrides),
text: caption,
mediaUrl,
});
}
return sendText({
const mediaCtx = {
...resolveCtx(overrides),
kind: "media" as const satisfies ChannelMessageSendAttemptKind,
text: caption,
});
mediaUrl,
};
if (messageMedia) {
const sent = await runChannelMessageSendWithLifecycle({
lifecycle: messageLifecycle,
ctx: mediaCtx,
send: async () => {
await params.onPlatformSendStart?.();
return await messageMedia(mediaCtx);
},
});
return attachOutboundDeliveryCommitHook(
normalizeChannelMessageSendResult(params.channel, sent.result),
sent.afterCommit,
);
}
if (sendMedia) {
await params.onPlatformSendStart?.();
return sendMedia(mediaCtx);
}
await params.onPlatformSendStart?.();
return sendText!(mediaCtx);
},
};
}
function normalizeChannelMessageSendResult(
channel: Exclude<OutboundChannel, "none">,
result: ChannelMessageSendResult,
): OutboundDeliveryResult {
const source = result as ChannelMessageSendResult & Partial<OutboundDeliveryResult>;
return {
...source,
channel,
messageId:
source.messageId ??
source.receipt.primaryPlatformMessageId ??
source.receipt.platformMessageIds[0] ??
"",
receipt: source.receipt,
};
}
function createChannelOutboundContextBase(
params: ChannelHandlerParams,
): Omit<ChannelOutboundContext, "text" | "mediaUrl"> {
@@ -350,6 +562,40 @@ function createChannelOutboundContextBase(
const isAbortError = (err: unknown): boolean => err instanceof Error && err.name === "AbortError";
async function markQueuedPlatformSendAttemptStarted(params: {
queueId: string;
queuePolicy: OutboundDeliveryQueuePolicy;
}): Promise<boolean> {
try {
await markDeliveryPlatformSendAttemptStarted(params.queueId);
return true;
} catch (err: unknown) {
if (params.queuePolicy === "required") {
throw err;
}
log.warn(
`failed to mark queued delivery ${params.queueId} as platform-send-attempt-started; continuing best-effort delivery: ${formatErrorMessage(err)}`,
);
return false;
}
}
async function markQueuedPlatformOutcomeUnknown(params: {
queueId: string;
queuePolicy: OutboundDeliveryQueuePolicy;
}): Promise<void> {
try {
await markDeliveryPlatformOutcomeUnknown(params.queueId);
} catch (err: unknown) {
if (params.queuePolicy === "required") {
throw err;
}
log.warn(
`failed to mark queued delivery ${params.queueId} as platform-outcome-unknown; continuing best-effort delivery: ${formatErrorMessage(err)}`,
);
}
}
type DeliverOutboundPayloadsCoreParams = {
cfg: OpenClawConfig;
channel: Exclude<OutboundChannel, "none">;
@@ -376,6 +622,10 @@ type DeliverOutboundPayloadsCoreParams = {
gatewayClientScopes?: readonly string[];
};
type DeliverOutboundPayloadsCoreRuntimeParams = DeliverOutboundPayloadsCoreParams & {
onPlatformSendStart?: () => Promise<void>;
};
function collectPayloadMediaSources(plan: readonly OutboundPayloadPlan[]): string[] {
return plan.flatMap((entry) => entry.parts.mediaUrls);
}
@@ -383,6 +633,11 @@ function collectPayloadMediaSources(plan: readonly OutboundPayloadPlan[]): strin
export type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & {
/** @internal Skip write-ahead queue (used by crash-recovery to avoid re-enqueueing). */
skipQueue?: boolean;
/** @internal Let recovery run commit hooks after it has acked the recovered queue entry. */
deferCommitHooks?: boolean;
queuePolicy?: OutboundDeliveryQueuePolicy;
renderedBatchPlan?: QueuedRenderedMessageBatchPlan;
onDeliveryIntent?: (intent: OutboundDeliveryIntent) => void;
};
type MessageSentEvent = {
@@ -830,6 +1085,9 @@ export async function deliverOutboundPayloads(
params: DeliverOutboundPayloadsParams,
): Promise<OutboundDeliveryResult[]> {
const { channel, to, payloads } = params;
const queuePolicy = params.queuePolicy ?? "best_effort";
const renderedBatchPlan =
params.renderedBatchPlan ?? createRenderedMessageBatchPlan(params.payloads);
// Write-ahead delivery queue: persist before sending, remove after success.
const queueId = params.skipQueue
@@ -839,10 +1097,12 @@ export async function deliverOutboundPayloads(
to,
accountId: params.accountId,
payloads,
renderedBatchPlan,
threadId: params.threadId,
replyToId: params.replyToId,
replyToMode: params.replyToMode,
formatting: params.formatting,
identity: params.identity,
bestEffort: params.bestEffort,
gifPlayback: params.gifPlayback,
forceDocument: params.forceDocument,
@@ -850,7 +1110,22 @@ export async function deliverOutboundPayloads(
mirror: params.mirror,
session: params.session,
gatewayClientScopes: params.gatewayClientScopes,
}).catch(() => null); // Best-effort — don't block delivery if queue write fails.
}).catch((err: unknown) => {
if (queuePolicy === "required") {
throw err;
}
return null;
}); // Best-effort delivery falls back to direct send if the queue write fails.
if (queueId) {
params.onDeliveryIntent?.({
id: queueId,
channel,
to,
...(params.accountId ? { accountId: params.accountId } : {}),
queuePolicy,
});
}
if (!queueId) {
return await deliverOutboundPayloadsWithQueueCleanup(params, null);
@@ -876,23 +1151,65 @@ async function deliverOutboundPayloadsWithQueueCleanup(
// without throwing — so the outer try/catch never fires. We track whether any
// payload failed so we can call failDelivery instead of ackDelivery.
let hadPartialFailure = false;
const wrappedParams = params.onError
? {
...params,
onError: (err: unknown, payload: NormalizedOutboundPayload) => {
hadPartialFailure = true;
params.onError!(err, payload);
},
}
: params;
const wrappedParams = {
...params,
onError: (err: unknown, payload: NormalizedOutboundPayload) => {
hadPartialFailure = true;
params.onError?.(err, payload);
},
};
const queuePolicy = params.queuePolicy ?? "best_effort";
let platformResultsReturned = false;
try {
const results = await deliverOutboundPayloadsCore(wrappedParams);
let platformSendStarted = false;
const results = await deliverOutboundPayloadsCore({
...wrappedParams,
...(queueId
? {
onPlatformSendStart: async () => {
if (platformSendStarted) {
return;
}
platformSendStarted = await markQueuedPlatformSendAttemptStarted({
queueId,
queuePolicy,
});
},
}
: {}),
});
platformResultsReturned = true;
if (!queueId) {
if (!params.deferCommitHooks) {
await runOutboundDeliveryCommitHooks(results);
}
return results;
}
if (queueId) {
if (hadPartialFailure) {
await failDelivery(queueId, "partial delivery failure (bestEffort)").catch(() => {});
} else {
await ackDelivery(queueId).catch(() => {}); // Best-effort cleanup.
if (platformSendStarted) {
await markQueuedPlatformOutcomeUnknown({
queueId,
queuePolicy,
});
}
const acked = await ackDelivery(queueId)
.then(() => true)
.catch((err: unknown) => {
if (queuePolicy === "required") {
throw err;
}
log.warn(
`failed to ack queued delivery ${queueId}; continuing best-effort delivery: ${formatErrorMessage(err)}`,
);
return false;
});
if (acked) {
await runOutboundDeliveryCommitHooks(results);
}
}
}
return results;
@@ -900,7 +1217,7 @@ async function deliverOutboundPayloadsWithQueueCleanup(
if (queueId) {
if (isAbortError(err)) {
await ackDelivery(queueId).catch(() => {});
} else {
} else if (!platformResultsReturned) {
await failDelivery(queueId, formatErrorMessage(err)).catch(() => {});
}
}
@@ -910,7 +1227,7 @@ async function deliverOutboundPayloadsWithQueueCleanup(
/** Core delivery logic (extracted for queue wrapper). */
async function deliverOutboundPayloadsCore(
params: DeliverOutboundPayloadsCoreParams,
params: DeliverOutboundPayloadsCoreRuntimeParams,
): Promise<OutboundDeliveryResult[]> {
const { cfg, channel, to, payloads } = params;
const directiveOptions = await resolveChannelOutboundDirectiveOptions({ cfg, channel });
@@ -958,6 +1275,7 @@ async function deliverOutboundPayloadsCore(
silent: params.silent,
mediaAccess,
gatewayClientScopes: params.gatewayClientScopes,
...(params.onPlatformSendStart ? { onPlatformSendStart: params.onPlatformSendStart } : {}),
});
const configuredTextLimit = handler.chunker
? resolveTextChunkLimit(cfg, channel, accountId, {

View File

@@ -0,0 +1,46 @@
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { formatErrorMessage } from "../errors.js";
import type { OutboundDeliveryResult } from "./deliver-types.js";
export type OutboundDeliveryCommitHook = () => Promise<void>;
const log = createSubsystemLogger("outbound/deliver");
const outboundDeliveryCommitHooks = new WeakMap<
OutboundDeliveryResult,
OutboundDeliveryCommitHook[]
>();
export function attachOutboundDeliveryCommitHook<T extends OutboundDeliveryResult>(
result: T,
hook?: OutboundDeliveryCommitHook,
): T {
if (!hook) {
return result;
}
const hooks = outboundDeliveryCommitHooks.get(result) ?? [];
hooks.push(hook);
outboundDeliveryCommitHooks.set(result, hooks);
return result;
}
export async function runOutboundDeliveryCommitHooks(
results: readonly OutboundDeliveryResult[],
): Promise<void> {
for (const result of results) {
for (const hook of outboundDeliveryCommitHooks.get(result) ?? []) {
try {
await hook();
} catch (err) {
log.warn("Plugin message adapter after-commit hook failed.", {
channel: result.channel,
messageId: result.messageId,
error: formatErrorMessage(err),
});
}
}
}
}
export function isOutboundDeliveryResultArray(value: unknown): value is OutboundDeliveryResult[] {
return Array.isArray(value);
}

View File

@@ -1,5 +1,15 @@
import type {
ChannelMessageSendCommitContext,
ChannelMessageUnknownSendReconciliationResult,
} from "../../channels/message/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { formatErrorMessage } from "../errors.js";
import { resolveOutboundChannelMessageAdapter } from "./channel-resolution.js";
import type { OutboundDeliveryResult } from "./deliver-types.js";
import {
isOutboundDeliveryResultArray,
runOutboundDeliveryCommitHooks,
} from "./delivery-commit-hooks.js";
import {
ackDelivery,
failDelivery,
@@ -22,6 +32,7 @@ export type DeliverFn = (
cfg: OpenClawConfig;
} & QueuedDeliveryPayload & {
skipQueue?: boolean;
deferCommitHooks?: boolean;
},
) => Promise<unknown>;
@@ -116,10 +127,12 @@ function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig)
to: entry.to,
accountId: entry.accountId,
payloads: entry.payloads,
renderedBatchPlan: entry.renderedBatchPlan,
threadId: entry.threadId,
replyToId: entry.replyToId,
replyToMode: entry.replyToMode,
formatting: entry.formatting,
identity: entry.identity,
bestEffort: entry.bestEffort,
gifPlayback: entry.gifPlayback,
forceDocument: entry.forceDocument,
@@ -128,9 +141,157 @@ function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig)
session: entry.session,
gatewayClientScopes: entry.gatewayClientScopes,
skipQueue: true, // Prevent re-enqueueing during recovery.
deferCommitHooks: true,
} satisfies Parameters<DeliverFn>[0];
}
async function reconcileUnknownQueuedDelivery(opts: {
entry: QueuedDelivery;
cfg: OpenClawConfig;
log: RecoveryLogger;
}): Promise<ChannelMessageUnknownSendReconciliationResult | null> {
const adapter = resolveOutboundChannelMessageAdapter({
channel: opts.entry.channel,
cfg: opts.cfg,
allowBootstrap: true,
});
if (adapter?.durableFinal?.capabilities?.reconcileUnknownSend !== true) {
return null;
}
const reconcileUnknownSend = adapter?.durableFinal?.reconcileUnknownSend;
if (!reconcileUnknownSend) {
return null;
}
const { entry } = opts;
try {
return await reconcileUnknownSend({
cfg: opts.cfg,
queueId: entry.id,
channel: entry.channel,
to: entry.to,
...(entry.accountId !== undefined ? { accountId: entry.accountId } : {}),
enqueuedAt: entry.enqueuedAt,
retryCount: entry.retryCount,
...(entry.platformSendStartedAt !== undefined
? { platformSendStartedAt: entry.platformSendStartedAt }
: {}),
payloads: entry.payloads,
...(entry.renderedBatchPlan ? { renderedBatchPlan: entry.renderedBatchPlan } : {}),
...(entry.replyToId !== undefined ? { replyToId: entry.replyToId } : {}),
...(entry.replyToMode !== undefined ? { replyToMode: entry.replyToMode } : {}),
...(entry.threadId !== undefined ? { threadId: entry.threadId } : {}),
...(entry.silent !== undefined ? { silent: entry.silent } : {}),
});
} catch (err) {
const error = formatErrorMessage(err);
opts.log.warn(`Delivery entry ${opts.entry.id} unknown-send reconciliation failed: ${error}`);
return { status: "unresolved", error, retryable: true };
}
}
function buildReconciledSentResult(
entry: QueuedDelivery,
reconciliation: Extract<ChannelMessageUnknownSendReconciliationResult, { status: "sent" }>,
): OutboundDeliveryResult {
return {
channel: entry.channel,
messageId:
reconciliation.messageId ??
reconciliation.receipt.primaryPlatformMessageId ??
reconciliation.receipt.platformMessageIds[0] ??
"",
receipt: reconciliation.receipt,
};
}
function buildReconciledCommitContext(params: {
entry: QueuedDelivery;
cfg: OpenClawConfig;
result: OutboundDeliveryResult;
}): ChannelMessageSendCommitContext {
const payload = params.entry.payloads[0] ?? {};
const result = {
messageId: params.result.messageId,
receipt: params.result.receipt ?? {
platformMessageIds: [params.result.messageId].filter(Boolean),
parts: [],
sentAt: Date.now(),
},
};
const base = {
cfg: params.cfg,
to: params.entry.to,
accountId: params.entry.accountId,
replyToId: params.entry.replyToId,
replyToMode: params.entry.replyToMode,
threadId: params.entry.threadId,
silent: params.entry.silent,
result,
};
if (
payload.presentation !== undefined ||
payload.delivery !== undefined ||
payload.interactive !== undefined ||
(payload.channelData !== undefined && Object.keys(payload.channelData).length > 0)
) {
return {
...base,
kind: "payload",
text: payload.text ?? "",
mediaUrl: payload.mediaUrl,
payload,
};
}
const mediaUrl = payload.mediaUrl ?? payload.mediaUrls?.find((url) => url);
if (mediaUrl) {
return {
...base,
kind: "media",
text: payload.text ?? "",
mediaUrl,
audioAsVoice: payload.audioAsVoice,
gifPlayback: params.entry.gifPlayback,
forceDocument: params.entry.forceDocument,
};
}
return {
...base,
kind: "text",
text: payload.text ?? "",
};
}
async function runReconciledSentCommitHooks(params: {
entry: QueuedDelivery;
cfg: OpenClawConfig;
reconciliation: Extract<ChannelMessageUnknownSendReconciliationResult, { status: "sent" }>;
log: RecoveryLogger;
}): Promise<void> {
const adapter = resolveOutboundChannelMessageAdapter({
channel: params.entry.channel,
cfg: params.cfg,
allowBootstrap: true,
});
const afterCommit = adapter?.send?.lifecycle?.afterCommit;
if (!afterCommit) {
return;
}
const result = buildReconciledSentResult(params.entry, params.reconciliation);
try {
await afterCommit(
buildReconciledCommitContext({
entry: params.entry,
cfg: params.cfg,
result,
}),
);
} catch (err) {
params.log.warn(
`Delivery entry ${params.entry.id} reconciled sent afterCommit hook failed: ${formatErrorMessage(err)}`,
);
}
}
async function moveEntryToFailedWithLogging(
entryId: string,
log: RecoveryLogger,
@@ -196,14 +357,90 @@ async function drainQueuedEntry(opts: {
entry: QueuedDelivery;
cfg: OpenClawConfig;
deliver: DeliverFn;
log: RecoveryLogger;
stateDir?: string;
onRecovered?: (entry: QueuedDelivery) => void;
onFailed?: (entry: QueuedDelivery, errMsg: string) => void;
}): Promise<"recovered" | "failed" | "moved-to-failed" | "already-gone"> {
const { entry } = opts;
if (
entry.recoveryState === "send_attempt_started" ||
entry.recoveryState === "unknown_after_send"
) {
const reconciliation = await reconcileUnknownQueuedDelivery({
entry,
cfg: opts.cfg,
log: opts.log,
});
if (reconciliation?.status === "sent") {
try {
await ackDelivery(entry.id, opts.stateDir);
await runReconciledSentCommitHooks({
entry,
cfg: opts.cfg,
reconciliation,
log: opts.log,
});
opts.onRecovered?.(entry);
opts.log.info(`Delivery entry ${entry.id} reconciled unknown_after_send as already sent`);
return "recovered";
} catch (ackErr) {
if (getErrnoCode(ackErr) === "ENOENT") {
return "already-gone";
}
const errMsg = `failed to ack reconciled sent delivery: ${formatErrorMessage(ackErr)}`;
opts.log.warn(`Delivery entry ${entry.id} ${errMsg}`);
opts.onFailed?.(entry, errMsg);
try {
await failDelivery(entry.id, errMsg, opts.stateDir);
return "failed";
} catch (failErr) {
if (getErrnoCode(failErr) === "ENOENT") {
return "already-gone";
}
}
return "failed";
}
}
if (reconciliation?.status === "not_sent") {
opts.log.info(
`Delivery entry ${entry.id} reconciled ${entry.recoveryState} as not sent; replaying`,
);
} else {
const errMsg =
reconciliation?.status === "unresolved" && reconciliation.error
? `delivery state is ${entry.recoveryState} and reconciliation is unresolved: ${reconciliation.error}`
: `delivery state is ${entry.recoveryState}; refusing blind replay without adapter reconciliation`;
opts.log.warn(`Delivery entry ${entry.id} ${errMsg}`);
opts.onFailed?.(entry, errMsg);
if (reconciliation === null || reconciliation.retryable === true) {
try {
await failDelivery(entry.id, errMsg, opts.stateDir);
return "failed";
} catch (failErr) {
if (getErrnoCode(failErr) === "ENOENT") {
return "already-gone";
}
}
return "failed";
}
try {
await moveToFailed(entry.id, opts.stateDir);
return "moved-to-failed";
} catch (moveErr) {
if (getErrnoCode(moveErr) === "ENOENT") {
return "already-gone";
}
}
return "failed";
}
}
try {
await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
const result = await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
await ackDelivery(entry.id, opts.stateDir);
if (isOutboundDeliveryResultArray(result)) {
await runOutboundDeliveryCommitHooks(result);
}
opts.onRecovered?.(entry);
return "recovered";
} catch (err) {
@@ -314,6 +551,7 @@ export async function drainPendingDeliveries(opts: {
entry: currentEntry,
cfg: opts.cfg,
deliver,
log: opts.log,
stateDir: opts.stateDir,
onFailed: (failedEntry, errMsg) => {
if (isPermanentDeliveryError(errMsg)) {
@@ -405,6 +643,7 @@ export async function recoverPendingDeliveries(opts: {
entry: currentEntry,
cfg: opts.cfg,
deliver: opts.deliver,
log: opts.log,
stateDir: opts.stateDir,
onRecovered: (recoveredEntry) => {
summary.recovered += 1;

View File

@@ -1,10 +1,12 @@
import fs from "node:fs";
import path from "node:path";
import type { ReplyPayload } from "../../auto-reply/types.js";
import type { RenderedMessageBatchPlanItem } from "../../channels/message/types.js";
import { resolveStateDir } from "../../config/paths.js";
import type { ReplyToMode } from "../../config/types.js";
import { generateSecureUuid } from "../secure-random.js";
import type { OutboundDeliveryFormattingOptions } from "./formatting.js";
import type { OutboundIdentity } from "./identity.js";
import type { OutboundMirror } from "./mirror.js";
import type { OutboundSessionContext } from "./session-context.js";
import type { OutboundChannel } from "./targets.js";
@@ -12,6 +14,17 @@ import type { OutboundChannel } from "./targets.js";
const QUEUE_DIRNAME = "delivery-queue";
const FAILED_DIRNAME = "failed";
export type QueuedRenderedMessageBatchPlan = {
payloadCount: number;
textCount: number;
mediaCount: number;
voiceCount: number;
presentationCount: number;
interactiveCount: number;
channelDataCount: number;
items: readonly RenderedMessageBatchPlanItem[];
};
export type QueuedDeliveryPayload = {
channel: Exclude<OutboundChannel, "none">;
to: string;
@@ -22,10 +35,13 @@ export type QueuedDeliveryPayload = {
* should produce the same result on replay.
*/
payloads: ReplyPayload[];
/** Replayable projection summary captured when the durable send intent is created. */
renderedBatchPlan?: QueuedRenderedMessageBatchPlan;
threadId?: string | number | null;
replyToId?: string | null;
replyToMode?: ReplyToMode;
formatting?: OutboundDeliveryFormattingOptions;
identity?: OutboundIdentity;
bestEffort?: boolean;
gifPlayback?: boolean;
forceDocument?: boolean;
@@ -43,6 +59,8 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
retryCount: number;
lastAttemptAt?: number;
lastError?: string;
platformSendStartedAt?: number;
recoveryState?: "send_attempt_started" | "unknown_after_send";
}
export function resolveQueueDir(stateDir?: string): string {
@@ -144,10 +162,12 @@ export async function enqueueDelivery(
to: params.to,
accountId: params.accountId,
payloads: params.payloads,
renderedBatchPlan: params.renderedBatchPlan,
threadId: params.threadId,
replyToId: params.replyToId,
replyToMode: params.replyToMode,
formatting: params.formatting,
identity: params.identity,
bestEffort: params.bestEffort,
gifPlayback: params.gifPlayback,
forceDocument: params.forceDocument,
@@ -198,6 +218,28 @@ export async function failDelivery(id: string, error: string, stateDir?: string)
await writeQueueEntry(filePath, entry);
}
export async function markDeliveryPlatformSendAttemptStarted(
id: string,
stateDir?: string,
): Promise<void> {
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
const entry = await readQueueEntry(filePath);
entry.platformSendStartedAt = entry.platformSendStartedAt ?? Date.now();
entry.recoveryState = "send_attempt_started";
await writeQueueEntry(filePath, entry);
}
export async function markDeliveryPlatformOutcomeUnknown(
id: string,
stateDir?: string,
): Promise<void> {
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
const entry = await readQueueEntry(filePath);
entry.platformSendStartedAt = entry.platformSendStartedAt ?? Date.now();
entry.recoveryState = "unknown_after_send";
await writeQueueEntry(filePath, entry);
}
/** Load a single pending delivery entry by ID from the queue directory. */
export async function loadPendingDelivery(
id: string,

View File

@@ -1,6 +1,7 @@
import fs from "node:fs";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { attachOutboundDeliveryCommitHook } from "./delivery-commit-hooks.js";
import {
enqueueDelivery,
loadPendingDeliveries,
@@ -14,10 +15,20 @@ import {
setQueuedEntryState,
} from "./delivery-queue.test-helpers.js";
const resolveOutboundChannelMessageAdapterMock = vi.hoisted(() => vi.fn());
vi.mock("./channel-resolution.js", () => ({
resolveOutboundChannelMessageAdapter: resolveOutboundChannelMessageAdapterMock,
}));
describe("delivery-queue recovery", () => {
const { tmpDir } = installDeliveryQueueTmpDirHooks();
const baseCfg = {};
beforeEach(() => {
resolveOutboundChannelMessageAdapterMock.mockReset();
});
const enqueueCrashRecoveryEntries = async () => {
await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "a" }] },
@@ -98,6 +109,353 @@ describe("delivery-queue recovery", () => {
expect(entries[0]?.lastError).toBe("network down");
});
it("retains entries abandoned after platform send may have started without reconciliation", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "maybe sent" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "unknown_after_send",
});
const deliver = vi.fn().mockResolvedValue([]);
const log = createRecoveryLog();
const { result } = await runRecovery({ deliver, log });
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({
recovered: 0,
failed: 1,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
const entries = await loadPendingDeliveries(tmpDir());
expect(entries).toHaveLength(1);
expect(entries[0]?.id).toBe(id);
expect(entries[0]?.retryCount).toBe(1);
expect(entries[0]?.lastError).toContain("unknown_after_send");
expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("unknown_after_send"));
});
it("retains started entries without reconciliation instead of blindly replaying", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "not yet sent" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "send_attempt_started",
});
const deliver = vi.fn().mockResolvedValue([]);
const log = createRecoveryLog();
const { result } = await runRecovery({ deliver, log });
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({
recovered: 0,
failed: 1,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
const entries = await loadPendingDeliveries(tmpDir());
expect(entries).toHaveLength(1);
expect(entries[0]?.id).toBe(id);
expect(entries[0]?.retryCount).toBe(1);
expect(entries[0]?.lastError).toContain("send_attempt_started");
expect(log.warn).toHaveBeenCalledWith(
expect.stringContaining("refusing blind replay without adapter reconciliation"),
);
});
it("replays started entries only after adapter proves they were not sent", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "not yet sent" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "send_attempt_started",
});
resolveOutboundChannelMessageAdapterMock.mockReturnValue({
durableFinal: {
capabilities: { reconcileUnknownSend: true },
reconcileUnknownSend: vi.fn().mockResolvedValue({ status: "not_sent" }),
},
});
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver });
expect(resolveOutboundChannelMessageAdapterMock).toHaveBeenCalledWith({
channel: "demo-channel-a",
cfg: baseCfg,
allowBootstrap: true,
});
expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({ channel: "demo-channel-a", to: "+1", skipQueue: true }),
);
expect(result).toEqual({
recovered: 1,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
});
it("acks unknown-after-send entries reconciled as already sent before commit hooks", async () => {
const id = await enqueueDelivery(
{
channel: "demo-channel-a",
to: "+1",
accountId: "acct-1",
payloads: [{ text: "maybe sent" }],
replyToId: "root-message",
threadId: "thread-1",
silent: true,
},
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "unknown_after_send",
});
const order: string[] = [];
const afterCommit = vi.fn(() => {
order.push("afterCommit");
});
const reconcileUnknownSend = vi.fn().mockResolvedValue({
status: "sent",
messageId: "platform-1",
receipt: {
primaryPlatformMessageId: "platform-1",
platformMessageIds: ["platform-1"],
parts: [{ platformMessageId: "platform-1", kind: "text", index: 0 }],
sentAt: 1,
},
});
resolveOutboundChannelMessageAdapterMock.mockReturnValue({
durableFinal: {
capabilities: { reconcileUnknownSend: true },
reconcileUnknownSend,
},
send: {
lifecycle: {
afterCommit,
},
},
});
const rename = fs.promises.rename.bind(fs.promises);
const renameSpy = vi.spyOn(fs.promises, "rename").mockImplementation(async (...args) => {
order.push("ack");
return await rename(...args);
});
try {
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver });
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({
recovered: 1,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
expect(reconcileUnknownSend).toHaveBeenCalledWith(
expect.objectContaining({
cfg: baseCfg,
queueId: id,
channel: "demo-channel-a",
to: "+1",
accountId: "acct-1",
payloads: [{ text: "maybe sent" }],
replyToId: "root-message",
threadId: "thread-1",
silent: true,
retryCount: 0,
}),
);
expect(afterCommit).toHaveBeenCalledWith(
expect.objectContaining({
kind: "text",
to: "+1",
accountId: "acct-1",
replyToId: "root-message",
threadId: "thread-1",
silent: true,
result: expect.objectContaining({ messageId: "platform-1" }),
}),
);
expect(order).toEqual(["ack", "afterCommit"]);
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
} finally {
renameSpy.mockRestore();
}
});
it("records retry state when acking a reconciled sent entry fails", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "maybe sent" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "unknown_after_send",
});
resolveOutboundChannelMessageAdapterMock.mockReturnValue({
durableFinal: {
capabilities: { reconcileUnknownSend: true },
reconcileUnknownSend: vi.fn().mockResolvedValue({
status: "sent",
messageId: "platform-1",
receipt: {
primaryPlatformMessageId: "platform-1",
platformMessageIds: ["platform-1"],
parts: [{ platformMessageId: "platform-1", kind: "text", index: 0 }],
sentAt: 1,
},
}),
},
});
const renameSpy = vi
.spyOn(fs.promises, "rename")
.mockRejectedValueOnce(Object.assign(new Error("ack denied"), { code: "EACCES" }));
try {
const deliver = vi.fn().mockResolvedValue([]);
const log = createRecoveryLog();
const { result } = await runRecovery({ deliver, log });
expect(deliver).not.toHaveBeenCalled();
expect(result).toEqual({
recovered: 0,
failed: 1,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
const entries = await loadPendingDeliveries(tmpDir());
expect(entries).toHaveLength(1);
expect(entries[0]?.id).toBe(id);
expect(entries[0]?.retryCount).toBe(1);
expect(entries[0]?.lastError).toContain("failed to ack reconciled sent delivery");
expect(entries[0]?.lastError).toContain("ack denied");
expect(log.warn).toHaveBeenCalledWith(
expect.stringContaining("failed to ack reconciled sent delivery"),
);
} finally {
renameSpy.mockRestore();
}
});
it("replays unknown-after-send entries only after adapter proves they were not sent", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "not sent" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "unknown_after_send",
});
resolveOutboundChannelMessageAdapterMock.mockReturnValue({
durableFinal: {
capabilities: { reconcileUnknownSend: true },
reconcileUnknownSend: vi.fn().mockResolvedValue({ status: "not_sent" }),
},
});
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledTimes(1);
expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({ channel: "demo-channel-a", to: "+1", skipQueue: true }),
);
expect(result).toEqual({
recovered: 1,
failed: 0,
skippedMaxRetries: 0,
deferredBackoff: 0,
});
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
});
it("keeps retryable unresolved unknown-after-send entries on the queue without replaying", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "unknown" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "unknown_after_send",
});
resolveOutboundChannelMessageAdapterMock.mockReturnValue({
durableFinal: {
capabilities: { reconcileUnknownSend: true },
reconcileUnknownSend: vi.fn().mockResolvedValue({
status: "unresolved",
error: "provider lookup timed out",
retryable: true,
}),
},
});
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver });
expect(deliver).not.toHaveBeenCalled();
expect(result.failed).toBe(1);
const entries = await loadPendingDeliveries(tmpDir());
expect(entries).toHaveLength(1);
expect(entries[0]?.id).toBe(id);
expect(entries[0]?.retryCount).toBe(1);
expect(entries[0]?.recoveryState).toBe("unknown_after_send");
expect(entries[0]?.lastError).toContain("provider lookup timed out");
});
it("does not reconcile unknown-after-send entries unless the adapter declares the capability", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "hidden method" }] },
tmpDir(),
);
setQueuedEntryState(tmpDir(), id, {
retryCount: 0,
platformSendStartedAt: Date.now(),
recoveryState: "unknown_after_send",
});
const reconcileUnknownSend = vi.fn().mockResolvedValue({ status: "not_sent" });
resolveOutboundChannelMessageAdapterMock.mockReturnValue({
durableFinal: {
reconcileUnknownSend,
},
});
const deliver = vi.fn().mockResolvedValue([]);
const log = createRecoveryLog();
const { result } = await runRecovery({ deliver, log });
expect(reconcileUnknownSend).not.toHaveBeenCalled();
expect(deliver).not.toHaveBeenCalled();
expect(result.failed).toBe(1);
const entries = await loadPendingDeliveries(tmpDir());
expect(entries).toHaveLength(1);
expect(entries[0]?.id).toBe(id);
expect(entries[0]?.retryCount).toBe(1);
expect(log.warn).toHaveBeenCalledWith(
expect.stringContaining("refusing blind replay without adapter reconciliation"),
);
});
it("moves entries to failed/ immediately on permanent delivery errors", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel", to: "user:abc", payloads: [{ text: "hi" }] },
@@ -150,6 +508,36 @@ describe("delivery-queue recovery", () => {
expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
});
it("runs recovered send commit hooks only after the queue entry is acked", async () => {
const id = await enqueueDelivery(
{ channel: "demo-channel-a", to: "+1", payloads: [{ text: "a" }] },
tmpDir(),
);
const order: string[] = [];
const result = attachOutboundDeliveryCommitHook(
{ channel: "demo-channel-a", messageId: "m1" },
async () => {
order.push(
fs.existsSync(path.join(tmpDir(), "delivery-queue", "pending", `${id}.json`))
? "commit-before-ack"
: "commit-after-ack",
);
},
);
const deliver = vi.fn(async () => {
order.push("deliver");
return [result];
});
await runRecovery({ deliver });
expect(order).toEqual(["deliver", "commit-after-ack"]);
expect(await loadPendingDeliveries(tmpDir())).toHaveLength(0);
expect(fs.existsSync(path.join(tmpDir(), "delivery-queue", "pending", `${id}.json`))).toBe(
false,
);
});
it("replays stored delivery options during recovery", async () => {
await enqueueDelivery(
{

View File

@@ -6,6 +6,8 @@ import {
enqueueDelivery,
failDelivery,
loadPendingDeliveries,
markDeliveryPlatformOutcomeUnknown,
markDeliveryPlatformSendAttemptStarted,
moveToFailed,
} from "./delivery-queue.js";
import { installDeliveryQueueTmpDirHooks, readQueuedEntry } from "./delivery-queue.test-helpers.js";
@@ -24,6 +26,16 @@ describe("delivery-queue storage", () => {
channel: "directchat",
to: "+1555",
payloads: [{ text: "hello" }],
renderedBatchPlan: {
payloadCount: 1,
textCount: 1,
mediaCount: 0,
voiceCount: 0,
presentationCount: 0,
interactiveCount: 0,
channelDataCount: 0,
items: [{ index: 0, kinds: ["text"] as const, text: "hello", mediaUrls: [] }],
},
bestEffort: true,
gifPlayback: true,
silent: true,
@@ -50,6 +62,16 @@ describe("delivery-queue storage", () => {
id,
channel: "directchat",
to: "+1555",
renderedBatchPlan: {
payloadCount: 1,
textCount: 1,
mediaCount: 0,
voiceCount: 0,
presentationCount: 0,
interactiveCount: 0,
channelDataCount: 0,
items: [{ index: 0, kinds: ["text"] as const, text: "hello", mediaUrls: [] }],
},
bestEffort: true,
gifPlayback: true,
silent: true,
@@ -115,6 +137,45 @@ describe("delivery-queue storage", () => {
});
describe("failDelivery", () => {
it("marks entries as send-attempt-started before platform I/O", async () => {
const id = await enqueueTextDelivery(
{
channel: "forum",
to: "123",
payloads: [{ text: "test" }],
},
tmpDir(),
);
await markDeliveryPlatformSendAttemptStarted(id, tmpDir());
const entry = readQueuedEntry(tmpDir(), id);
expect(typeof entry.platformSendStartedAt).toBe("number");
expect((entry.platformSendStartedAt as number) > 0).toBe(true);
expect(entry.recoveryState).toBe("send_attempt_started");
expect(entry.retryCount).toBe(0);
});
it("marks entries as unknown-after-send after platform I/O returns", async () => {
const id = await enqueueTextDelivery(
{
channel: "forum",
to: "123",
payloads: [{ text: "test" }],
},
tmpDir(),
);
await markDeliveryPlatformSendAttemptStarted(id, tmpDir());
await markDeliveryPlatformOutcomeUnknown(id, tmpDir());
const entry = readQueuedEntry(tmpDir(), id);
expect(typeof entry.platformSendStartedAt).toBe("number");
expect((entry.platformSendStartedAt as number) > 0).toBe(true);
expect(entry.recoveryState).toBe("unknown_after_send");
expect(entry.retryCount).toBe(0);
});
it("increments retryCount, records attempt time, and sets lastError", async () => {
const id = await enqueueTextDelivery(
{

View File

@@ -40,7 +40,13 @@ export function readQueuedEntry(tmpDir: string, id: string): Record<string, unkn
export function setQueuedEntryState(
tmpDir: string,
id: string,
state: { retryCount: number; lastAttemptAt?: number; enqueuedAt?: number },
state: {
retryCount: number;
lastAttemptAt?: number;
enqueuedAt?: number;
platformSendStartedAt?: number;
recoveryState?: "send_attempt_started" | "unknown_after_send";
},
): void {
const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
const entry = readQueuedEntry(tmpDir, id);
@@ -53,6 +59,12 @@ export function setQueuedEntryState(
if (state.enqueuedAt !== undefined) {
entry.enqueuedAt = state.enqueuedAt;
}
if (state.platformSendStartedAt !== undefined) {
entry.platformSendStartedAt = state.platformSendStartedAt;
}
if (state.recoveryState !== undefined) {
entry.recoveryState = state.recoveryState;
}
fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
}

View File

@@ -5,9 +5,15 @@ export {
failDelivery,
loadPendingDelivery,
loadPendingDeliveries,
markDeliveryPlatformOutcomeUnknown,
markDeliveryPlatformSendAttemptStarted,
moveToFailed,
} from "./delivery-queue-storage.js";
export type { QueuedDelivery, QueuedDeliveryPayload } from "./delivery-queue-storage.js";
export type {
QueuedDelivery,
QueuedDeliveryPayload,
QueuedRenderedMessageBatchPlan,
} from "./delivery-queue-storage.js";
export {
computeBackoffMs,
drainPendingDeliveries,

View File

@@ -5,6 +5,7 @@ import {
readStringArrayParam,
readStringParam,
} from "../../agents/tools/common.js";
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js";
import { normalizeChatType, type ChatType } from "../../channels/chat-type.js";
import { getChannelPlugin } from "../../channels/plugins/index.js";
@@ -19,7 +20,9 @@ import {
hasInteractiveReplyBlocks,
hasMessagePresentationBlocks,
hasReplyPayloadContent,
normalizeInteractiveReply,
normalizeMessagePresentation,
type ReplyPayloadDelivery,
} from "../../interactive/payload.js";
import type { OutboundMediaAccess } from "../../media/load-options.js";
import { getAgentScopedMediaLocalRoots } from "../../media/local-roots.js";
@@ -719,6 +722,28 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
});
const mirrorMediaUrls =
mergedMediaUrls.length > 0 ? mergedMediaUrls : mediaUrl ? [mediaUrl] : undefined;
const rawDelivery = params.delivery;
const delivery =
rawDelivery && typeof rawDelivery === "object" && !Array.isArray(rawDelivery)
? (rawDelivery as ReplyPayloadDelivery)
: undefined;
const rawChannelData = params.channelData;
const channelData =
rawChannelData && typeof rawChannelData === "object" && !Array.isArray(rawChannelData)
? (rawChannelData as Record<string, unknown>)
: undefined;
const presentation = normalizeMessagePresentation(params.presentation);
const interactive = normalizeInteractiveReply(params.interactive);
const payload: ReplyPayload = {
text: message,
...(mediaUrl ? { mediaUrl } : {}),
...(mergedMediaUrls.length ? { mediaUrls: mergedMediaUrls } : {}),
...(asVoice ? { audioAsVoice: true } : {}),
...(presentation ? { presentation } : {}),
...(interactive ? { interactive } : {}),
...(delivery ? { delivery } : {}),
...(channelData ? { channelData } : {}),
};
throwIfAborted(abortSignal);
const gatewayPluginAction = await runGatewayPluginMessageActionOrNull({
@@ -779,6 +804,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
},
to,
message,
payload,
mediaUrl: mediaUrl || undefined,
mediaUrls: mergedMediaUrls.length ? mergedMediaUrls : undefined,
asVoice,

View File

@@ -4,6 +4,7 @@ const mocks = vi.hoisted(() => ({
getChannelPlugin: vi.fn(),
resolveOutboundTarget: vi.fn(),
deliverOutboundPayloads: vi.fn(),
resolveOutboundDurableFinalDeliverySupport: vi.fn(),
resolveRuntimePluginRegistry: vi.fn(),
}));
@@ -43,6 +44,7 @@ vi.mock("./targets.js", () => ({
vi.mock("./deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
resolveOutboundDurableFinalDeliverySupport: mocks.resolveOutboundDurableFinalDeliverySupport,
}));
vi.mock("../../utils/message-channel.js", async () => {
@@ -78,6 +80,7 @@ describe("sendMessage", () => {
mocks.getChannelPlugin.mockClear();
mocks.resolveOutboundTarget.mockClear();
mocks.deliverOutboundPayloads.mockClear();
mocks.resolveOutboundDurableFinalDeliverySupport.mockClear();
mocks.resolveRuntimePluginRegistry.mockClear();
mocks.getChannelPlugin.mockReturnValue({
@@ -85,6 +88,7 @@ describe("sendMessage", () => {
});
mocks.resolveOutboundTarget.mockImplementation(({ to }: { to: string }) => ({ ok: true, to }));
mocks.deliverOutboundPayloads.mockResolvedValue([{ channel: "forum", messageId: "m1" }]);
mocks.resolveOutboundDurableFinalDeliverySupport.mockResolvedValue({ ok: true });
});
it("passes explicit agentId to outbound delivery for scoped media roots", async () => {
@@ -227,6 +231,66 @@ describe("sendMessage", () => {
);
});
it("forwards prepared payloads and required queue policy into outbound delivery", async () => {
const mediaAccess = {
localRoots: ["/tmp/media"],
readFile: vi.fn(async () => Buffer.from("media")),
};
await sendMessage({
cfg: {},
channel: "forum",
to: "123456",
content: "fallback text",
payloads: [{ text: "prepared", channelData: { forum: { card: true } } }],
queuePolicy: "required",
mediaAccess,
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
payloads: [
expect.objectContaining({
text: "prepared",
channelData: { forum: { card: true } },
}),
],
queuePolicy: "required",
mediaAccess,
}),
);
expect(mocks.resolveOutboundDurableFinalDeliverySupport).toHaveBeenCalledWith(
expect.objectContaining({
channel: "forum",
requirements: expect.objectContaining({
payload: true,
reconcileUnknownSend: true,
}),
}),
);
});
it("rejects required durable sends before enqueue when replay safety is unsupported", async () => {
mocks.resolveOutboundDurableFinalDeliverySupport.mockResolvedValueOnce({
ok: false,
reason: "capability_mismatch",
capability: "reconcileUnknownSend",
});
await expect(
sendMessage({
cfg: {},
channel: "forum",
to: "123456",
content: "fallback text",
payloads: [{ text: "prepared", channelData: { forum: { card: true } } }],
queuePolicy: "required",
}),
).rejects.toThrow("missing reconcileUnknownSend");
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
});
it("applies mirror matrix semantics for MEDIA and silent token variants", async () => {
const matrix: Array<{
name: string;

View File

@@ -1,4 +1,7 @@
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import { deriveDurableFinalDeliveryRequirements } from "../../channels/message/capabilities.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { OutboundMediaAccess } from "../../media/load-options.js";
import type { PollInput } from "../../polls.js";
import { normalizePollInput } from "../../polls.js";
import {
@@ -11,7 +14,10 @@ import { resolveOutboundChannelPlugin } from "./channel-resolution.js";
import { resolveMessageChannelSelection } from "./channel-selection.js";
import {
deliverOutboundPayloads,
resolveOutboundDurableFinalDeliverySupport,
type DurableFinalDeliveryRequirements,
type OutboundDeliveryResult,
type OutboundDeliveryQueuePolicy,
type OutboundSendDeps,
} from "./deliver.js";
import type { OutboundMirror } from "./mirror.js";
@@ -75,6 +81,9 @@ type MessageSendParams = {
threadId?: string | number;
dryRun?: boolean;
bestEffort?: boolean;
queuePolicy?: OutboundDeliveryQueuePolicy;
payloads?: ReplyPayload[];
mediaAccess?: OutboundMediaAccess;
deps?: OutboundSendDeps;
cfg?: OpenClawConfig;
gateway?: MessageGatewayOptions;
@@ -177,6 +186,76 @@ function resolveRequiredPlugin(channel: string, cfg: OpenClawConfig) {
return plugin;
}
function payloadRequiresDurablePayloadTransport(payload: ReplyPayload): boolean {
return (
payload.presentation !== undefined ||
payload.delivery !== undefined ||
payload.interactive !== undefined ||
(payload.channelData !== undefined && Object.keys(payload.channelData).length > 0)
);
}
function mergeDurableRequirements(
target: DurableFinalDeliveryRequirements,
source: DurableFinalDeliveryRequirements,
): DurableFinalDeliveryRequirements {
for (const [capability, required] of Object.entries(source) as Array<
[keyof DurableFinalDeliveryRequirements, boolean | undefined]
>) {
if (required === true) {
target[capability] = true;
}
}
return target;
}
function deriveRequiredMessageSendCapabilities(params: {
payloads: ReplyPayload[];
replyToId?: string | null;
threadId?: string | number | null;
silent?: boolean;
}): DurableFinalDeliveryRequirements {
const requirements: DurableFinalDeliveryRequirements = { reconcileUnknownSend: true };
for (const payload of params.payloads) {
mergeDurableRequirements(
requirements,
deriveDurableFinalDeliveryRequirements({
payload,
replyToId: params.replyToId,
threadId: params.threadId,
silent: params.silent,
payloadTransport: payloadRequiresDurablePayloadTransport(payload),
batch: params.payloads.length > 1,
reconcileUnknownSend: true,
}),
);
}
return requirements;
}
async function assertRequiredMessageSendDurability(params: {
cfg: OpenClawConfig;
channel: Exclude<string, "none">;
payloads: ReplyPayload[];
replyToId?: string | null;
threadId?: string | number | null;
silent?: boolean;
}): Promise<void> {
const support = await resolveOutboundDurableFinalDeliverySupport({
cfg: params.cfg,
channel: params.channel,
requirements: deriveRequiredMessageSendCapabilities(params),
});
if (support.ok) {
return;
}
const suffix =
support.reason === "capability_mismatch" && support.capability
? `missing ${support.capability}`
: support.reason;
throw new Error(`Required durable message send is unsupported for ${params.channel}: ${suffix}`);
}
function resolveGatewayOptions(opts?: MessageGatewayOptions) {
// Security: backend callers (tools/agents) must not accept user-controlled gateway URLs.
// Use config-derived gateway target only.
@@ -238,14 +317,18 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
const channel = await resolveRequiredChannel({ cfg, channel: params.channel });
const plugin = resolveRequiredPlugin(channel, cfg);
const deliveryMode = plugin.outbound?.deliveryMode ?? "direct";
const outboundPlan = createOutboundPayloadPlan([
{
text: params.content,
mediaUrl: params.mediaUrl,
mediaUrls: params.mediaUrls,
audioAsVoice: params.asVoice === true,
},
]);
const outboundPayloads =
params.payloads && params.payloads.length > 0
? params.payloads
: [
{
text: params.content,
mediaUrl: params.mediaUrl,
mediaUrls: params.mediaUrls,
audioAsVoice: params.asVoice === true,
},
];
const outboundPlan = createOutboundPayloadPlan(outboundPayloads);
const normalizedPayloads = projectOutboundPayloadPlanForDelivery(outboundPlan);
const mirrorProjection = projectOutboundPayloadPlanForMirror(outboundPlan);
const mirrorText = mirrorProjection.text;
@@ -286,6 +369,16 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
requesterSenderUsername: params.requesterSenderUsername,
requesterSenderE164: params.requesterSenderE164,
});
if (params.queuePolicy === "required") {
await assertRequiredMessageSendDurability({
cfg,
channel: outboundChannel,
payloads: normalizedPayloads,
replyToId: params.replyToId,
threadId: params.threadId,
silent: params.silent,
});
}
const results = await deliverOutboundPayloads({
cfg,
channel: outboundChannel,
@@ -299,8 +392,10 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
forceDocument: params.forceDocument,
deps: params.deps,
bestEffort: params.bestEffort,
queuePolicy: params.queuePolicy,
abortSignal: params.abortSignal,
silent: params.silent,
mediaAccess: params.mediaAccess,
mirror: params.mirror
? {
...params.mirror,

View File

@@ -1,4 +1,10 @@
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { ChannelPlugin } from "../../channels/plugins/types.public.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import {
createChannelTestPluginBase,
createTestRegistry,
} from "../../test-utils/channel-plugins.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../../utils/message-channel.js";
const getDefaultMediaLocalRootsMock = vi.hoisted(() => vi.fn(() => []));
@@ -175,6 +181,7 @@ describe("executeSendAction", () => {
});
beforeEach(() => {
setActivePluginRegistry(createTestRegistry([]));
mocks.dispatchChannelMessageAction.mockClear();
mocks.sendMessage.mockClear();
mocks.sendPoll.mockClear();
@@ -589,6 +596,92 @@ describe("executeSendAction", () => {
);
});
it("routes prepared plugin send payloads through core best-effort delivery by default", async () => {
const prepareSendPayload = vi.fn(({ payload }) => ({
...payload,
channelData: { prepared: true },
}));
const plugin: ChannelPlugin = {
...createChannelTestPluginBase({ id: "discord" }),
actions: {
describeMessageTool: () => ({ actions: ["send"] }),
prepareSendPayload,
handleAction: async () => ({ content: [], details: { ok: true } }),
},
outbound: { deliveryMode: "direct" },
};
setActivePluginRegistry(createTestRegistry([{ pluginId: "discord", plugin, source: "test" }]));
mocks.sendMessage.mockResolvedValue({
channel: "discord",
to: "channel:123",
via: "direct",
mediaUrl: null,
});
await executeSendAction({
ctx: {
cfg: {},
channel: "discord",
params: { to: "channel:123", message: "hello" },
dryRun: false,
},
to: "channel:123",
message: "hello",
});
expect(prepareSendPayload).toHaveBeenCalled();
expect(mocks.dispatchChannelMessageAction).not.toHaveBeenCalled();
expect(mocks.sendMessage).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
queuePolicy: "best_effort",
payloads: [expect.objectContaining({ channelData: { prepared: true } })],
}),
);
});
it("uses required core delivery only when the send action opts out of best-effort", async () => {
const prepareSendPayload = vi.fn(({ payload }) => ({
...payload,
channelData: { prepared: true },
}));
const plugin: ChannelPlugin = {
...createChannelTestPluginBase({ id: "discord" }),
actions: {
describeMessageTool: () => ({ actions: ["send"] }),
prepareSendPayload,
handleAction: async () => ({ content: [], details: { ok: true } }),
},
outbound: { deliveryMode: "direct" },
};
setActivePluginRegistry(createTestRegistry([{ pluginId: "discord", plugin, source: "test" }]));
mocks.sendMessage.mockResolvedValue({
channel: "discord",
to: "channel:123",
via: "direct",
mediaUrl: null,
});
await executeSendAction({
ctx: {
cfg: {},
channel: "discord",
params: { to: "channel:123", message: "hello" },
dryRun: false,
},
to: "channel:123",
message: "hello",
bestEffort: false,
});
expect(mocks.sendMessage).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
queuePolicy: "required",
}),
);
});
it("forwards poll args to sendPoll on core outbound path", async () => {
mocks.dispatchChannelMessageAction.mockResolvedValue(null);
mocks.sendPoll.mockResolvedValue({

View File

@@ -1,7 +1,9 @@
import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js";
import type {
ChannelId,
ChannelMessageActionContext,
ChannelThreadingToolContext,
} from "../../channels/plugins/types.public.js";
import { appendAssistantMessageToSessionTranscript } from "../../config/sessions.js";
@@ -10,6 +12,7 @@ import type { OutboundMediaAccess, OutboundMediaReadFile } from "../../media/loa
import { resolveAgentScopedOutboundMediaAccess } from "../../media/read-capability.js";
import type { GatewayClientMode, GatewayClientName } from "../../utils/message-channel.js";
import { throwIfAborted } from "./abort.js";
import { resolveOutboundChannelPlugin } from "./channel-resolution.js";
import type { OutboundSendDeps } from "./deliver.js";
import type { MessagePollResult, MessageSendResult } from "./message.js";
import { sendMessage, sendPoll } from "./message.js";
@@ -122,10 +125,66 @@ async function tryHandleWithPluginAction(params: {
};
}
function createChannelActionContext(params: {
ctx: OutboundSendContext;
action: "send" | "poll";
mediaAccess?: ReturnType<typeof resolveAgentScopedOutboundMediaAccess>;
}): ChannelMessageActionContext {
const mediaAccess = params.mediaAccess ?? params.ctx.mediaAccess;
return {
channel: params.ctx.channel,
action: params.action,
cfg: params.ctx.cfg,
params: params.ctx.params,
...(mediaAccess ? { mediaAccess } : {}),
mediaLocalRoots: mediaAccess?.localRoots ?? params.ctx.mediaAccess?.localRoots,
mediaReadFile: mediaAccess?.readFile ?? params.ctx.mediaReadFile,
accountId: params.ctx.accountId ?? undefined,
requesterSenderId: params.ctx.requesterSenderId,
senderIsOwner: params.ctx.senderIsOwner,
sessionKey: params.ctx.sessionKey,
sessionId: params.ctx.sessionId,
agentId: params.ctx.agentId,
gateway: params.ctx.gateway,
toolContext: params.ctx.toolContext,
dryRun: params.ctx.dryRun,
};
}
async function tryPreparePluginSendPayload(params: {
ctx: OutboundSendContext;
to: string;
payload: ReplyPayload;
replyToId?: string;
threadId?: string | number;
}): Promise<ReplyPayload | null> {
const plugin = resolveOutboundChannelPlugin({
channel: params.ctx.channel,
cfg: params.ctx.cfg,
});
if (!plugin?.outbound) {
return null;
}
const prepareSendPayload = plugin?.actions?.prepareSendPayload;
if (!prepareSendPayload) {
return null;
}
return (
(await prepareSendPayload({
ctx: createChannelActionContext({ ctx: params.ctx, action: "send" }),
to: params.to,
payload: params.payload,
replyToId: params.replyToId,
threadId: params.threadId,
})) ?? null
);
}
export async function executeSendAction(params: {
ctx: OutboundSendContext;
to: string;
message: string;
payload?: ReplyPayload;
mediaUrl?: string;
mediaUrls?: string[];
asVoice?: boolean;
@@ -141,6 +200,61 @@ export async function executeSendAction(params: {
sendResult?: MessageSendResult;
}> {
throwIfAborted(params.ctx.abortSignal);
const defaultPayload: ReplyPayload = params.payload ?? {
text: params.message,
mediaUrl: params.mediaUrl,
mediaUrls: params.mediaUrls,
audioAsVoice: params.asVoice === true,
};
const queuePolicy = params.bestEffort === false ? "required" : "best_effort";
const preparedPayload = await tryPreparePluginSendPayload({
ctx: params.ctx,
to: params.to,
payload: defaultPayload,
replyToId: params.replyToId,
threadId: params.threadId,
});
if (preparedPayload) {
throwIfAborted(params.ctx.abortSignal);
const result: MessageSendResult = await sendMessage({
cfg: params.ctx.cfg,
to: params.to,
content: params.message,
payloads: [preparedPayload],
agentId: params.ctx.agentId,
requesterSessionKey: params.ctx.sessionKey,
requesterAccountId: params.ctx.requesterAccountId ?? params.ctx.accountId ?? undefined,
requesterSenderId: params.ctx.requesterSenderId,
requesterSenderName: params.ctx.requesterSenderName,
requesterSenderUsername: params.ctx.requesterSenderUsername,
requesterSenderE164: params.ctx.requesterSenderE164,
mediaUrl: params.mediaUrl || undefined,
mediaUrls: params.mediaUrls,
asVoice: params.asVoice,
channel: params.ctx.channel || undefined,
accountId: params.ctx.accountId ?? undefined,
replyToId: params.replyToId,
threadId: params.threadId,
gifPlayback: params.gifPlayback,
forceDocument: params.forceDocument,
dryRun: params.ctx.dryRun,
bestEffort: params.bestEffort ?? undefined,
queuePolicy,
deps: params.ctx.deps,
gateway: params.ctx.gateway,
mirror: params.ctx.mirror,
abortSignal: params.ctx.abortSignal,
silent: params.ctx.silent,
mediaAccess: params.ctx.mediaAccess,
});
return {
handledBy: "core",
payload: result,
sendResult: result,
};
}
const pluginHandled = await tryHandleWithPluginAction({
ctx: params.ctx,
action: "send",
@@ -190,11 +304,13 @@ export async function executeSendAction(params: {
forceDocument: params.forceDocument,
dryRun: params.ctx.dryRun,
bestEffort: params.bestEffort ?? undefined,
queuePolicy,
deps: params.ctx.deps,
gateway: params.ctx.gateway,
mirror: params.ctx.mirror,
abortSignal: params.ctx.abortSignal,
silent: params.ctx.silent,
mediaAccess: params.ctx.mediaAccess,
});
return {

View File

@@ -956,6 +956,7 @@ describe("test-projects args", () => {
includePatterns: [
"extensions/discord/src/api-barrel.test.ts",
"extensions/discord/src/channel-actions.contract.test.ts",
"extensions/discord/src/channel.message-adapter.test.ts",
"extensions/discord/src/channel.test.ts",
"extensions/discord/src/monitor/message-handler.bot-self-filter.test.ts",
"extensions/discord/src/monitor/message-handler.queue.test.ts",

View File

@@ -19,8 +19,8 @@ import {
describe("usage-format", () => {
const originalAgentDir = process.env.OPENCLAW_AGENT_DIR;
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
let stateDir: string;
let agentDir: string;
let stateDir: string;
beforeEach(async () => {
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-format-"));