From f3fe48e8b791868cae6fa0fb6d68dfc2d18aa9ca Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Wed, 27 May 2026 14:34:47 +0200 Subject: [PATCH] Make Telegram sendMessage actions durable (#87261) Route Telegram sendMessage action replies through durable outbound delivery so completed agent responses remain retryable when the gateway send path times out. Verified with focused Telegram/outbound tests, extension test typecheck, prepare build/check/full test gates, and green CI rerun for head 20b45687e11f2e0382df336e30f533874b8e66c2. --- CHANGELOG.md | 6 + .../telegram/src/action-runtime.test.ts | 307 +++++++++++++++++- extensions/telegram/src/action-runtime.ts | 146 +++++---- .../telegram/src/outbound-adapter.test.ts | 2 + extensions/telegram/src/outbound-adapter.ts | 3 +- src/channels/plugins/outbound.types.ts | 1 + src/infra/outbound/deliver.test.ts | 3 + src/infra/outbound/deliver.ts | 10 +- src/plugin-sdk/channel-outbound.ts | 3 +- 9 files changed, 405 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99879bf6933..25e60d33ee0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ Docs: https://docs.openclaw.ai +## Unreleased + +### Fixes + +- Telegram: route `sendMessage` action replies through durable outbound delivery so completed agent responses remain retryable when the gateway send path times out. (#87261) Thanks @mbelinky. + ## 2026.5.26 ### Highlights diff --git a/extensions/telegram/src/action-runtime.test.ts b/extensions/telegram/src/action-runtime.test.ts index 0c8ecf69150..943c79c9298 100644 --- a/extensions/telegram/src/action-runtime.test.ts +++ b/extensions/telegram/src/action-runtime.test.ts @@ -1,3 +1,4 @@ +import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; @@ -15,10 +16,127 @@ import { const originalTelegramActionRuntime = { ...telegramActionRuntime }; const reactMessageTelegram = vi.fn(async () => ({ ok: true })); -const sendMessageTelegram = vi.fn(async () => ({ - messageId: "789", - chatId: "123", -})); +const sendMessageTelegram = vi.fn( + async (_to: string, _text: string, _opts?: Record) => ({ + messageId: "789", + chatId: "123", + }), +); +const sendDurableMessageBatch = vi.fn( + async (params: { + cfg: OpenClawConfig; + to: string; + accountId?: string; + payloads: Array<{ + text?: string; + mediaUrl?: string; + mediaUrls?: string[]; + audioAsVoice?: boolean; + delivery?: { + pin?: true | { enabled?: boolean; notify?: boolean; required?: boolean }; + }; + channelData?: { telegram?: { buttons?: unknown; quoteText?: string } }; + }>; + replyToId?: string; + threadId?: string | number; + forceDocument?: boolean; + silent?: boolean; + gatewayClientScopes?: readonly string[]; + session?: { + key?: string; + agentId?: string; + requesterAccountId?: string; + }; + mediaAccess?: { + localRoots?: readonly string[]; + readFile?: (filePath: string) => Promise; + }; + }) => { + const payload = params.payloads[0] ?? {}; + const mediaUrls = payload.mediaUrls?.length + ? payload.mediaUrls + : payload.mediaUrl + ? [payload.mediaUrl] + : []; + const telegramData = payload.channelData?.telegram; + const cfg = params.cfg as { + channels?: { + telegram?: { + botToken?: string; + accounts?: Record; + }; + }; + }; + const token = + (params.accountId + ? cfg.channels?.telegram?.accounts?.[params.accountId]?.botToken + : undefined) ?? + cfg.channels?.telegram?.botToken ?? + process.env.TELEGRAM_BOT_TOKEN; + const baseOptions = { + cfg: params.cfg, + token, + accountId: params.accountId, + gatewayClientScopes: params.gatewayClientScopes, + replyToMessageId: + params.replyToId == null ? undefined : Number.parseInt(params.replyToId, 10), + messageThreadId: + params.threadId == null ? undefined : Number.parseInt(String(params.threadId), 10), + quoteText: telegramData?.quoteText, + asVoice: payload.audioAsVoice, + silent: params.silent, + forceDocument: params.forceDocument, + mediaLocalRoots: params.mediaAccess?.localRoots, + mediaReadFile: params.mediaAccess?.readFile, + }; + const calls = mediaUrls.length > 0 ? mediaUrls : [undefined]; + let last = { messageId: "789", chatId: "123" }; + for (const [index, mediaUrl] of calls.entries()) { + last = await sendMessageTelegram(params.to, index === 0 ? (payload.text ?? "") : "", { + ...baseOptions, + ...(mediaUrl ? { mediaUrl } : {}), + ...(index === 0 && telegramData?.buttons ? { buttons: telegramData.buttons } : {}), + }); + } + const pin = + payload.delivery?.pin === true + ? { enabled: true } + : payload.delivery?.pin && payload.delivery.pin.enabled + ? payload.delivery.pin + : undefined; + if (pin && last.messageId) { + try { + await pinMessageTelegram(params.to, last.messageId, { + cfg: params.cfg, + accountId: params.accountId, + notify: pin.notify, + verbose: false, + gatewayClientScopes: params.gatewayClientScopes, + }); + } catch (err) { + if (pin.required) { + throw err; + } + } + } + return { + status: "sent", + results: [{ channel: "telegram", messageId: last.messageId, chatId: last.chatId }], + receipt: { + primaryPlatformMessageId: last.messageId, + platformMessageIds: [last.messageId], + parts: [ + { + platformMessageId: last.messageId, + kind: mediaUrls.length > 0 ? "media" : "text", + index: 0, + }, + ], + sentAt: Date.now(), + }, + } as const; + }, +); const sendPollTelegram = vi.fn(async () => ({ messageId: "790", chatId: "123", @@ -40,11 +158,13 @@ const editForumTopicTelegram = vi.fn(async () => ({ messageThreadId: 42, name: "Renamed", })); -const pinMessageTelegram = vi.fn(async () => ({ - ok: true, - messageId: "789", - chatId: "123", -})); +const pinMessageTelegram = vi.fn( + async (_to: string, _messageId: string, _opts?: Record) => ({ + ok: true, + messageId: "789", + chatId: "123", + }), +); const createForumTopicTelegram = vi.fn(async () => ({ topicId: 99, name: "Topic", @@ -109,6 +229,20 @@ function resultDetails(result: Awaited>) return requireRecord(result.details, "Telegram action details"); } +function readDurableQueueEntries(stateDir: string): Record[] { + const queueDir = path.join(stateDir, "delivery-queue"); + if (!fs.existsSync(queueDir)) { + return []; + } + return fs + .readdirSync(queueDir) + .filter((name) => name.endsWith(".json")) + .map((name) => JSON.parse(fs.readFileSync(path.join(queueDir, name), "utf-8"))) as Record< + string, + unknown + >[]; +} + describe("handleTelegramAction", () => { const defaultReactionAction = { action: "react", @@ -175,11 +309,12 @@ describe("handleTelegramAction", () => { } beforeEach(() => { - envSnapshot = captureEnv(["TELEGRAM_BOT_TOKEN"]); + envSnapshot = captureEnv(["OPENCLAW_STATE_DIR", "TELEGRAM_BOT_TOKEN"]); resetTopicNameCacheForTest(); installTopicNameStoreForTest(); Object.assign(telegramActionRuntime, originalTelegramActionRuntime, { reactMessageTelegram, + sendDurableMessageBatch, sendMessageTelegram, sendPollTelegram, sendStickerTelegram, @@ -190,6 +325,7 @@ describe("handleTelegramAction", () => { createForumTopicTelegram, }); reactMessageTelegram.mockClear(); + sendDurableMessageBatch.mockClear(); sendMessageTelegram.mockClear(); sendPollTelegram.mockClear(); sendStickerTelegram.mockClear(); @@ -417,7 +553,10 @@ describe("handleTelegramAction", () => { content: "Hello, Telegram!", }, telegramConfig(), - { gatewayClientScopes: ["operator.write"] }, + { + gatewayClientScopes: ["operator.write"], + sessionKey: "agent:main:telegram:direct:123", + }, ); const call = mockCall(sendMessageTelegram, 0, "text message"); expect(call[0]).toBe("@testchannel"); @@ -425,6 +564,15 @@ describe("handleTelegramAction", () => { const options = requireRecord(call[2], "text message options"); expect(options.token).toBe("tok"); expect(options.mediaUrl).toBeUndefined(); + const durableCall = mockCall(sendDurableMessageBatch, 0, "durable text message"); + expect(requireRecord(durableCall[0], "durable text message params")).toMatchObject({ + channel: "telegram", + to: "@testchannel", + durability: "required", + gatewayClientScopes: ["operator.write"], + session: { key: "agent:main:telegram:direct:123", agentId: "main" }, + payloads: [{ text: "Hello, Telegram!" }], + }); expect(result.content).toStrictEqual([ { type: "text", @@ -438,6 +586,135 @@ describe("handleTelegramAction", () => { }); }); + it("persists sendMessage action deliveries before Telegram platform send", async () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-action-durable-")); + const { createOutboundTestPlugin, createTestRegistry, setActivePluginRegistry } = + await import("openclaw/plugin-sdk/plugin-test-runtime"); + const sendText = vi + .fn() + .mockImplementationOnce(async () => { + const entries = readDurableQueueEntries(stateDir); + expect(entries).toHaveLength(1); + expect(entries[0]).toMatchObject({ + channel: "telegram", + to: "12345", + payloads: [ + { + text: "times out after queue write", + delivery: { pin: { enabled: true, required: true } }, + }, + ], + session: { key: "agent:main:telegram:direct:12345", agentId: "main" }, + gatewayClientScopes: ["operator.write"], + retryCount: 0, + }); + throw new Error("telegram timeout"); + }) + .mockImplementationOnce(async () => { + const entries = readDurableQueueEntries(stateDir); + const liveEntry = entries.find((entry) => + JSON.stringify(entry.payloads).includes("delivers after queue write"), + ); + expect(liveEntry).toMatchObject({ + channel: "telegram", + to: "12345", + payloads: [{ text: "delivers after queue write" }], + retryCount: 0, + }); + return { channel: "telegram", messageId: "tg-ok" }; + }); + + process.env.OPENCLAW_STATE_DIR = stateDir; + telegramActionRuntime.sendDurableMessageBatch = + originalTelegramActionRuntime.sendDurableMessageBatch; + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "telegram", + source: "test", + plugin: createOutboundTestPlugin({ + id: "telegram", + outbound: { + deliveryMode: "direct", + deliveryCapabilities: { + durableFinal: { + text: true, + media: true, + payload: true, + silent: true, + replyTo: true, + thread: true, + messageSendingHooks: true, + batch: true, + }, + }, + sendText, + }, + }), + }, + ]), + ); + + try { + await expect( + handleTelegramAction( + { + action: "sendMessage", + to: "12345", + content: "times out after queue write", + delivery: { pin: { enabled: true, required: true } }, + }, + telegramConfig(), + { + gatewayClientScopes: ["operator.write"], + sessionKey: "agent:main:telegram:direct:12345", + }, + ), + ).rejects.toThrow("telegram timeout"); + + const retryableEntries = readDurableQueueEntries(stateDir); + expect(retryableEntries).toHaveLength(1); + expect(retryableEntries[0]).toMatchObject({ + payloads: [ + { + text: "times out after queue write", + delivery: { pin: { enabled: true, required: true } }, + }, + ], + retryCount: 1, + }); + expect(String(retryableEntries[0]?.lastError)).toContain("telegram timeout"); + + const result = await handleTelegramAction( + { + action: "sendMessage", + to: "12345", + content: "delivers after queue write", + }, + telegramConfig(), + { sessionKey: "agent:main:telegram:direct:12345" }, + ); + + expect(result.details).toMatchObject({ + ok: true, + messageId: "tg-ok", + }); + expect(readDurableQueueEntries(stateDir)).toHaveLength(1); + expect(readDurableQueueEntries(stateDir)[0]).toMatchObject({ + payloads: [ + { + text: "times out after queue write", + delivery: { pin: { enabled: true, required: true } }, + }, + ], + retryCount: 1, + }); + } finally { + setActivePluginRegistry(createTestRegistry([])); + fs.rmSync(stateDir, { recursive: true, force: true }); + } + }); + it("normalizes legacy group targets for sendMessage actions", async () => { await handleTelegramAction( { @@ -1092,6 +1369,10 @@ describe("handleTelegramAction", () => { expect(options.accountId).toBeUndefined(); expect(options.verbose).toBe(false); expect(options.gatewayClientScopes).toEqual(["operator.write"]); + const durableCall = mockCall(sendDurableMessageBatch, 0, "durable delivery pin"); + expect(requireRecord(durableCall[0], "durable delivery pin params")).toMatchObject({ + payloads: [{ delivery: { pin: { enabled: true } } }], + }); }); it("passes delivery pin notify requests for action sends", async () => { @@ -1109,6 +1390,10 @@ describe("handleTelegramAction", () => { expect(call[0]).toBe("123456"); expect(call[1]).toBe("789"); expect(requireRecord(call[2], "delivery pin notify options").notify).toBe(true); + const durableCall = mockCall(sendDurableMessageBatch, 0, "durable delivery pin notify"); + expect(requireRecord(durableCall[0], "durable delivery pin notify params")).toMatchObject({ + payloads: [{ delivery: { pin: { enabled: true, notify: true } } }], + }); }); it("fails required action-send pins when pinning fails", async () => { diff --git a/extensions/telegram/src/action-runtime.ts b/extensions/telegram/src/action-runtime.ts index 433dd05defa..3547c6fe388 100644 --- a/extensions/telegram/src/action-runtime.ts +++ b/extensions/telegram/src/action-runtime.ts @@ -10,12 +10,18 @@ import { resolvePollMaxSelections, resolveReactionMessageId, } from "openclaw/plugin-sdk/channel-actions"; +import { + buildOutboundSessionContext, + sendDurableMessageBatch, + type DurableMessageBatchSendResult, +} from "openclaw/plugin-sdk/channel-outbound"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; import { normalizeMessagePresentation, renderMessagePresentationFallbackText, } from "openclaw/plugin-sdk/interactive-runtime"; import type { MessagePresentation } from "openclaw/plugin-sdk/interactive-runtime"; +import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime"; import { createTelegramActionGate, @@ -56,6 +62,7 @@ export const telegramActionRuntime = { pinMessageTelegram, reactMessageTelegram, searchStickers, + sendDurableMessageBatch, sendMessageTelegram, sendPollTelegram, sendStickerTelegram, @@ -257,37 +264,42 @@ function normalizeTelegramDeliveryPin(params: Record) { } as const; } -async function maybePinTelegramActionSend(params: { - args: Record; - cfg: OpenClawConfig; - accountId?: string; - to: string; - messageId?: string; - gatewayClientScopes?: readonly string[]; -}) { - const pin = normalizeTelegramDeliveryPin(params.args); - if (!pin) { - return; - } - if (!params.messageId) { - if (pin.required) { - throw new Error("Telegram delivery pin requested, but no message id was returned."); - } - return; - } - try { - await telegramActionRuntime.pinMessageTelegram(params.to, params.messageId, { - cfg: params.cfg, - accountId: params.accountId, - notify: pin.notify, - verbose: false, - gatewayClientScopes: params.gatewayClientScopes, - }); - } catch (err) { - if (pin.required) { - throw err; - } - } +function buildTelegramActionSendPayload(params: { + content: string; + mediaUrls: string[]; + asVoice?: boolean; + pin?: ReturnType; + buttons?: ReturnType; + quoteText?: string; +}): ReplyPayload { + const telegramData = + params.buttons || params.quoteText + ? { + ...(params.buttons ? { buttons: params.buttons } : {}), + ...(params.quoteText ? { quoteText: params.quoteText } : {}), + } + : undefined; + return { + text: params.content, + ...(params.mediaUrls.length > 0 ? { mediaUrls: params.mediaUrls } : {}), + ...(params.asVoice === true ? { audioAsVoice: true } : {}), + ...(params.pin ? { delivery: { pin: params.pin } } : {}), + ...(telegramData ? { channelData: { telegram: telegramData } } : {}), + }; +} + +function getLastDurableTelegramActionResult( + result: Extract, +): { messageId?: string; chatId?: string } { + const lastResult = result.results.at(-1); + const receipt = result.receipt; + return { + messageId: + lastResult?.messageId ?? + receipt.primaryPlatformMessageId ?? + receipt.platformMessageIds.at(-1), + chatId: lastResult?.chatId, + }; } export async function handleTelegramAction( @@ -455,10 +467,7 @@ export async function handleTelegramAction( } const sendOptions = { cfg, - token, accountId: accountId ?? undefined, - mediaLocalRoots: options?.mediaLocalRoots, - mediaReadFile: options?.mediaReadFile, gatewayClientScopes: options?.gatewayClientScopes, replyToMessageId: replyToMessageId ?? undefined, messageThreadId: messageThreadId ?? undefined, @@ -470,34 +479,49 @@ export async function handleTelegramAction( readBooleanParam(params, "asDocument") ?? false, }; - let result: Awaited>; - if (!firstMediaUrl) { - result = await telegramActionRuntime.sendMessageTelegram(to, content, { - ...sendOptions, - buttons, - }); - } else { - result = await telegramActionRuntime.sendMessageTelegram(to, content, { - ...sendOptions, - mediaUrl: firstMediaUrl, - buttons, - }); - for (const mediaUrl of mediaUrls.slice(1)) { - result = await telegramActionRuntime.sendMessageTelegram(to, "", { - ...sendOptions, - mediaUrl, - }); - } - } - notifyVisibleOutboundSuccess(to, messageThreadId); - await maybePinTelegramActionSend({ - args: params, - cfg, - accountId: accountId ?? undefined, - to, - messageId: result.messageId, - gatewayClientScopes: options?.gatewayClientScopes, + const payload = buildTelegramActionSendPayload({ + content, + mediaUrls, + asVoice: sendOptions.asVoice, + pin: normalizeTelegramDeliveryPin(params), + buttons, + quoteText, }); + const mediaAccess = + options?.mediaLocalRoots || options?.mediaReadFile + ? { + ...(options.mediaLocalRoots ? { localRoots: options.mediaLocalRoots } : {}), + ...(options.mediaReadFile ? { readFile: options.mediaReadFile } : {}), + } + : undefined; + const outboundSession = buildOutboundSessionContext({ + cfg, + sessionKey: options?.sessionKey, + requesterAccountId: accountId, + }); + const durableResult = await telegramActionRuntime.sendDurableMessageBatch({ + cfg, + channel: "telegram", + to, + accountId: accountId ?? undefined, + payloads: [payload], + replyToId: replyToMessageId == null ? undefined : String(replyToMessageId), + threadId: messageThreadId, + forceDocument: sendOptions.forceDocument, + silent: sendOptions.silent, + durability: "required", + gatewayClientScopes: options?.gatewayClientScopes, + ...(mediaAccess ? { mediaAccess } : {}), + ...(outboundSession ? { session: outboundSession } : {}), + }); + if (durableResult.status === "failed" || durableResult.status === "partial_failed") { + throw durableResult.error; + } + if (durableResult.status === "suppressed") { + throw new Error("Telegram sendMessage was suppressed before delivery."); + } + const result = getLastDurableTelegramActionResult(durableResult); + notifyVisibleOutboundSuccess(to, messageThreadId); return jsonResult({ ok: true, messageId: result.messageId, diff --git a/extensions/telegram/src/outbound-adapter.test.ts b/extensions/telegram/src/outbound-adapter.test.ts index c03a3078990..b75cfb01f76 100644 --- a/extensions/telegram/src/outbound-adapter.test.ts +++ b/extensions/telegram/src/outbound-adapter.test.ts @@ -595,12 +595,14 @@ describe("telegramOutbound", () => { target: { channel: "telegram", to: "12345", accountId: "ops" }, messageId: "tg-1", pin: { enabled: true, notify: true }, + gatewayClientScopes: ["operator.write"], }); const options = callOptionsAt(pinMessageTelegramMock, 0, "12345", "tg-1"); expect(options.accountId).toBe("ops"); expect(options.notify).toBe(true); expect(options.verbose).toBe(false); + expect(options.gatewayClientScopes).toEqual(["operator.write"]); }); it("normalizes legacy durable group retry targets before Telegram pinning", async () => { diff --git a/extensions/telegram/src/outbound-adapter.ts b/extensions/telegram/src/outbound-adapter.ts index c0bf3a4c938..1a5c8c2c0e0 100644 --- a/extensions/telegram/src/outbound-adapter.ts +++ b/extensions/telegram/src/outbound-adapter.ts @@ -227,7 +227,7 @@ export function createTelegramOutboundAdapter( }, }; }, - pinDeliveredMessage: async ({ cfg, target, messageId, pin }) => { + pinDeliveredMessage: async ({ cfg, target, messageId, pin, gatewayClientScopes }) => { const { pinMessageTelegram } = await loadSendModule(); const outboundTo = normalizeTelegramOutboundTarget(target.to); const pinTarget = parseTelegramTarget(outboundTo); @@ -236,6 +236,7 @@ export function createTelegramOutboundAdapter( accountId: target.accountId ?? undefined, notify: pin.notify, verbose: false, + gatewayClientScopes, }); }, resolveEffectiveTextChunkLimit: ({ fallbackLimit }) => diff --git a/src/channels/plugins/outbound.types.ts b/src/channels/plugins/outbound.types.ts index 9d1f1ebcb2e..a347073d381 100644 --- a/src/channels/plugins/outbound.types.ts +++ b/src/channels/plugins/outbound.types.ts @@ -193,6 +193,7 @@ export type ChannelOutboundAdapter = { target: ChannelOutboundTargetRef; messageId: string; pin: ReplyPayloadDeliveryPin; + gatewayClientScopes?: readonly string[]; }) => Promise | void; /** * @deprecated Use shouldTreatDeliveredTextAsVisible instead. diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 2959bad9c7f..e5ceeb84cb9 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -3150,10 +3150,13 @@ describe("deliverOutboundPayloads", () => { channel: "matrix", to: "!room:1", payloads: [{ text: "hello", delivery: { pin: true } }], + gatewayClientScopes: ["operator.write"], }); expect(results).toEqual([{ channel: "matrix", messageId: "mx-1" }]); expect(pinDeliveredMessage).toHaveBeenCalledTimes(1); + const pinCall = requireMockCallArg(pinDeliveredMessage, "pin delivered message"); + expect(pinCall.gatewayClientScopes).toEqual(["operator.write"]); const warnCall = requireMockCall(logMocks.warn, "warn"); expect(warnCall[0]).toBe( "Delivery pin requested, but channel failed to pin delivered message.", diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index d6393332f12..27ad5eb77a2 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -152,6 +152,7 @@ type ChannelHandler = { target: ChannelOutboundTargetRef; messageId: string; pin: ReplyPayloadDeliveryPin; + gatewayClientScopes?: readonly string[]; }) => Promise; afterDeliverPayload?: (params: { target: ChannelOutboundTargetRef; @@ -413,12 +414,13 @@ function createPluginHandler( } : undefined, pinDeliveredMessage: outbound?.pinDeliveredMessage - ? async ({ target, messageId, pin }) => + ? async ({ target, messageId, pin, gatewayClientScopes }) => outbound.pinDeliveredMessage!({ cfg: params.cfg, target, messageId, pin, + gatewayClientScopes, }) : undefined, afterDeliverPayload: outbound?.afterDeliverPayload @@ -880,6 +882,7 @@ async function maybePinDeliveredMessage(params: { payload: ReplyPayload; target: ChannelOutboundTargetRef; messageId?: string; + gatewayClientScopes?: readonly string[]; }): Promise { const pin = normalizeDeliveryPin(params.payload); if (!pin) { @@ -910,6 +913,7 @@ async function maybePinDeliveredMessage(params: { target: params.target, messageId: params.messageId, pin, + gatewayClientScopes: params.gatewayClientScopes, }); } catch (err) { if (pin.required) { @@ -1621,6 +1625,7 @@ async function deliverOutboundPayloadsCore( payload: effectivePayload, target: deliveryTarget, messageId: delivery.messageId, + gatewayClientScopes: params.gatewayClientScopes, }); await maybeNotifyAfterDeliveredPayload({ handler, @@ -1670,6 +1675,7 @@ async function deliverOutboundPayloadsCore( payload: effectivePayload, target: deliveryTarget, messageId: pinMessageId, + gatewayClientScopes: params.gatewayClientScopes, }); await maybeNotifyAfterDeliveredPayload({ handler, @@ -1725,6 +1731,7 @@ async function deliverOutboundPayloadsCore( payload: effectivePayload, target: deliveryTarget, messageId: pinMessageId, + gatewayClientScopes: params.gatewayClientScopes, }); await maybeNotifyAfterDeliveredPayload({ handler, @@ -1767,6 +1774,7 @@ async function deliverOutboundPayloadsCore( payload: effectivePayload, target: deliveryTarget, messageId: firstMessageId, + gatewayClientScopes: params.gatewayClientScopes, }); await maybeNotifyAfterDeliveredPayload({ handler, diff --git a/src/plugin-sdk/channel-outbound.ts b/src/plugin-sdk/channel-outbound.ts index b1f05e3f8e0..736275d1d82 100644 --- a/src/plugin-sdk/channel-outbound.ts +++ b/src/plugin-sdk/channel-outbound.ts @@ -5,7 +5,6 @@ import type { } from "../channels/message/index.js"; import type { ChannelMessageReceiveAdapterShape } from "../channels/message/index.js"; import type { - DurableMessageBatchSendParams, DurableMessageBatchSendResult, DurableMessageSendContext, DurableMessageSendContextParams, @@ -190,7 +189,7 @@ export const deliverInboundReplyWithMessageSendContext: ChannelInboundKernelModu }; export async function sendDurableMessageBatch( - params: DurableMessageBatchSendParams, + params: DurableMessageSendContextParams, ): Promise { const mod = await import("../channels/message/runtime.js"); return await mod.sendDurableMessageBatch(params);