refactor: migrate bundled plugins to message lifecycle

This commit is contained in:
Peter Steinberger
2026-05-06 01:40:53 +01:00
parent 2ead1502c9
commit 05eda57b3c
223 changed files with 8568 additions and 1354 deletions

View File

@@ -368,6 +368,7 @@ export function createTelegramBotCore(
};
const updateTracker = createTelegramUpdateTracker({
initialUpdateId,
ackPolicy: "after_agent_dispatch",
...(typeof opts.updateOffset?.onUpdateId === "function"
? { onAcceptedUpdateId: opts.updateOffset.onUpdateId }
: {}),

View File

@@ -1,4 +1,7 @@
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
createChannelMessageReplyPipeline,
deliverInboundReplyWithMessageSendContext,
} from "openclaw/plugin-sdk/channel-message";
import { readChannelAllowFromStore } from "openclaw/plugin-sdk/conversation-runtime";
import { upsertChannelPairingRequest } from "openclaw/plugin-sdk/conversation-runtime";
import { buildModelsProviderData } from "openclaw/plugin-sdk/models-provider-runtime";
@@ -32,9 +35,10 @@ export type TelegramBotDeps = {
resolveExecApproval?: typeof resolveTelegramExecApproval;
createTelegramDraftStream?: typeof createTelegramDraftStream;
deliverReplies?: typeof deliverReplies;
deliverInboundReplyWithMessageSendContext?: typeof deliverInboundReplyWithMessageSendContext;
emitInternalMessageSentHook?: typeof emitInternalMessageSentHook;
editMessageTelegram?: typeof editMessageTelegram;
createChannelReplyPipeline?: typeof createChannelReplyPipeline;
createChannelMessageReplyPipeline?: typeof createChannelMessageReplyPipeline;
};
export const defaultTelegramBotDeps: TelegramBotDeps = {
@@ -83,13 +87,16 @@ export const defaultTelegramBotDeps: TelegramBotDeps = {
get deliverReplies() {
return deliverReplies;
},
get deliverInboundReplyWithMessageSendContext() {
return deliverInboundReplyWithMessageSendContext;
},
get emitInternalMessageSentHook() {
return emitInternalMessageSentHook;
},
get editMessageTelegram() {
return editMessageTelegram;
},
get createChannelReplyPipeline() {
return createChannelReplyPipeline;
get createChannelMessageReplyPipeline() {
return createChannelMessageReplyPipeline;
},
};

View File

@@ -18,6 +18,7 @@ const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() =>
vi.fn<(params: DispatchReplyWithBufferedBlockDispatcherArgs) => Promise<unknown>>(),
);
const deliverReplies = vi.hoisted(() => vi.fn());
const deliverInboundReplyWithMessageSendContext = vi.hoisted(() => vi.fn());
const emitInternalMessageSentHook = vi.hoisted(() => vi.fn());
const createForumTopicTelegram = vi.hoisted(() => vi.fn());
const deleteMessageTelegram = vi.hoisted(() => vi.fn());
@@ -45,7 +46,7 @@ const buildModelsProviderData = vi.hoisted(() =>
})),
);
const listSkillCommandsForAgents = vi.hoisted(() => vi.fn(() => []));
const createChannelReplyPipeline = vi.hoisted(() =>
const createChannelMessageReplyPipeline = vi.hoisted(() =>
vi.fn(() => ({
responsePrefix: undefined,
responsePrefixContextProvider: () => ({ identityName: undefined }),
@@ -79,6 +80,14 @@ vi.mock("./draft-stream.js", () => ({
createTelegramDraftStream,
}));
vi.mock("openclaw/plugin-sdk/channel-message", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/channel-message")>();
return {
...actual,
deliverInboundReplyWithMessageSendContext,
};
});
vi.mock("./bot/delivery.js", () => ({
deliverReplies,
emitInternalMessageSentHook,
@@ -146,12 +155,14 @@ const telegramDepsForTest: TelegramBotDeps = {
buildModelsProviderData: buildModelsProviderData as TelegramBotDeps["buildModelsProviderData"],
listSkillCommandsForAgents:
listSkillCommandsForAgents as TelegramBotDeps["listSkillCommandsForAgents"],
createChannelReplyPipeline:
createChannelReplyPipeline as TelegramBotDeps["createChannelReplyPipeline"],
createChannelMessageReplyPipeline:
createChannelMessageReplyPipeline as TelegramBotDeps["createChannelMessageReplyPipeline"],
wasSentByBot: wasSentByBot as TelegramBotDeps["wasSentByBot"],
createTelegramDraftStream:
createTelegramDraftStream as TelegramBotDeps["createTelegramDraftStream"],
deliverReplies: deliverReplies as TelegramBotDeps["deliverReplies"],
deliverInboundReplyWithMessageSendContext:
deliverInboundReplyWithMessageSendContext as TelegramBotDeps["deliverInboundReplyWithMessageSendContext"],
emitInternalMessageSentHook:
emitInternalMessageSentHook as TelegramBotDeps["emitInternalMessageSentHook"],
editMessageTelegram: editMessageTelegram as TelegramBotDeps["editMessageTelegram"],
@@ -173,6 +184,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
createTelegramDraftStream.mockReset();
dispatchReplyWithBufferedBlockDispatcher.mockReset();
deliverReplies.mockReset();
deliverInboundReplyWithMessageSendContext.mockReset();
emitInternalMessageSentHook.mockReset();
createForumTopicTelegram.mockReset();
deleteMessageTelegram.mockReset();
@@ -188,7 +200,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
enqueueSystemEvent.mockReset();
buildModelsProviderData.mockReset();
listSkillCommandsForAgents.mockReset();
createChannelReplyPipeline.mockReset();
createChannelMessageReplyPipeline.mockReset();
wasSentByBot.mockReset();
loadSessionStore.mockReset();
resolveStorePath.mockReset();
@@ -209,6 +221,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
counts: { block: 0, final: 0, tool: 0 },
});
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "unsupported",
reason: "missing_outbound_handler",
});
emitInternalMessageSentHook.mockResolvedValue(undefined);
createForumTopicTelegram.mockResolvedValue({ message_thread_id: 777 });
deleteMessageTelegram.mockResolvedValue(true);
@@ -231,7 +247,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
modelNames: new Map<string, string>(),
});
listSkillCommandsForAgents.mockReturnValue([]);
createChannelReplyPipeline.mockReturnValue({
createChannelMessageReplyPipeline.mockReturnValue({
responsePrefix: undefined,
responsePrefixContextProvider: () => ({ identityName: undefined }),
onModelSelected: () => undefined,
@@ -448,7 +464,95 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("queues final Telegram replies through outbound delivery when available", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["1001"],
visibleReplySent: true,
},
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello queued" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext({
ctxPayload: {
SessionKey: "s1",
ChatType: "direct",
SenderId: "42",
SenderName: "Alice",
SenderUsername: "alice",
} as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(deliverInboundReplyWithMessageSendContext).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "123",
accountId: "default",
payload: expect.objectContaining({ text: "Hello queued" }),
info: { kind: "final" },
replyToMode: "first",
threadId: 777,
formatting: expect.objectContaining({ textLimit: 4096, tableMode: "preserve" }),
agentId: "default",
ctxPayload: expect.objectContaining({
SessionKey: "s1",
ChatType: "direct",
SenderId: "42",
SenderName: "Alice",
SenderUsername: "alice",
}),
}),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("queues media-only final Telegram replies through outbound delivery when available", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["1002"],
visibleReplySent: true,
},
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/final.png" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "off",
telegramDeps: telegramDepsForTest,
});
expect(deliverInboundReplyWithMessageSendContext).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
payload: expect.objectContaining({ mediaUrl: "file:///tmp/final.png" }),
info: { kind: "final" },
requiredCapabilities: expect.objectContaining({
media: true,
payload: true,
}),
}),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("skips answer draft preview for same-chat selected quotes", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "unsupported",
reason: "capability_mismatch",
capability: "nativeQuote",
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello", replyToId: "1001" }, { kind: "final" });
return { queuedFinal: true };
@@ -478,6 +582,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
replyQuoteText: " quoted slice\n",
}),
);
expect(deliverInboundReplyWithMessageSendContext).toHaveBeenCalledWith(
expect.objectContaining({
requiredCapabilities: expect.objectContaining({
nativeQuote: true,
}),
}),
);
});
it("keeps answer draft preview for current message replies with native quote candidates", async () => {
@@ -1066,6 +1177,35 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("queues silent error replies through durable delivery with silent preserved", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["durable-silent"],
visibleReplySent: true,
},
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "oops", isError: true }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({
context: createContext(),
telegramCfg: { silentErrorReplies: true },
});
expect(deliverInboundReplyWithMessageSendContext).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
payload: expect.objectContaining({ isError: true }),
silent: true,
}),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("keeps error replies notifying by default", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "oops", isError: true }, { kind: "final" });
@@ -4327,6 +4467,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
},
);
deliverReplies.mockResolvedValue({ delivered: true });
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",
delivery: {
messageIds: ["2002"],
visibleReplySent: true,
},
});
const preConnectErr = new Error("connect ECONNREFUSED 149.154.167.220:443");
(preConnectErr as NodeJS.ErrnoException).code = "ECONNREFUSED";
editMessageTelegram.mockRejectedValue(preConnectErr);
@@ -4334,13 +4481,20 @@ describe("dispatchTelegramMessage draft streaming", () => {
await dispatchWithContext({ context: createContext() });
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
const deliverCalls = deliverReplies.mock.calls;
const finalTextSentViaDeliverReplies = deliverCalls.some((call: unknown[]) =>
(call[0] as { replies?: Array<{ text?: string }> })?.replies?.some(
(r: { text?: string }) => r.text === "Final answer",
),
expect(deliverInboundReplyWithMessageSendContext).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "123",
accountId: "default",
agentId: "default",
payload: expect.objectContaining({ text: "Final answer" }),
info: { kind: "final" },
replyToMode: "first",
threadId: 777,
formatting: expect.objectContaining({ textLimit: 4096, tableMode: "preserve" }),
}),
);
expect(finalTextSentViaDeliverReplies).toBe(true);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("falls back when Telegram reports the current final edit target missing", async () => {

View File

@@ -5,7 +5,10 @@ import {
logTypingFailure,
removeAckReactionAfterReply,
} from "openclaw/plugin-sdk/channel-feedback";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import {
createChannelMessageReplyPipeline,
deriveDurableFinalDeliveryRequirements,
} from "openclaw/plugin-sdk/channel-message";
import {
createChannelProgressDraftGate,
formatChannelProgressDraftLine,
@@ -821,16 +824,69 @@ export const dispatchTelegramMessage = async ({
}
return { ...payload, replyToId: implicitQuoteReplyTargetId };
};
const usesNativeTelegramQuote = (payload: ReplyPayload): boolean => {
if (replyQuoteText != null) {
return true;
}
return payload.replyToId != null && replyQuoteByMessageId[payload.replyToId] != null;
};
let lastVisibleNonPreviewDeliveryAtMs: number | undefined;
const sendPayload = async (payload: ReplyPayload) => {
const sendPayload = async (
payload: ReplyPayload,
options?: { durable?: boolean; silent?: boolean },
) => {
if (isDispatchSuperseded()) {
return false;
}
const deliverablePayload = applyQuoteReplyTarget(payload);
const silent = options?.silent ?? (silentErrorReplies && payload.isError === true);
const durableDelivery = telegramDeps.deliverInboundReplyWithMessageSendContext;
if (options?.durable && durableDelivery) {
const durable = await durableDelivery({
cfg,
channel: "telegram",
to: String(chatId),
accountId: route.accountId,
agentId: route.agentId,
ctxPayload,
payload: deliverablePayload,
info: { kind: "final" },
replyToMode,
threadId: threadSpec.id,
formatting: {
textLimit,
tableMode,
chunkMode,
},
silent,
requiredCapabilities: deriveDurableFinalDeliveryRequirements({
payload: deliverablePayload,
replyToId: deliverablePayload.replyToId,
threadId: threadSpec.id,
silent,
payloadTransport: true,
extraCapabilities: {
nativeQuote: usesNativeTelegramQuote(deliverablePayload),
},
}),
});
if (durable.status === "failed") {
throw durable.error;
}
if (durable.status === "handled_visible") {
deliveryState.markDelivered();
lastVisibleNonPreviewDeliveryAtMs = Date.now();
return true;
}
if (durable.status === "handled_no_send") {
return false;
}
}
const result = await (telegramDeps.deliverReplies ?? deliverReplies)({
...deliveryBaseOptions,
replies: [applyQuoteReplyTarget(payload)],
replies: [deliverablePayload],
onVoiceRecording: sendRecordVoice,
silent: silentErrorReplies && payload.isError === true,
silent,
mediaLoader: telegramDeps.loadWebMedia,
});
if (result.delivered) {
@@ -918,7 +974,7 @@ export const dispatchTelegramMessage = async ({
}
const { onModelSelected, ...replyPipeline } = (
telegramDeps.createChannelReplyPipeline ?? createChannelReplyPipeline
telegramDeps.createChannelMessageReplyPipeline ?? createChannelMessageReplyPipeline
)({
cfg,
agentId: route.agentId,
@@ -1076,7 +1132,9 @@ export const dispatchTelegramMessage = async ({
const payloadWithoutSuppressedReasoning =
typeof payload.text === "string" ? { ...payload, text: "" } : payload;
markVisibleNonPreviewBoundary(
await sendPayload(payloadWithoutSuppressedReasoning),
await sendPayload(payloadWithoutSuppressedReasoning, {
durable: info.kind === "final",
}),
);
}
if (info.kind === "final") {
@@ -1099,7 +1157,9 @@ export const dispatchTelegramMessage = async ({
}
return;
}
markVisibleNonPreviewBoundary(await sendPayload(payload));
markVisibleNonPreviewBoundary(
await sendPayload(payload, { durable: info.kind === "final" }),
);
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;

View File

@@ -1,4 +1,4 @@
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message";
import { deliverReplies, emitTelegramMessageSentHooks } from "./bot/delivery.js";
export { createChannelReplyPipeline, deliverReplies, emitTelegramMessageSentHooks };
export { createChannelMessageReplyPipeline, deliverReplies, emitTelegramMessageSentHooks };

View File

@@ -25,7 +25,7 @@ type EnsureConfiguredBindingRouteReadyFn =
type GetAgentScopedMediaLocalRootsFn =
typeof import("./bot-native-commands.runtime.js").getAgentScopedMediaLocalRoots;
type CreateChannelReplyPipelineFn =
typeof import("./bot-native-commands.delivery.runtime.js").createChannelReplyPipeline;
typeof import("./bot-native-commands.delivery.runtime.js").createChannelMessageReplyPipeline;
type AnyMock = MockFn<(...args: unknown[]) => unknown>;
type AnyAsyncMock = MockFn<(...args: unknown[]) => Promise<unknown>>;
type NativeCommandHarness = {
@@ -57,7 +57,7 @@ const replyPipelineMocks = vi.hoisted(() => {
dispatchReplyWithBufferedBlockDispatcher: vi.fn(
(async () => dispatchReplyResult) as DispatchReplyWithBufferedBlockDispatcherFn,
),
createChannelReplyPipeline: vi.fn((() => ({
createChannelMessageReplyPipeline: vi.fn((() => ({
onModelSelected: () => {},
responsePrefixContextProvider: () => undefined,
})) as unknown as CreateChannelReplyPipelineFn),
@@ -84,7 +84,7 @@ vi.mock("./bot-native-commands.runtime.js", () => ({
getAgentScopedMediaLocalRoots: replyPipelineMocks.getAgentScopedMediaLocalRoots,
}));
vi.mock("./bot-native-commands.delivery.runtime.js", () => ({
createChannelReplyPipeline: replyPipelineMocks.createChannelReplyPipeline,
createChannelMessageReplyPipeline: replyPipelineMocks.createChannelMessageReplyPipeline,
deliverReplies: deliveryMocks.deliverReplies,
emitTelegramMessageSentHooks: vi.fn(),
}));

View File

@@ -1115,9 +1115,9 @@ export const registerTelegramNativeCommands = ({
skippedNonSilent: 0,
};
const { createChannelReplyPipeline, deliverReplies } =
const { createChannelMessageReplyPipeline, deliverReplies } =
await loadTelegramNativeCommandDeliveryRuntime();
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({
cfg: executionCfg,
agentId: route.agentId,
channel: "telegram",

View File

@@ -62,6 +62,44 @@ describe("createTelegramUpdateTracker", () => {
} satisfies Partial<TelegramUpdateTrackerState>);
});
it("can persist offsets only after successful agent dispatch", async () => {
const onAcceptedUpdateId = vi.fn();
const tracker = createTelegramUpdateTracker({
initialUpdateId: 100,
ackPolicy: "after_agent_dispatch",
onAcceptedUpdateId,
});
const update101 = tracker.beginUpdate(updateCtx(101));
if (!update101.accepted) {
throw new Error("expected update 101 to be accepted");
}
await flushTrackerMicrotasks();
expect(onAcceptedUpdateId).not.toHaveBeenCalled();
tracker.finishUpdate(update101.update, { completed: false });
await flushTrackerMicrotasks();
expect(onAcceptedUpdateId).not.toHaveBeenCalled();
expect(tracker.getState()).toMatchObject({
failedUpdateIds: [101],
highestPersistedAcceptedUpdateId: 100,
} satisfies Partial<TelegramUpdateTrackerState>);
const retry = tracker.beginUpdate(updateCtx(101));
if (!retry.accepted) {
throw new Error("expected update 101 retry to be accepted");
}
tracker.finishUpdate(retry.update, { completed: true });
await flushTrackerMicrotasks();
expect(onAcceptedUpdateId).toHaveBeenCalledWith(101);
expect(tracker.getState()).toMatchObject({
failedUpdateIds: [],
highestPersistedAcceptedUpdateId: 101,
safeCompletedUpdateId: 101,
} satisfies Partial<TelegramUpdateTrackerState>);
});
it("skips restart replays once the accepted offset is restored", async () => {
const onAcceptedUpdateId = vi.fn();
const firstProcess = createTelegramUpdateTracker({

View File

@@ -1,3 +1,8 @@
import {
createMessageReceiveContext,
type MessageAckPolicy,
type MessageReceiveContext,
} from "openclaw/plugin-sdk/channel-message";
import {
buildTelegramUpdateKey,
createTelegramUpdateDedupe,
@@ -9,6 +14,7 @@ type PersistUpdateId = (updateId: number) => void | Promise<void>;
type TelegramUpdateTrackerOptions = {
initialUpdateId?: number | null;
ackPolicy?: MessageAckPolicy;
onAcceptedUpdateId?: PersistUpdateId;
onPersistError?: (error: unknown) => void;
onSkip?: (key: string) => void;
@@ -17,6 +23,7 @@ type TelegramUpdateTrackerOptions = {
type AcceptedTelegramUpdate = {
key?: string;
updateId?: number;
receiveContext?: MessageReceiveContext<TelegramUpdateKeyContext>;
};
type BeginUpdateResult =
@@ -49,6 +56,7 @@ function sortedIds(ids: Set<number>): number[] {
export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOptions = {}) {
const initialUpdateId =
typeof options.initialUpdateId === "number" ? options.initialUpdateId : null;
const ackPolicy = options.ackPolicy ?? "after_receive_record";
const recentUpdates = createTelegramUpdateDedupe();
const pendingUpdateKeys = new Set<string>();
const activeHandledUpdateKeys = new Map<string, boolean>();
@@ -114,7 +122,44 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption
return;
}
highestAcceptedUpdateId = updateId;
requestPersistAcceptedUpdateId(updateId);
};
function resolveSafeCompletedUpdateId() {
if (highestCompletedUpdateId === null) {
return null;
}
let safeCompletedUpdateId = highestCompletedUpdateId;
for (const updateId of pendingUpdateIds) {
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
for (const updateId of failedUpdateIds) {
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
return safeCompletedUpdateId;
}
const persistUpdateIdAfterAck = async (updateId: number) => {
const persistUpdateId =
ackPolicy === "after_agent_dispatch" ? resolveSafeCompletedUpdateId() : updateId;
if (persistUpdateId !== null) {
requestPersistAcceptedUpdateId(persistUpdateId);
}
};
const ackUpdateAfterStage = (
receiveContext: MessageReceiveContext<TelegramUpdateKeyContext> | undefined,
stage: "receive_record" | "agent_dispatch",
) => {
if (!receiveContext?.shouldAckAfter(stage)) {
return;
}
void receiveContext.ack().catch((err) => {
options.onPersistError?.(err);
});
};
const beginUpdate = (ctx: TelegramUpdateKeyContext): BeginUpdateResult => {
@@ -138,15 +183,25 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption
pendingUpdateKeys.add(updateKey);
activeHandledUpdateKeys.set(updateKey, false);
}
let receiveContext: MessageReceiveContext<TelegramUpdateKeyContext> | undefined;
if (typeof updateId === "number") {
pendingUpdateIds.add(updateId);
acceptUpdateId(updateId);
receiveContext = createMessageReceiveContext({
id: updateKey ?? `telegram:update:${updateId}`,
channel: "telegram",
message: ctx,
ackPolicy,
onAck: () => persistUpdateIdAfterAck(updateId),
});
ackUpdateAfterStage(receiveContext, "receive_record");
}
return {
accepted: true,
update: {
...(updateKey ? { key: updateKey } : {}),
...(typeof updateId === "number" ? { updateId } : {}),
...(receiveContext ? { receiveContext } : {}),
},
};
};
@@ -166,8 +221,14 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption
if (highestCompletedUpdateId === null || update.updateId > highestCompletedUpdateId) {
highestCompletedUpdateId = update.updateId;
}
ackUpdateAfterStage(update.receiveContext, "agent_dispatch");
} else {
failedUpdateIds.add(update.updateId);
void update.receiveContext
?.nack(new Error("Telegram update handler did not complete"))
.catch((err) => {
options.onPersistError?.(err);
});
}
}
};
@@ -197,24 +258,6 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption
return skipped;
};
const resolveSafeCompletedUpdateId = () => {
if (highestCompletedUpdateId === null) {
return null;
}
let safeCompletedUpdateId = highestCompletedUpdateId;
for (const updateId of pendingUpdateIds) {
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
for (const updateId of failedUpdateIds) {
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
return safeCompletedUpdateId;
};
const getState = (): TelegramUpdateTrackerState => ({
highestAcceptedUpdateId,
highestPersistedAcceptedUpdateId,

View File

@@ -1186,7 +1186,7 @@ describe("createTelegramBot", () => {
expect(replySpy).toHaveBeenCalledTimes(1);
});
it("persists accepted update offsets before completion", async () => {
it("persists update offsets after successful dispatch completion", async () => {
// For this test we need sequentialize(...) to behave like a normal middleware and call next().
sequentializeSpy.mockImplementationOnce(
() => async (_ctx: unknown, next: () => Promise<void>) => {
@@ -1243,18 +1243,20 @@ describe("createTelegramBot", () => {
// Start processing update 101 but keep it pending (simulates a long-running turn).
const p101 = runMiddlewareChain({ update: { update_id: 101 } }, async () => update101Gate);
// Let update 101 enter the chain and persist acceptance before 102 completes.
// Let update 101 enter the chain. Telegram now persists the restart watermark only after
// the handler completes, so a crash during the pending turn can replay the update.
await Promise.resolve();
expect(onUpdateId).toHaveBeenCalledWith(101);
expect(onUpdateId).not.toHaveBeenCalled();
// Complete update 102 while 101 is still pending. Restart replay protection is at-most-once.
// Complete update 102 while 101 is still pending. The persisted watermark must not advance
// past pending lower ids.
await runMiddlewareChain({ update: { update_id: 102 } }, async () => {});
expect(onUpdateId).toHaveBeenCalledWith(102);
expect(onUpdateId).not.toHaveBeenCalled();
releaseUpdate101?.();
await p101;
expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([101, 102]);
expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([102]);
});
it("logs and swallows update watermark persistence failures", async () => {
sequentializeSpy.mockImplementationOnce(
@@ -1326,7 +1328,7 @@ describe("createTelegramBot", () => {
}
});
it("persists failed updates once accepted while preserving same-process retries", async () => {
it("keeps failed updates unpersisted while preserving same-process retries", async () => {
sequentializeSpy.mockImplementationOnce(
() => async (_ctx: unknown, next: () => Promise<void>) => {
await next();
@@ -1378,12 +1380,12 @@ describe("createTelegramBot", () => {
}),
).rejects.toThrow("middleware boom");
await flushTelegramTestMicrotasks();
expect(onUpdateId).toHaveBeenCalledWith(201);
expect(onUpdateId).not.toHaveBeenCalled();
await runMiddlewareChain({ update: { update_id: 202 } }, async () => {});
await flushTelegramTestMicrotasks();
expect(onUpdateId).toHaveBeenCalledWith(202);
expect(onUpdateId).not.toHaveBeenCalled();
const retryHandler = vi.fn();
await runMiddlewareChain({ update: { update_id: 201 } }, async () => {
@@ -1392,7 +1394,7 @@ describe("createTelegramBot", () => {
await flushTelegramTestMicrotasks();
expect(retryHandler).toHaveBeenCalledTimes(1);
expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([201, 202]);
expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([202]);
});
it("skips replayed update ids even when the semantic update key differs", async () => {

View File

@@ -0,0 +1,210 @@
import {
verifyChannelMessageAdapterCapabilityProofs,
verifyChannelMessageLiveCapabilityAdapterProofs,
verifyChannelMessageLiveFinalizerProofs,
verifyChannelMessageReceiveAckPolicyAdapterProofs,
} from "openclaw/plugin-sdk/channel-message";
import { beforeEach, describe, expect, it, vi } from "vitest";
const sendMessageTelegramMock = vi.fn();
vi.mock("./send.js", () => ({
sendMessageTelegram: (...args: unknown[]) => sendMessageTelegramMock(...args),
}));
import { telegramPlugin } from "./channel.js";
describe("telegram channel message adapter", () => {
beforeEach(() => {
sendMessageTelegramMock.mockReset();
});
it("backs declared durable-final capabilities with native send proofs", async () => {
const adapter = telegramPlugin.message;
expect(adapter).toBeDefined();
const proveText = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-text", chatId: "12345" });
const result = await adapter!.send!.text!({
cfg: {} as never,
to: "12345",
text: "hello",
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"hello",
expect.objectContaining({ verbose: false }),
);
expect(result.receipt.platformMessageIds).toEqual(["tg-text"]);
};
const proveMedia = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-media", chatId: "12345" });
const result = await adapter!.send!.media!({
cfg: {} as never,
to: "12345",
text: "caption",
mediaUrl: "https://example.com/a.png",
mediaLocalRoots: ["/tmp/media"],
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"caption",
expect.objectContaining({
mediaUrl: "https://example.com/a.png",
mediaLocalRoots: ["/tmp/media"],
}),
);
expect(result.receipt.parts[0]?.kind).toBe("media");
};
const provePayload = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-payload", chatId: "12345" });
const result = await adapter!.send!.payload!({
cfg: {} as never,
to: "12345",
text: "payload",
payload: { text: "payload" },
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"payload",
expect.objectContaining({ verbose: false }),
);
expect(result.receipt.platformMessageIds).toEqual(["tg-payload"]);
};
const proveReplyThreadSilent = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-thread", chatId: "12345" });
await adapter!.send!.text!({
cfg: {} as never,
to: "12345",
text: "threaded",
replyToId: "900",
threadId: "12",
silent: true,
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"threaded",
expect.objectContaining({
replyToMessageId: 900,
messageThreadId: 12,
silent: true,
}),
);
};
const proveBatch = async () => {
const startCallCount = sendMessageTelegramMock.mock.calls.length;
sendMessageTelegramMock
.mockResolvedValueOnce({ messageId: "tg-batch-1", chatId: "12345" })
.mockResolvedValueOnce({ messageId: "tg-batch-2", chatId: "12345" });
await adapter!.send!.payload!({
cfg: {} as never,
to: "12345",
text: "batch",
payload: {
text: "batch",
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
},
deps: { sendTelegram: sendMessageTelegramMock },
});
const batchCalls = sendMessageTelegramMock.mock.calls.slice(startCallCount);
expect(batchCalls[0]).toEqual([
"12345",
"batch",
expect.objectContaining({ mediaUrl: "https://example.com/a.png" }),
]);
expect(batchCalls[1]).toEqual([
"12345",
"",
expect.objectContaining({ mediaUrl: "https://example.com/b.png" }),
]);
};
await verifyChannelMessageAdapterCapabilityProofs({
adapterName: "telegramMessageAdapter",
adapter: adapter!,
proofs: {
text: proveText,
media: proveMedia,
payload: provePayload,
silent: proveReplyThreadSilent,
replyTo: proveReplyThreadSilent,
thread: proveReplyThreadSilent,
messageSendingHooks: () => {
expect(adapter!.send!.text).toBeTypeOf("function");
},
batch: proveBatch,
},
});
});
it("backs declared live capabilities with adapter proofs", async () => {
const adapter = telegramPlugin.message;
expect(adapter).toBeDefined();
await verifyChannelMessageLiveCapabilityAdapterProofs({
adapterName: "telegramMessageAdapter",
adapter: adapter!,
proofs: {
draftPreview: () => {
expect(adapter!.receive?.defaultAckPolicy).toBe("after_agent_dispatch");
},
previewFinalization: () => {
expect(adapter!.durableFinal?.capabilities?.text).toBe(true);
},
progressUpdates: () => {
expect(adapter!.live?.capabilities?.draftPreview).toBe(true);
},
},
});
});
it("backs declared live preview finalizer capabilities with adapter proofs", async () => {
const adapter = telegramPlugin.message;
expect(adapter).toBeDefined();
await verifyChannelMessageLiveFinalizerProofs({
adapterName: "telegramMessageAdapter",
adapter: adapter!,
proofs: {
finalEdit: () => {
expect(adapter!.live?.capabilities?.previewFinalization).toBe(true);
},
normalFallback: () => {
expect(adapter!.durableFinal?.capabilities?.text).toBe(true);
},
previewReceipt: () => {
expect(adapter!.live?.finalizer?.capabilities?.previewReceipt).toBe(true);
},
retainOnAmbiguousFailure: () => {
expect(adapter!.live?.finalizer?.capabilities?.retainOnAmbiguousFailure).toBe(true);
},
},
});
});
it("backs declared receive ack policies with adapter proofs", async () => {
const adapter = telegramPlugin.message;
expect(adapter).toBeDefined();
await verifyChannelMessageReceiveAckPolicyAdapterProofs({
adapterName: "telegramMessageAdapter",
adapter: adapter!,
proofs: {
after_receive_record: () => {
expect(adapter!.receive?.supportedAckPolicies).toContain("after_receive_record");
},
after_agent_dispatch: () => {
expect(adapter!.receive?.defaultAckPolicy).toBe("after_agent_dispatch");
},
},
});
});
});

View File

@@ -11,6 +11,12 @@ import {
createChatChannelPlugin,
} from "openclaw/plugin-sdk/channel-core";
import { createAccountStatusSink } from "openclaw/plugin-sdk/channel-lifecycle";
import {
createMessageReceiptFromOutboundResults,
defineChannelMessageAdapter,
type ChannelMessageSendResult,
type MessageReceiptPartKind,
} from "openclaw/plugin-sdk/channel-message";
import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing";
import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result";
import {
@@ -167,6 +173,7 @@ function buildTelegramSendOptions(params: {
cfg: OpenClawConfig;
mediaUrl?: string | null;
mediaLocalRoots?: readonly string[] | null;
mediaReadFile?: ((filePath: string) => Promise<Buffer>) | null;
accountId?: string | null;
replyToId?: string | null;
threadId?: string | number | null;
@@ -179,6 +186,7 @@ function buildTelegramSendOptions(params: {
cfg: params.cfg,
...(params.mediaUrl ? { mediaUrl: params.mediaUrl } : {}),
...(params.mediaLocalRoots?.length ? { mediaLocalRoots: params.mediaLocalRoots } : {}),
...(params.mediaReadFile ? { mediaReadFile: params.mediaReadFile } : {}),
messageThreadId: parseTelegramThreadId(params.threadId),
replyToMessageId: parseTelegramReplyToMessageId(params.replyToId),
accountId: params.accountId ?? undefined,
@@ -196,11 +204,13 @@ async function sendTelegramOutbound(params: {
text: string;
mediaUrl?: string | null;
mediaLocalRoots?: readonly string[] | null;
mediaReadFile?: ((filePath: string) => Promise<Buffer>) | null;
accountId?: string | null;
deps?: OutboundSendDeps;
replyToId?: string | null;
threadId?: string | number | null;
silent?: boolean | null;
forceDocument?: boolean | null;
gatewayClientScopes?: readonly string[] | null;
}) {
const send = await resolveTelegramSend(params.deps);
@@ -211,15 +221,148 @@ async function sendTelegramOutbound(params: {
cfg: params.cfg,
mediaUrl: params.mediaUrl,
mediaLocalRoots: params.mediaLocalRoots,
mediaReadFile: params.mediaReadFile,
accountId: params.accountId,
replyToId: params.replyToId,
threadId: params.threadId,
silent: params.silent,
forceDocument: params.forceDocument,
gatewayClientScopes: params.gatewayClientScopes,
}),
);
}
type TelegramMessageSendSourceResult = {
messageId?: string;
chatId?: string;
receipt?: ChannelMessageSendResult["receipt"];
};
function toTelegramMessageSendResult(
result: TelegramMessageSendSourceResult,
kind: MessageReceiptPartKind,
replyToId?: string | null,
): ChannelMessageSendResult {
const receipt =
result.receipt ??
createMessageReceiptFromOutboundResults({
results: result.messageId
? [
{
channel: "telegram",
messageId: result.messageId,
chatId: result.chatId,
},
]
: [],
kind,
...(replyToId ? { replyToId } : {}),
});
return {
messageId: result.messageId || receipt.primaryPlatformMessageId,
receipt,
};
}
const telegramMessageAdapter = defineChannelMessageAdapter({
id: "telegram",
durableFinal: {
capabilities: {
text: true,
media: true,
payload: true,
silent: true,
replyTo: true,
thread: true,
messageSendingHooks: true,
batch: true,
},
},
live: {
capabilities: {
draftPreview: true,
previewFinalization: true,
progressUpdates: true,
},
finalizer: {
capabilities: {
finalEdit: true,
normalFallback: true,
previewReceipt: true,
retainOnAmbiguousFailure: true,
},
},
},
receive: {
defaultAckPolicy: "after_agent_dispatch",
supportedAckPolicies: ["after_receive_record", "after_agent_dispatch"],
},
send: {
text: async (ctx) =>
toTelegramMessageSendResult(
await sendTelegramOutbound({
cfg: ctx.cfg,
to: ctx.to,
text: ctx.text,
accountId: ctx.accountId,
deps: ctx.deps,
replyToId: ctx.replyToId,
threadId: ctx.threadId,
silent: ctx.silent,
gatewayClientScopes: ctx.gatewayClientScopes,
}),
"text",
ctx.replyToId,
),
media: async (ctx) =>
toTelegramMessageSendResult(
await sendTelegramOutbound({
cfg: ctx.cfg,
to: ctx.to,
text: ctx.text,
mediaUrl: ctx.mediaUrl,
mediaLocalRoots: ctx.mediaLocalRoots,
mediaReadFile: ctx.mediaReadFile,
accountId: ctx.accountId,
deps: ctx.deps,
replyToId: ctx.replyToId,
threadId: ctx.threadId,
silent: ctx.silent,
forceDocument: ctx.forceDocument,
gatewayClientScopes: ctx.gatewayClientScopes,
}),
"media",
ctx.replyToId,
),
payload: async (ctx) => {
const send = await resolveTelegramSend(ctx.deps);
const result = attachChannelToResult(
"telegram",
await sendTelegramPayloadMessages({
send,
to: ctx.to,
payload: ctx.payload,
baseOpts: {
...buildTelegramSendOptions({
cfg: ctx.cfg,
mediaUrl: ctx.mediaUrl,
mediaLocalRoots: ctx.mediaLocalRoots,
accountId: ctx.accountId,
replyToId: ctx.replyToId,
threadId: ctx.threadId,
silent: ctx.silent,
forceDocument: ctx.forceDocument,
gatewayClientScopes: ctx.gatewayClientScopes,
}),
...(ctx.mediaReadFile ? { mediaReadFile: ctx.mediaReadFile } : {}),
},
}),
);
return toTelegramMessageSendResult(result, "unknown", ctx.replyToId);
},
},
});
const telegramMessageActions: ChannelMessageActionAdapter = {
resolveExecutionMode: (ctx) =>
getOptionalTelegramRuntime()?.channel?.telegram?.messageActions?.resolveExecutionMode?.(ctx) ??
@@ -776,6 +919,7 @@ export const telegramPlugin = createChatChannelPlugin({
listGroups: async (params) => listTelegramDirectoryGroupsFromConfig(params),
}),
actions: telegramMessageActions,
message: telegramMessageAdapter,
status: createComputedAccountStatusAdapter<ResolvedTelegramAccount, TelegramProbe>({
defaultRuntime: createDefaultChannelRuntimeState(DEFAULT_ACCOUNT_ID),
collectStatusIssues: collectTelegramStatusIssues,

View File

@@ -1,3 +1,9 @@
import {
createPreviewMessageReceipt,
defineFinalizableLivePreviewAdapter,
deliverWithFinalizableLivePreviewAdapter,
type MessageReceipt,
} from "openclaw/plugin-sdk/channel-message";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import type { TelegramInlineButtons } from "./button-types.js";
@@ -67,13 +73,20 @@ export type LanePreviewLifecycle = "transient" | "complete";
export type LaneDeliveryResult =
| {
kind: "preview-finalized";
delivery: {
content: string;
messageId?: number;
};
delivery: LanePreviewFinalizedDelivery;
}
| { kind: "preview-retained" | "preview-updated" | "sent" | "skipped" };
type LanePreviewFinalizedDelivery = {
content: string;
messageId: number;
receipt: MessageReceipt;
};
type LanePreviewFinalizedDeliveryInput = Omit<LanePreviewFinalizedDelivery, "receipt"> & {
receipt?: MessageReceipt;
};
type CreateLaneTextDelivererParams = {
lanes: Record<LaneName, DraftLaneState>;
archivedAnswerPreviews: ArchivedPreview[];
@@ -83,7 +96,10 @@ type CreateLaneTextDelivererParams = {
applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload;
applyTextToFollowUpPayload?: (payload: ReplyPayload, text: string) => ReplyPayload;
splitFinalTextForPreview?: (text: string) => readonly string[];
sendPayload: (payload: ReplyPayload) => Promise<boolean>;
sendPayload: (
payload: ReplyPayload,
options?: { durable?: boolean; silent?: boolean },
) => Promise<boolean>;
flushDraftLane: (lane: DraftLaneState) => Promise<void>;
stopDraftLane: (lane: DraftLaneState) => Promise<void>;
editPreview: (params: {
@@ -151,12 +167,27 @@ type PreviewTargetResolution = {
stopCreatesFirstPreview: boolean;
};
type TelegramPreviewFinalEdit = {
laneName: LaneName;
messageId: number;
text: string;
context: "final" | "update";
previewButtons?: TelegramInlineButtons;
};
function result(
kind: LaneDeliveryResult["kind"],
delivery?: Extract<LaneDeliveryResult, { kind: "preview-finalized" }>["delivery"],
delivery?: LanePreviewFinalizedDeliveryInput,
): LaneDeliveryResult {
if (kind === "preview-finalized") {
return { kind, delivery: delivery! };
const finalized = delivery!;
return {
kind,
delivery: {
...finalized,
receipt: finalized.receipt ?? createPreviewMessageReceipt({ id: finalized.messageId }),
},
};
}
return { kind };
}
@@ -278,86 +309,133 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
retainAlternatePreviewOnMissingTarget: boolean;
targetPreviewText: string;
}): Promise<PreviewEditResult> => {
try {
await params.editPreview({
const previewEditState: { result: PreviewEditResult } = { result: "fallback" };
const adapter = defineFinalizableLivePreviewAdapter<
{ text: string },
number,
TelegramPreviewFinalEdit
>({
draft: {
flush: async () => {},
clear: async () => {},
id: () => args.messageId,
},
buildFinalEdit: (payload) => ({
laneName: args.laneName,
messageId: args.messageId,
text: args.text,
previewButtons: args.previewButtons,
text: payload.text,
context: args.context,
});
if (args.updateLaneSnapshot) {
args.lane.lastPartialText = args.text;
}
params.markDelivered();
return "edited";
} catch (err) {
if (isMessageNotModifiedError(err)) {
params.log(
`telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`,
);
...(args.previewButtons ? { previewButtons: args.previewButtons } : {}),
}),
editFinal: async (_messageId, edit) => {
try {
await params.editPreview(edit);
} catch (err) {
if (isMessageNotModifiedError(err)) {
params.log(
`telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`,
);
return;
}
throw err;
}
},
createPreviewReceipt: (messageId) => createPreviewMessageReceipt({ id: messageId }),
onPreviewFinalized: () => {
if (args.updateLaneSnapshot) {
args.lane.lastPartialText = args.text;
}
params.markDelivered();
return "edited";
}
if (args.context === "final") {
if (args.finalTextAlreadyLanded) {
previewEditState.result = "edited";
},
handlePreviewEditError: ({ error: err }) => {
previewEditState.result = "fallback";
if (isMessageNotModifiedError(err)) {
params.log(
`telegram: ${args.laneName} preview final edit failed after stop flush; keeping existing preview (${String(err)})`,
`telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`,
);
params.markDelivered();
return "retained";
previewEditState.result = "edited";
return "retain";
}
if (isSafeToRetrySendError(err)) {
params.log(
`telegram: ${args.laneName} preview final edit failed before reaching Telegram; falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isMissingPreviewMessageError(err)) {
if (args.retainAlternatePreviewOnMissingTarget) {
if (args.context === "final") {
if (args.finalTextAlreadyLanded) {
params.log(
`telegram: ${args.laneName} preview final edit target missing; keeping alternate preview without fallback (${String(err)})`,
`telegram: ${args.laneName} preview final edit failed after stop flush; keeping existing preview (${String(err)})`,
);
params.markDelivered();
return "retained";
previewEditState.result = "retained";
return "retain";
}
if (isSafeToRetrySendError(err)) {
params.log(
`telegram: ${args.laneName} preview final edit failed before reaching Telegram; falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isMissingPreviewMessageError(err)) {
if (args.retainAlternatePreviewOnMissingTarget) {
params.log(
`telegram: ${args.laneName} preview final edit target missing; keeping alternate preview without fallback (${String(err)})`,
);
params.markDelivered();
previewEditState.result = "retained";
return "retain";
}
params.log(
`telegram: ${args.laneName} preview final edit target missing with no alternate preview; falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isRecoverableTelegramNetworkError(err, { allowMessageMatch: true })) {
params.log(
`telegram: ${args.laneName} preview final edit may have landed despite network error; keeping existing preview (${String(err)})`,
);
params.markDelivered();
previewEditState.result = "retained";
return "retain";
}
if (isTelegramClientRejection(err)) {
params.log(
`telegram: ${args.laneName} preview final edit rejected by Telegram (client error); falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isIncompleteFinalPreviewPrefix(args.targetPreviewText, args.text)) {
params.log(
`telegram: ${args.laneName} preview final edit failed and existing preview is an incomplete prefix; falling back to standard send (${String(err)})`,
);
return "fallback";
}
// Default: ambiguous error — retain when fallback may duplicate a final
// edit that already landed or when the preview is not known-incomplete.
params.log(
`telegram: ${args.laneName} preview final edit target missing with no alternate preview; falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isRecoverableTelegramNetworkError(err, { allowMessageMatch: true })) {
params.log(
`telegram: ${args.laneName} preview final edit may have landed despite network error; keeping existing preview (${String(err)})`,
`telegram: ${args.laneName} preview final edit failed with ambiguous error; keeping existing preview to avoid duplicate (${String(err)})`,
);
params.markDelivered();
return "retained";
previewEditState.result = "retained";
return "retain";
}
if (isTelegramClientRejection(err)) {
params.log(
`telegram: ${args.laneName} preview final edit rejected by Telegram (client error); falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isIncompleteFinalPreviewPrefix(args.targetPreviewText, args.text)) {
params.log(
`telegram: ${args.laneName} preview final edit failed and existing preview is an incomplete prefix; falling back to standard send (${String(err)})`,
);
return "fallback";
}
// Default: ambiguous error — retain when fallback may duplicate a final
// edit that already landed or when the preview is not known-incomplete.
params.log(
`telegram: ${args.laneName} preview final edit failed with ambiguous error; keeping existing preview to avoid duplicate (${String(err)})`,
`telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`,
);
params.markDelivered();
return "retained";
}
params.log(
`telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`,
);
return "fallback";
return "fallback";
},
});
const delivered = await deliverWithFinalizableLivePreviewAdapter({
kind: "final",
payload: { text: args.text },
adapter,
deliverNormally: async () => false,
});
if (delivered.kind === "preview-finalized" || previewEditState.result === "edited") {
return "edited";
}
if (delivered.kind === "preview-retained") {
return "retained";
}
return "fallback";
};
const tryDeliverLongFinalThroughPreview = async (args: {
lane: DraftLaneState;
@@ -533,7 +611,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
return undefined;
}
if (canEditViaPreview && shouldUseFreshFinalForPreview(lane, archivedPreview.visibleSinceMs)) {
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
if (delivered) {
try {
await params.deletePreviewMessage(archivedPreview.messageId);
@@ -576,7 +656,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
// Send the replacement message first, then clean up the old preview.
// This avoids the visual "disappear then reappear" flash.
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
// Once this archived preview is consumed by a fallback final send, delete it
// regardless of deleteIfUnused. That flag only applies to unconsumed boundaries.
if (delivered || archivedPreview.deleteIfUnused !== false) {
@@ -640,7 +722,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
if (shouldUseFreshFinalForLane(lane)) {
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
if (delivered) {
await clearActivePreviewAfterFreshFinal(lane, laneName);
return result("sent");
@@ -656,18 +740,25 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
skipRegressive: "existingOnly",
context: "final",
});
const finalizedMessageId = previewMessageId ?? lane.stream?.messageId();
if (finalized === "edited") {
markActivePreviewComplete(laneName);
if (typeof finalizedMessageId !== "number") {
return result("preview-retained");
}
return result("preview-finalized", {
content: text,
messageId: previewMessageId ?? lane.stream?.messageId(),
messageId: finalizedMessageId,
});
}
if (finalized === "regressive-skipped") {
markActivePreviewComplete(laneName);
if (typeof finalizedMessageId !== "number") {
return result("preview-retained");
}
return result("preview-finalized", {
content: lane.lastPartialText,
messageId: previewMessageId ?? lane.stream?.messageId(),
messageId: finalizedMessageId,
});
}
if (finalized === "retained") {
@@ -690,7 +781,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
);
}
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
return delivered ? result("sent") : result("skipped");
}

View File

@@ -140,22 +140,43 @@ async function expectFinalEditFallbackToSend(params: {
const result = await deliverFinalAnswer(params.harness, params.text);
expect(result.kind).toBe("sent");
expect(params.harness.editPreview).toHaveBeenCalledTimes(1);
expect(params.harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: params.text }),
);
expectSendPayloadWith(params.harness, { text: params.text });
expect(params.harness.log).toHaveBeenCalledWith(
expect.stringContaining(params.expectedLogSnippet),
);
}
function expectPreviewFinalized(
result: LaneDeliveryResult,
): Extract<LaneDeliveryResult, { kind: "preview-finalized" }>["delivery"] {
function expectSendPayloadWith(
harness: ReturnType<typeof createHarness>,
expected: Partial<ReplyPayload>,
) {
expect(
harness.sendPayload.mock.calls.some(([payload]) =>
Object.entries(expected).every(([key, value]) => {
return (payload as Record<string, unknown>)[key] === value;
}),
),
).toBe(true);
}
function expectPreviewFinalized(result: LaneDeliveryResult): {
content: string;
messageId: number;
} {
expect(result.kind).toBe("preview-finalized");
if (result.kind !== "preview-finalized") {
throw new Error(`expected preview-finalized, got ${result.kind}`);
}
return result.delivery;
expect(result.delivery.receipt).toEqual(
expect.objectContaining({
primaryPlatformMessageId: String(result.delivery.messageId),
platformMessageIds: [String(result.delivery.messageId)],
}),
);
return {
content: result.delivery.content,
messageId: result.delivery.messageId,
};
}
describe("createLaneTextDeliverer", () => {
@@ -290,9 +311,7 @@ describe("createLaneTextDeliverer", () => {
const result = await deliverFinalAnswer(harness, HELLO_FINAL);
expect(result.kind).toBe("sent");
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: HELLO_FINAL }),
);
expectSendPayloadWith(harness, { text: HELLO_FINAL });
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("failed before reaching Telegram; falling back"),
);
@@ -320,9 +339,7 @@ describe("createLaneTextDeliverer", () => {
expect(result.kind).toBe("sent");
expect(harness.editPreview).not.toHaveBeenCalled();
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Short final" }),
);
expectSendPayloadWith(harness, { text: "Short final" });
});
it("does not create a synthetic preview for final-only text", async () => {
@@ -343,9 +360,7 @@ describe("createLaneTextDeliverer", () => {
expect(answerStream.update).not.toHaveBeenCalled();
expect(answerStream.materialize).not.toHaveBeenCalled();
expect(harness.editPreview).not.toHaveBeenCalled();
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Final only" }),
);
expectSendPayloadWith(harness, { text: "Final only" });
});
it("keeps existing preview when final text regresses", async () => {
@@ -381,7 +396,7 @@ describe("createLaneTextDeliverer", () => {
expect(result.kind).toBe("sent");
expect(harness.editPreview).not.toHaveBeenCalled();
expect(harness.sendPayload).toHaveBeenCalledWith(expect.objectContaining({ text: longText }));
expectSendPayloadWith(harness, { text: longText });
expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("preview final too long"));
});
@@ -429,9 +444,7 @@ describe("createLaneTextDeliverer", () => {
expect(result.kind).toBe("sent");
expect(harness.stopDraftLane).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: HELLO_FINAL }),
);
expectSendPayloadWith(harness, { text: HELLO_FINAL });
expect(harness.editPreview).not.toHaveBeenCalled();
expect(harness.answer.stream?.clear).toHaveBeenCalledTimes(1);
expect(harness.answer.stream?.forceNewMessage).toHaveBeenCalledTimes(1);
@@ -486,9 +499,7 @@ describe("createLaneTextDeliverer", () => {
const result = await deliverFinalAnswer(harness, HELLO_FINAL);
expect(result.kind).toBe("sent");
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: HELLO_FINAL }),
);
expectSendPayloadWith(harness, { text: HELLO_FINAL });
expect(harness.editPreview).not.toHaveBeenCalled();
expect(harness.deletePreviewMessage).toHaveBeenCalledWith(222);
});
@@ -546,9 +557,7 @@ describe("createLaneTextDeliverer", () => {
const result = await deliverFinalAnswer(harness, "Complete final answer");
expect(harness.editPreview).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Complete final answer" }),
);
expectSendPayloadWith(harness, { text: "Complete final answer" });
expect(result.kind).toBe("sent");
expect(harness.deletePreviewMessage).toHaveBeenCalledWith(5555);
});
@@ -640,9 +649,7 @@ describe("createLaneTextDeliverer", () => {
const result = await deliverFinalAnswer(harness, HELLO_FINAL);
expect(result.kind).toBe("sent");
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: HELLO_FINAL }),
);
expectSendPayloadWith(harness, { text: HELLO_FINAL });
});
it("retains when sendMayHaveLanded is true and a prior preview was visible", async () => {
@@ -681,9 +688,10 @@ describe("createLaneTextDeliverer", () => {
});
expect(result.kind).toBe("sent");
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Final with media", mediaUrl: "file:///tmp/example.png" }),
);
expectSendPayloadWith(harness, {
text: "Final with media",
mediaUrl: "file:///tmp/example.png",
});
expect(harness.deletePreviewMessage).toHaveBeenCalledWith(4444);
});
});

View File

@@ -1,3 +1,4 @@
import { verifyDurableFinalCapabilityProofs } from "openclaw/plugin-sdk/channel-message";
import { beforeEach, describe, expect, it, vi } from "vitest";
const sendMessageTelegramMock = vi.fn();
@@ -123,6 +124,28 @@ describe("telegramOutbound", () => {
expect(result).toEqual({ channel: "telegram", messageId: "tg-buttons", chatId: "12345" });
});
it("forwards silent delivery options to Telegram sends", async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-silent", chatId: "12345" });
const result = await telegramOutbound.sendPayload!({
cfg: {} as never,
to: "12345",
text: "quiet",
payload: { text: "quiet" },
silent: true,
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenCalledWith(
"12345",
"quiet",
expect.objectContaining({
silent: true,
}),
);
expect(result).toEqual({ channel: "telegram", messageId: "tg-silent", chatId: "12345" });
});
it("forwards audioAsVoice payload media to Telegram voice sends", async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-voice", chatId: "12345" });
@@ -149,6 +172,116 @@ describe("telegramOutbound", () => {
expect(result).toEqual({ channel: "telegram", messageId: "tg-voice", chatId: "12345" });
});
it("backs declared durable final capabilities with delivery proofs", async () => {
const proveText = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-text", chatId: "12345" });
await telegramOutbound.sendText!({
cfg: {} as never,
to: "12345",
text: "hello",
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"hello",
expect.objectContaining({ textMode: "html" }),
);
};
const proveMedia = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-media", chatId: "12345" });
await telegramOutbound.sendMedia!({
cfg: {} as never,
to: "12345",
text: "caption",
mediaUrl: "https://example.com/a.png",
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"caption",
expect.objectContaining({ mediaUrl: "https://example.com/a.png" }),
);
};
const provePayload = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-payload", chatId: "12345" });
await telegramOutbound.sendPayload!({
cfg: {} as never,
to: "12345",
text: "",
payload: { text: "payload" },
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"payload",
expect.any(Object),
);
};
const proveReplyThreadSilent = async () => {
sendMessageTelegramMock.mockResolvedValueOnce({ messageId: "tg-thread", chatId: "12345" });
await telegramOutbound.sendText!({
cfg: {} as never,
to: "12345",
text: "threaded",
replyToId: "900",
threadId: "12",
silent: true,
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenLastCalledWith(
"12345",
"threaded",
expect.objectContaining({
replyToMessageId: 900,
messageThreadId: 12,
silent: true,
}),
);
};
const proveBatch = async () => {
sendMessageTelegramMock
.mockResolvedValueOnce({ messageId: "tg-batch-1", chatId: "12345" })
.mockResolvedValueOnce({ messageId: "tg-batch-2", chatId: "12345" });
await telegramOutbound.sendPayload!({
cfg: {} as never,
to: "12345",
text: "",
payload: {
text: "batch",
mediaUrls: ["https://example.com/a.png", "https://example.com/b.png"],
},
deps: { sendTelegram: sendMessageTelegramMock },
});
expect(sendMessageTelegramMock).toHaveBeenCalledWith(
"12345",
"batch",
expect.objectContaining({ mediaUrl: "https://example.com/a.png" }),
);
expect(sendMessageTelegramMock).toHaveBeenCalledWith(
"12345",
"",
expect.objectContaining({ mediaUrl: "https://example.com/b.png" }),
);
};
await verifyDurableFinalCapabilityProofs({
adapterName: "telegramOutbound",
capabilities: telegramOutbound.deliveryCapabilities?.durableFinal,
proofs: {
text: proveText,
media: proveMedia,
payload: provePayload,
silent: proveReplyThreadSilent,
replyTo: proveReplyThreadSilent,
thread: proveReplyThreadSilent,
messageSendingHooks: () => {
expect(telegramOutbound.sendText).toBeTypeOf("function");
},
batch: proveBatch,
},
});
});
it("passes delivery pin notify requests to Telegram pinning", async () => {
pinMessageTelegramMock.mockResolvedValueOnce({ ok: true, messageId: "tg-1", chatId: "12345" });

View File

@@ -42,6 +42,7 @@ async function resolveTelegramSendContext(params: {
accountId?: string | null;
replyToId?: string | null;
threadId?: string | number | null;
silent?: boolean;
gatewayClientScopes?: readonly string[];
}): Promise<{
send: TelegramSendFn;
@@ -52,6 +53,7 @@ async function resolveTelegramSendContext(params: {
messageThreadId?: number;
replyToMessageId?: number;
accountId?: string;
silent?: boolean;
gatewayClientScopes?: readonly string[];
};
}> {
@@ -67,6 +69,7 @@ async function resolveTelegramSendContext(params: {
messageThreadId: parseTelegramThreadId(params.threadId),
replyToMessageId: parseTelegramReplyToMessageId(params.replyToId),
accountId: params.accountId ?? undefined,
silent: params.silent,
gatewayClientScopes: params.gatewayClientScopes,
},
};
@@ -135,6 +138,17 @@ export const telegramOutbound: ChannelOutboundAdapter = {
},
deliveryCapabilities: {
pin: true,
durableFinal: {
text: true,
media: true,
payload: true,
silent: true,
replyTo: true,
thread: true,
nativeQuote: false,
messageSendingHooks: true,
batch: true,
},
},
renderPresentation: ({ payload, presentation }) => ({
...payload,
@@ -161,6 +175,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
deps,
replyToId,
threadId,
silent,
gatewayClientScopes,
}) => {
const { send, baseOpts } = await resolveTelegramSendContext({
@@ -169,6 +184,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
accountId,
replyToId,
threadId,
silent,
gatewayClientScopes,
});
return await send(to, text, {
@@ -187,6 +203,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
replyToId,
threadId,
forceDocument,
silent,
gatewayClientScopes,
}) => {
const { send, baseOpts } = await resolveTelegramSendContext({
@@ -195,6 +212,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
accountId,
replyToId,
threadId,
silent,
gatewayClientScopes,
});
return await send(to, text, {
@@ -217,6 +235,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
replyToId,
threadId,
forceDocument,
silent,
gatewayClientScopes,
}) => {
const { send, baseOpts } = await resolveTelegramSendContext({
@@ -225,6 +244,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
accountId,
replyToId,
threadId,
silent,
gatewayClientScopes,
});
const result = await sendTelegramPayloadMessages({