Make Telegram sendMessage actions durable (#87261)

Route Telegram sendMessage action replies through durable outbound delivery so completed agent responses remain retryable when the gateway send path times out.

Verified with focused Telegram/outbound tests, extension test typecheck, prepare build/check/full test gates, and green CI rerun for head 20b45687e1.
This commit is contained in:
Mariano
2026-05-27 14:34:47 +02:00
committed by GitHub
parent 5fb57b533e
commit f3fe48e8b7
9 changed files with 405 additions and 76 deletions

View File

@@ -2,6 +2,12 @@
Docs: https://docs.openclaw.ai
## Unreleased
### Fixes
- Telegram: route `sendMessage` action replies through durable outbound delivery so completed agent responses remain retryable when the gateway send path times out. (#87261) Thanks @mbelinky.
## 2026.5.26
### Highlights

View File

@@ -1,3 +1,4 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
@@ -15,10 +16,127 @@ import {
const originalTelegramActionRuntime = { ...telegramActionRuntime };
const reactMessageTelegram = vi.fn(async () => ({ ok: true }));
const sendMessageTelegram = vi.fn(async () => ({
messageId: "789",
chatId: "123",
}));
const sendMessageTelegram = vi.fn(
async (_to: string, _text: string, _opts?: Record<string, unknown>) => ({
messageId: "789",
chatId: "123",
}),
);
const sendDurableMessageBatch = vi.fn(
async (params: {
cfg: OpenClawConfig;
to: string;
accountId?: string;
payloads: Array<{
text?: string;
mediaUrl?: string;
mediaUrls?: string[];
audioAsVoice?: boolean;
delivery?: {
pin?: true | { enabled?: boolean; notify?: boolean; required?: boolean };
};
channelData?: { telegram?: { buttons?: unknown; quoteText?: string } };
}>;
replyToId?: string;
threadId?: string | number;
forceDocument?: boolean;
silent?: boolean;
gatewayClientScopes?: readonly string[];
session?: {
key?: string;
agentId?: string;
requesterAccountId?: string;
};
mediaAccess?: {
localRoots?: readonly string[];
readFile?: (filePath: string) => Promise<Buffer>;
};
}) => {
const payload = params.payloads[0] ?? {};
const mediaUrls = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
const telegramData = payload.channelData?.telegram;
const cfg = params.cfg as {
channels?: {
telegram?: {
botToken?: string;
accounts?: Record<string, { botToken?: string }>;
};
};
};
const token =
(params.accountId
? cfg.channels?.telegram?.accounts?.[params.accountId]?.botToken
: undefined) ??
cfg.channels?.telegram?.botToken ??
process.env.TELEGRAM_BOT_TOKEN;
const baseOptions = {
cfg: params.cfg,
token,
accountId: params.accountId,
gatewayClientScopes: params.gatewayClientScopes,
replyToMessageId:
params.replyToId == null ? undefined : Number.parseInt(params.replyToId, 10),
messageThreadId:
params.threadId == null ? undefined : Number.parseInt(String(params.threadId), 10),
quoteText: telegramData?.quoteText,
asVoice: payload.audioAsVoice,
silent: params.silent,
forceDocument: params.forceDocument,
mediaLocalRoots: params.mediaAccess?.localRoots,
mediaReadFile: params.mediaAccess?.readFile,
};
const calls = mediaUrls.length > 0 ? mediaUrls : [undefined];
let last = { messageId: "789", chatId: "123" };
for (const [index, mediaUrl] of calls.entries()) {
last = await sendMessageTelegram(params.to, index === 0 ? (payload.text ?? "") : "", {
...baseOptions,
...(mediaUrl ? { mediaUrl } : {}),
...(index === 0 && telegramData?.buttons ? { buttons: telegramData.buttons } : {}),
});
}
const pin =
payload.delivery?.pin === true
? { enabled: true }
: payload.delivery?.pin && payload.delivery.pin.enabled
? payload.delivery.pin
: undefined;
if (pin && last.messageId) {
try {
await pinMessageTelegram(params.to, last.messageId, {
cfg: params.cfg,
accountId: params.accountId,
notify: pin.notify,
verbose: false,
gatewayClientScopes: params.gatewayClientScopes,
});
} catch (err) {
if (pin.required) {
throw err;
}
}
}
return {
status: "sent",
results: [{ channel: "telegram", messageId: last.messageId, chatId: last.chatId }],
receipt: {
primaryPlatformMessageId: last.messageId,
platformMessageIds: [last.messageId],
parts: [
{
platformMessageId: last.messageId,
kind: mediaUrls.length > 0 ? "media" : "text",
index: 0,
},
],
sentAt: Date.now(),
},
} as const;
},
);
const sendPollTelegram = vi.fn(async () => ({
messageId: "790",
chatId: "123",
@@ -40,11 +158,13 @@ const editForumTopicTelegram = vi.fn(async () => ({
messageThreadId: 42,
name: "Renamed",
}));
const pinMessageTelegram = vi.fn(async () => ({
ok: true,
messageId: "789",
chatId: "123",
}));
const pinMessageTelegram = vi.fn(
async (_to: string, _messageId: string, _opts?: Record<string, unknown>) => ({
ok: true,
messageId: "789",
chatId: "123",
}),
);
const createForumTopicTelegram = vi.fn(async () => ({
topicId: 99,
name: "Topic",
@@ -109,6 +229,20 @@ function resultDetails(result: Awaited<ReturnType<typeof handleTelegramAction>>)
return requireRecord(result.details, "Telegram action details");
}
function readDurableQueueEntries(stateDir: string): Record<string, unknown>[] {
const queueDir = path.join(stateDir, "delivery-queue");
if (!fs.existsSync(queueDir)) {
return [];
}
return fs
.readdirSync(queueDir)
.filter((name) => name.endsWith(".json"))
.map((name) => JSON.parse(fs.readFileSync(path.join(queueDir, name), "utf-8"))) as Record<
string,
unknown
>[];
}
describe("handleTelegramAction", () => {
const defaultReactionAction = {
action: "react",
@@ -175,11 +309,12 @@ describe("handleTelegramAction", () => {
}
beforeEach(() => {
envSnapshot = captureEnv(["TELEGRAM_BOT_TOKEN"]);
envSnapshot = captureEnv(["OPENCLAW_STATE_DIR", "TELEGRAM_BOT_TOKEN"]);
resetTopicNameCacheForTest();
installTopicNameStoreForTest();
Object.assign(telegramActionRuntime, originalTelegramActionRuntime, {
reactMessageTelegram,
sendDurableMessageBatch,
sendMessageTelegram,
sendPollTelegram,
sendStickerTelegram,
@@ -190,6 +325,7 @@ describe("handleTelegramAction", () => {
createForumTopicTelegram,
});
reactMessageTelegram.mockClear();
sendDurableMessageBatch.mockClear();
sendMessageTelegram.mockClear();
sendPollTelegram.mockClear();
sendStickerTelegram.mockClear();
@@ -417,7 +553,10 @@ describe("handleTelegramAction", () => {
content: "Hello, Telegram!",
},
telegramConfig(),
{ gatewayClientScopes: ["operator.write"] },
{
gatewayClientScopes: ["operator.write"],
sessionKey: "agent:main:telegram:direct:123",
},
);
const call = mockCall(sendMessageTelegram, 0, "text message");
expect(call[0]).toBe("@testchannel");
@@ -425,6 +564,15 @@ describe("handleTelegramAction", () => {
const options = requireRecord(call[2], "text message options");
expect(options.token).toBe("tok");
expect(options.mediaUrl).toBeUndefined();
const durableCall = mockCall(sendDurableMessageBatch, 0, "durable text message");
expect(requireRecord(durableCall[0], "durable text message params")).toMatchObject({
channel: "telegram",
to: "@testchannel",
durability: "required",
gatewayClientScopes: ["operator.write"],
session: { key: "agent:main:telegram:direct:123", agentId: "main" },
payloads: [{ text: "Hello, Telegram!" }],
});
expect(result.content).toStrictEqual([
{
type: "text",
@@ -438,6 +586,135 @@ describe("handleTelegramAction", () => {
});
});
it("persists sendMessage action deliveries before Telegram platform send", async () => {
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-action-durable-"));
const { createOutboundTestPlugin, createTestRegistry, setActivePluginRegistry } =
await import("openclaw/plugin-sdk/plugin-test-runtime");
const sendText = vi
.fn()
.mockImplementationOnce(async () => {
const entries = readDurableQueueEntries(stateDir);
expect(entries).toHaveLength(1);
expect(entries[0]).toMatchObject({
channel: "telegram",
to: "12345",
payloads: [
{
text: "times out after queue write",
delivery: { pin: { enabled: true, required: true } },
},
],
session: { key: "agent:main:telegram:direct:12345", agentId: "main" },
gatewayClientScopes: ["operator.write"],
retryCount: 0,
});
throw new Error("telegram timeout");
})
.mockImplementationOnce(async () => {
const entries = readDurableQueueEntries(stateDir);
const liveEntry = entries.find((entry) =>
JSON.stringify(entry.payloads).includes("delivers after queue write"),
);
expect(liveEntry).toMatchObject({
channel: "telegram",
to: "12345",
payloads: [{ text: "delivers after queue write" }],
retryCount: 0,
});
return { channel: "telegram", messageId: "tg-ok" };
});
process.env.OPENCLAW_STATE_DIR = stateDir;
telegramActionRuntime.sendDurableMessageBatch =
originalTelegramActionRuntime.sendDurableMessageBatch;
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "telegram",
source: "test",
plugin: createOutboundTestPlugin({
id: "telegram",
outbound: {
deliveryMode: "direct",
deliveryCapabilities: {
durableFinal: {
text: true,
media: true,
payload: true,
silent: true,
replyTo: true,
thread: true,
messageSendingHooks: true,
batch: true,
},
},
sendText,
},
}),
},
]),
);
try {
await expect(
handleTelegramAction(
{
action: "sendMessage",
to: "12345",
content: "times out after queue write",
delivery: { pin: { enabled: true, required: true } },
},
telegramConfig(),
{
gatewayClientScopes: ["operator.write"],
sessionKey: "agent:main:telegram:direct:12345",
},
),
).rejects.toThrow("telegram timeout");
const retryableEntries = readDurableQueueEntries(stateDir);
expect(retryableEntries).toHaveLength(1);
expect(retryableEntries[0]).toMatchObject({
payloads: [
{
text: "times out after queue write",
delivery: { pin: { enabled: true, required: true } },
},
],
retryCount: 1,
});
expect(String(retryableEntries[0]?.lastError)).toContain("telegram timeout");
const result = await handleTelegramAction(
{
action: "sendMessage",
to: "12345",
content: "delivers after queue write",
},
telegramConfig(),
{ sessionKey: "agent:main:telegram:direct:12345" },
);
expect(result.details).toMatchObject({
ok: true,
messageId: "tg-ok",
});
expect(readDurableQueueEntries(stateDir)).toHaveLength(1);
expect(readDurableQueueEntries(stateDir)[0]).toMatchObject({
payloads: [
{
text: "times out after queue write",
delivery: { pin: { enabled: true, required: true } },
},
],
retryCount: 1,
});
} finally {
setActivePluginRegistry(createTestRegistry([]));
fs.rmSync(stateDir, { recursive: true, force: true });
}
});
it("normalizes legacy group targets for sendMessage actions", async () => {
await handleTelegramAction(
{
@@ -1092,6 +1369,10 @@ describe("handleTelegramAction", () => {
expect(options.accountId).toBeUndefined();
expect(options.verbose).toBe(false);
expect(options.gatewayClientScopes).toEqual(["operator.write"]);
const durableCall = mockCall(sendDurableMessageBatch, 0, "durable delivery pin");
expect(requireRecord(durableCall[0], "durable delivery pin params")).toMatchObject({
payloads: [{ delivery: { pin: { enabled: true } } }],
});
});
it("passes delivery pin notify requests for action sends", async () => {
@@ -1109,6 +1390,10 @@ describe("handleTelegramAction", () => {
expect(call[0]).toBe("123456");
expect(call[1]).toBe("789");
expect(requireRecord(call[2], "delivery pin notify options").notify).toBe(true);
const durableCall = mockCall(sendDurableMessageBatch, 0, "durable delivery pin notify");
expect(requireRecord(durableCall[0], "durable delivery pin notify params")).toMatchObject({
payloads: [{ delivery: { pin: { enabled: true, notify: true } } }],
});
});
it("fails required action-send pins when pinning fails", async () => {

View File

@@ -10,12 +10,18 @@ import {
resolvePollMaxSelections,
resolveReactionMessageId,
} from "openclaw/plugin-sdk/channel-actions";
import {
buildOutboundSessionContext,
sendDurableMessageBatch,
type DurableMessageBatchSendResult,
} from "openclaw/plugin-sdk/channel-outbound";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import {
normalizeMessagePresentation,
renderMessagePresentationFallbackText,
} from "openclaw/plugin-sdk/interactive-runtime";
import type { MessagePresentation } from "openclaw/plugin-sdk/interactive-runtime";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime";
import {
createTelegramActionGate,
@@ -56,6 +62,7 @@ export const telegramActionRuntime = {
pinMessageTelegram,
reactMessageTelegram,
searchStickers,
sendDurableMessageBatch,
sendMessageTelegram,
sendPollTelegram,
sendStickerTelegram,
@@ -257,37 +264,42 @@ function normalizeTelegramDeliveryPin(params: Record<string, unknown>) {
} as const;
}
async function maybePinTelegramActionSend(params: {
args: Record<string, unknown>;
cfg: OpenClawConfig;
accountId?: string;
to: string;
messageId?: string;
gatewayClientScopes?: readonly string[];
}) {
const pin = normalizeTelegramDeliveryPin(params.args);
if (!pin) {
return;
}
if (!params.messageId) {
if (pin.required) {
throw new Error("Telegram delivery pin requested, but no message id was returned.");
}
return;
}
try {
await telegramActionRuntime.pinMessageTelegram(params.to, params.messageId, {
cfg: params.cfg,
accountId: params.accountId,
notify: pin.notify,
verbose: false,
gatewayClientScopes: params.gatewayClientScopes,
});
} catch (err) {
if (pin.required) {
throw err;
}
}
function buildTelegramActionSendPayload(params: {
content: string;
mediaUrls: string[];
asVoice?: boolean;
pin?: ReturnType<typeof normalizeTelegramDeliveryPin>;
buttons?: ReturnType<typeof resolveTelegramButtonsFromParams>;
quoteText?: string;
}): ReplyPayload {
const telegramData =
params.buttons || params.quoteText
? {
...(params.buttons ? { buttons: params.buttons } : {}),
...(params.quoteText ? { quoteText: params.quoteText } : {}),
}
: undefined;
return {
text: params.content,
...(params.mediaUrls.length > 0 ? { mediaUrls: params.mediaUrls } : {}),
...(params.asVoice === true ? { audioAsVoice: true } : {}),
...(params.pin ? { delivery: { pin: params.pin } } : {}),
...(telegramData ? { channelData: { telegram: telegramData } } : {}),
};
}
function getLastDurableTelegramActionResult(
result: Extract<DurableMessageBatchSendResult, { status: "sent" }>,
): { messageId?: string; chatId?: string } {
const lastResult = result.results.at(-1);
const receipt = result.receipt;
return {
messageId:
lastResult?.messageId ??
receipt.primaryPlatformMessageId ??
receipt.platformMessageIds.at(-1),
chatId: lastResult?.chatId,
};
}
export async function handleTelegramAction(
@@ -455,10 +467,7 @@ export async function handleTelegramAction(
}
const sendOptions = {
cfg,
token,
accountId: accountId ?? undefined,
mediaLocalRoots: options?.mediaLocalRoots,
mediaReadFile: options?.mediaReadFile,
gatewayClientScopes: options?.gatewayClientScopes,
replyToMessageId: replyToMessageId ?? undefined,
messageThreadId: messageThreadId ?? undefined,
@@ -470,34 +479,49 @@ export async function handleTelegramAction(
readBooleanParam(params, "asDocument") ??
false,
};
let result: Awaited<ReturnType<typeof telegramActionRuntime.sendMessageTelegram>>;
if (!firstMediaUrl) {
result = await telegramActionRuntime.sendMessageTelegram(to, content, {
...sendOptions,
buttons,
});
} else {
result = await telegramActionRuntime.sendMessageTelegram(to, content, {
...sendOptions,
mediaUrl: firstMediaUrl,
buttons,
});
for (const mediaUrl of mediaUrls.slice(1)) {
result = await telegramActionRuntime.sendMessageTelegram(to, "", {
...sendOptions,
mediaUrl,
});
}
}
notifyVisibleOutboundSuccess(to, messageThreadId);
await maybePinTelegramActionSend({
args: params,
cfg,
accountId: accountId ?? undefined,
to,
messageId: result.messageId,
gatewayClientScopes: options?.gatewayClientScopes,
const payload = buildTelegramActionSendPayload({
content,
mediaUrls,
asVoice: sendOptions.asVoice,
pin: normalizeTelegramDeliveryPin(params),
buttons,
quoteText,
});
const mediaAccess =
options?.mediaLocalRoots || options?.mediaReadFile
? {
...(options.mediaLocalRoots ? { localRoots: options.mediaLocalRoots } : {}),
...(options.mediaReadFile ? { readFile: options.mediaReadFile } : {}),
}
: undefined;
const outboundSession = buildOutboundSessionContext({
cfg,
sessionKey: options?.sessionKey,
requesterAccountId: accountId,
});
const durableResult = await telegramActionRuntime.sendDurableMessageBatch({
cfg,
channel: "telegram",
to,
accountId: accountId ?? undefined,
payloads: [payload],
replyToId: replyToMessageId == null ? undefined : String(replyToMessageId),
threadId: messageThreadId,
forceDocument: sendOptions.forceDocument,
silent: sendOptions.silent,
durability: "required",
gatewayClientScopes: options?.gatewayClientScopes,
...(mediaAccess ? { mediaAccess } : {}),
...(outboundSession ? { session: outboundSession } : {}),
});
if (durableResult.status === "failed" || durableResult.status === "partial_failed") {
throw durableResult.error;
}
if (durableResult.status === "suppressed") {
throw new Error("Telegram sendMessage was suppressed before delivery.");
}
const result = getLastDurableTelegramActionResult(durableResult);
notifyVisibleOutboundSuccess(to, messageThreadId);
return jsonResult({
ok: true,
messageId: result.messageId,

View File

@@ -595,12 +595,14 @@ describe("telegramOutbound", () => {
target: { channel: "telegram", to: "12345", accountId: "ops" },
messageId: "tg-1",
pin: { enabled: true, notify: true },
gatewayClientScopes: ["operator.write"],
});
const options = callOptionsAt(pinMessageTelegramMock, 0, "12345", "tg-1");
expect(options.accountId).toBe("ops");
expect(options.notify).toBe(true);
expect(options.verbose).toBe(false);
expect(options.gatewayClientScopes).toEqual(["operator.write"]);
});
it("normalizes legacy durable group retry targets before Telegram pinning", async () => {

View File

@@ -227,7 +227,7 @@ export function createTelegramOutboundAdapter(
},
};
},
pinDeliveredMessage: async ({ cfg, target, messageId, pin }) => {
pinDeliveredMessage: async ({ cfg, target, messageId, pin, gatewayClientScopes }) => {
const { pinMessageTelegram } = await loadSendModule();
const outboundTo = normalizeTelegramOutboundTarget(target.to);
const pinTarget = parseTelegramTarget(outboundTo);
@@ -236,6 +236,7 @@ export function createTelegramOutboundAdapter(
accountId: target.accountId ?? undefined,
notify: pin.notify,
verbose: false,
gatewayClientScopes,
});
},
resolveEffectiveTextChunkLimit: ({ fallbackLimit }) =>

View File

@@ -193,6 +193,7 @@ export type ChannelOutboundAdapter = {
target: ChannelOutboundTargetRef;
messageId: string;
pin: ReplyPayloadDeliveryPin;
gatewayClientScopes?: readonly string[];
}) => Promise<void> | void;
/**
* @deprecated Use shouldTreatDeliveredTextAsVisible instead.

View File

@@ -3150,10 +3150,13 @@ describe("deliverOutboundPayloads", () => {
channel: "matrix",
to: "!room:1",
payloads: [{ text: "hello", delivery: { pin: true } }],
gatewayClientScopes: ["operator.write"],
});
expect(results).toEqual([{ channel: "matrix", messageId: "mx-1" }]);
expect(pinDeliveredMessage).toHaveBeenCalledTimes(1);
const pinCall = requireMockCallArg(pinDeliveredMessage, "pin delivered message");
expect(pinCall.gatewayClientScopes).toEqual(["operator.write"]);
const warnCall = requireMockCall(logMocks.warn, "warn");
expect(warnCall[0]).toBe(
"Delivery pin requested, but channel failed to pin delivered message.",

View File

@@ -152,6 +152,7 @@ type ChannelHandler = {
target: ChannelOutboundTargetRef;
messageId: string;
pin: ReplyPayloadDeliveryPin;
gatewayClientScopes?: readonly string[];
}) => Promise<void>;
afterDeliverPayload?: (params: {
target: ChannelOutboundTargetRef;
@@ -413,12 +414,13 @@ function createPluginHandler(
}
: undefined,
pinDeliveredMessage: outbound?.pinDeliveredMessage
? async ({ target, messageId, pin }) =>
? async ({ target, messageId, pin, gatewayClientScopes }) =>
outbound.pinDeliveredMessage!({
cfg: params.cfg,
target,
messageId,
pin,
gatewayClientScopes,
})
: undefined,
afterDeliverPayload: outbound?.afterDeliverPayload
@@ -880,6 +882,7 @@ async function maybePinDeliveredMessage(params: {
payload: ReplyPayload;
target: ChannelOutboundTargetRef;
messageId?: string;
gatewayClientScopes?: readonly string[];
}): Promise<void> {
const pin = normalizeDeliveryPin(params.payload);
if (!pin) {
@@ -910,6 +913,7 @@ async function maybePinDeliveredMessage(params: {
target: params.target,
messageId: params.messageId,
pin,
gatewayClientScopes: params.gatewayClientScopes,
});
} catch (err) {
if (pin.required) {
@@ -1621,6 +1625,7 @@ async function deliverOutboundPayloadsCore(
payload: effectivePayload,
target: deliveryTarget,
messageId: delivery.messageId,
gatewayClientScopes: params.gatewayClientScopes,
});
await maybeNotifyAfterDeliveredPayload({
handler,
@@ -1670,6 +1675,7 @@ async function deliverOutboundPayloadsCore(
payload: effectivePayload,
target: deliveryTarget,
messageId: pinMessageId,
gatewayClientScopes: params.gatewayClientScopes,
});
await maybeNotifyAfterDeliveredPayload({
handler,
@@ -1725,6 +1731,7 @@ async function deliverOutboundPayloadsCore(
payload: effectivePayload,
target: deliveryTarget,
messageId: pinMessageId,
gatewayClientScopes: params.gatewayClientScopes,
});
await maybeNotifyAfterDeliveredPayload({
handler,
@@ -1767,6 +1774,7 @@ async function deliverOutboundPayloadsCore(
payload: effectivePayload,
target: deliveryTarget,
messageId: firstMessageId,
gatewayClientScopes: params.gatewayClientScopes,
});
await maybeNotifyAfterDeliveredPayload({
handler,

View File

@@ -5,7 +5,6 @@ import type {
} from "../channels/message/index.js";
import type { ChannelMessageReceiveAdapterShape } from "../channels/message/index.js";
import type {
DurableMessageBatchSendParams,
DurableMessageBatchSendResult,
DurableMessageSendContext,
DurableMessageSendContextParams,
@@ -190,7 +189,7 @@ export const deliverInboundReplyWithMessageSendContext: ChannelInboundKernelModu
};
export async function sendDurableMessageBatch(
params: DurableMessageBatchSendParams,
params: DurableMessageSendContextParams,
): Promise<DurableMessageBatchSendResult> {
const mod = await import("../channels/message/runtime.js");
return await mod.sendDurableMessageBatch(params);