fix: harden telegram routing edge cases

This commit is contained in:
Peter Steinberger
2026-05-17 00:44:31 +01:00
parent 983e8d39da
commit b09e11bc69
9 changed files with 172 additions and 21 deletions

View File

@@ -367,6 +367,7 @@ describe("handleTelegramAction", () => {
content: "Hello, Telegram!",
},
telegramConfig(),
{ gatewayClientScopes: ["operator.write"] },
);
const call = mockCall(sendMessageTelegram, 0, "text message");
expect(call[0]).toBe("@testchannel");
@@ -550,6 +551,7 @@ describe("handleTelegramAction", () => {
media: "https://example.com/image.jpg",
},
telegramConfig(),
{ gatewayClientScopes: ["operator.write"] },
);
const call = mockCall(sendMessageTelegram, 0, "send alias");
expect(call[0]).toBe("@testchannel");
@@ -771,22 +773,28 @@ describe("handleTelegramAction", () => {
readCallOpts: (calls: unknown[][], argIndex: number) => Record<string, unknown>,
) => readCallOpts(editForumTopicTelegram.mock.calls as unknown[][], 2),
},
])("forwards resolved cfg for $name action", async ({ params, cfg, assertCall }) => {
const readCallOpts = (calls: unknown[][], argIndex: number): Record<string, unknown> => {
const args = calls[0];
if (!Array.isArray(args)) {
throw new Error("Expected Telegram action call args");
}
const opts = args[argIndex];
if (!opts || typeof opts !== "object") {
throw new Error("Expected Telegram action options object");
}
return opts as Record<string, unknown>;
};
await handleTelegramAction(params as Record<string, unknown>, cfg);
const opts = assertCall(readCallOpts);
expect(opts.cfg).toBe(cfg);
});
])(
"forwards resolved cfg and gateway scopes for $name action",
async ({ params, cfg, assertCall }) => {
const readCallOpts = (calls: unknown[][], argIndex: number): Record<string, unknown> => {
const args = calls[0];
if (!Array.isArray(args)) {
throw new Error("Expected Telegram action call args");
}
const opts = args[argIndex];
if (!opts || typeof opts !== "object") {
throw new Error("Expected Telegram action options object");
}
return opts as Record<string, unknown>;
};
await handleTelegramAction(params as Record<string, unknown>, cfg, {
gatewayClientScopes: ["operator.write"],
});
const opts = assertCall(readCallOpts);
expect(opts.cfg).toBe(cfg);
expect(opts.gatewayClientScopes).toEqual(["operator.write"]);
},
);
it.each([
{
@@ -908,6 +916,7 @@ describe("handleTelegramAction", () => {
delivery: { pin: { enabled: true } },
},
telegramConfig(),
{ gatewayClientScopes: ["operator.write"] },
);
const call = mockCall(pinMessageTelegram, 0, "delivery pin");
@@ -916,6 +925,7 @@ describe("handleTelegramAction", () => {
const options = requireRecord(call[2], "delivery pin options");
expect(options.accountId).toBeUndefined();
expect(options.verbose).toBe(false);
expect(options.gatewayClientScopes).toEqual(["operator.write"]);
});
it("passes delivery pin notify requests for action sends", async () => {

View File

@@ -208,6 +208,7 @@ async function maybePinTelegramActionSend(params: {
accountId?: string;
to: string;
messageId?: string;
gatewayClientScopes?: readonly string[];
}) {
const pin = normalizeTelegramDeliveryPin(params.args);
if (!pin) {
@@ -225,6 +226,7 @@ async function maybePinTelegramActionSend(params: {
accountId: params.accountId,
notify: pin.notify,
verbose: false,
gatewayClientScopes: params.gatewayClientScopes,
});
} catch (err) {
if (pin.required) {
@@ -316,6 +318,7 @@ export async function handleTelegramAction(
token,
remove,
accountId: accountId ?? undefined,
gatewayClientScopes: options?.gatewayClientScopes,
},
);
} catch (err) {
@@ -424,6 +427,7 @@ export async function handleTelegramAction(
accountId: accountId ?? undefined,
to,
messageId: result.messageId,
gatewayClientScopes: options?.gatewayClientScopes,
});
return jsonResult({
ok: true,
@@ -524,6 +528,7 @@ export async function handleTelegramAction(
cfg,
token,
accountId: accountId ?? undefined,
gatewayClientScopes: options?.gatewayClientScopes,
});
if (!result.ok) {
return jsonResult({ ok: false, deleted: false, warning: result.warning });
@@ -570,6 +575,7 @@ export async function handleTelegramAction(
token,
accountId: accountId ?? undefined,
buttons,
gatewayClientScopes: options?.gatewayClientScopes,
},
);
return jsonResult({
@@ -606,6 +612,7 @@ export async function handleTelegramAction(
accountId: accountId ?? undefined,
replyToMessageId: replyToMessageId ?? undefined,
messageThreadId: messageThreadId ?? undefined,
gatewayClientScopes: options?.gatewayClientScopes,
});
notifyVisibleOutboundSuccess(to, messageThreadId);
return jsonResult({
@@ -661,6 +668,7 @@ export async function handleTelegramAction(
accountId: accountId ?? undefined,
iconColor,
iconCustomEmojiId: iconCustomEmojiId ?? undefined,
gatewayClientScopes: options?.gatewayClientScopes,
});
return jsonResult({
ok: true,
@@ -696,6 +704,7 @@ export async function handleTelegramAction(
accountId: accountId ?? undefined,
name: name ?? undefined,
iconCustomEmojiId: iconCustomEmojiId ?? undefined,
gatewayClientScopes: options?.gatewayClientScopes,
},
);
return jsonResult(result);

View File

@@ -154,6 +154,10 @@ export function createTelegramBotCore(
};
const updateTracker = createTelegramUpdateTracker({
initialUpdateId,
persistenceFloorUpdateId:
typeof opts.updateOffset?.persistenceFloorUpdateId === "number"
? opts.updateOffset.persistenceFloorUpdateId
: initialUpdateId,
ackPolicy: "after_agent_dispatch",
...(typeof opts.updateOffset?.onUpdateId === "function"
? { onAcceptedUpdateId: opts.updateOffset.onUpdateId }

View File

@@ -133,6 +133,38 @@ describe("createTelegramUpdateTracker", () => {
});
});
it("can keep a persistence floor while replaying older spooled updates", async () => {
const onAcceptedUpdateId = vi.fn();
const tracker = createTelegramUpdateTracker({
initialUpdateId: null,
persistenceFloorUpdateId: 42,
ackPolicy: "after_agent_dispatch",
onAcceptedUpdateId,
});
const oldPending = tracker.beginUpdate(updateCtx(42));
if (!oldPending.accepted) {
throw new Error("expected old spooled update to be accepted");
}
tracker.finishUpdate(oldPending.update, { completed: false });
const newer = tracker.beginUpdate(updateCtx(43));
if (!newer.accepted) {
throw new Error("expected newer update to be accepted");
}
tracker.finishUpdate(newer.update, { completed: true });
await flushTrackerMicrotasks();
expect(onAcceptedUpdateId).toHaveBeenCalledWith(43);
expectTrackerState(tracker.getState(), {
highestAcceptedUpdateId: 43,
highestPersistedAcceptedUpdateId: 43,
highestCompletedUpdateId: 43,
safeCompletedUpdateId: 43,
failedUpdateIds: [42],
} satisfies Partial<TelegramUpdateTrackerState>);
});
it("serializes and coalesces accepted offset persistence", async () => {
const firstWrite = deferred();
const secondWrite = deferred();

View File

@@ -14,6 +14,7 @@ type PersistUpdateId = (updateId: number) => void | Promise<void>;
type TelegramUpdateTrackerOptions = {
initialUpdateId?: number | null;
persistenceFloorUpdateId?: number | null;
ackPolicy?: MessageAckPolicy;
onAcceptedUpdateId?: PersistUpdateId;
onPersistError?: (error: unknown) => void;
@@ -56,6 +57,10 @@ function sortedIds(ids: Set<number>): number[] {
export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOptions = {}) {
const initialUpdateId =
typeof options.initialUpdateId === "number" ? options.initialUpdateId : null;
const persistenceFloorUpdateId =
typeof options.persistenceFloorUpdateId === "number"
? options.persistenceFloorUpdateId
: initialUpdateId;
const ackPolicy = options.ackPolicy ?? "after_receive_record";
const recentUpdates = createTelegramUpdateDedupe();
const pendingUpdateKeys = new Set<string>();
@@ -63,9 +68,9 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption
const pendingUpdateIds = new Set<number>();
const failedUpdateIds = new Set<number>();
let highestAcceptedUpdateId: number | null = initialUpdateId;
let highestPersistedAcceptedUpdateId: number | null = initialUpdateId;
let highestPersistenceRequestedUpdateId: number | null = initialUpdateId;
let highestCompletedUpdateId: number | null = initialUpdateId;
let highestPersistedAcceptedUpdateId: number | null = persistenceFloorUpdateId;
let highestPersistenceRequestedUpdateId: number | null = persistenceFloorUpdateId;
let highestCompletedUpdateId: number | null = persistenceFloorUpdateId;
let persistInFlight = false;
let persistTargetUpdateId: number | null = null;
@@ -130,11 +135,17 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption
}
let safeCompletedUpdateId = highestCompletedUpdateId;
for (const updateId of pendingUpdateIds) {
if (persistenceFloorUpdateId !== null && updateId <= persistenceFloorUpdateId) {
continue;
}
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
for (const updateId of failedUpdateIds) {
if (persistenceFloorUpdateId !== null && updateId <= persistenceFloorUpdateId) {
continue;
}
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}

View File

@@ -23,6 +23,7 @@ export type TelegramBotOptions = {
minimumClientTimeoutSeconds?: number;
updateOffset?: {
lastUpdateId?: number | null;
persistenceFloorUpdateId?: number | null;
onUpdateId?: (updateId: number) => void | Promise<void>;
};
testTimings?: {

View File

@@ -242,6 +242,7 @@ function createPollingSession(params: {
log?: (message: string) => void;
telegramTransport?: ReturnType<typeof makeTelegramTransport>;
createTelegramTransport?: () => ReturnType<typeof makeTelegramTransport>;
getLastUpdateId?: () => number | null;
stallThresholdMs?: number;
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
isolatedIngress?: ConstructorParameters<typeof TelegramPollingSession>[0]["isolatedIngress"];
@@ -254,7 +255,7 @@ function createPollingSession(params: {
proxyFetch: undefined,
abortSignal: params.abortSignal,
runnerOptions: {},
getLastUpdateId: () => null,
getLastUpdateId: params.getLastUpdateId ?? (() => null),
persistUpdateId: async () => undefined,
log: params.log ?? (() => undefined),
telegramTransport: params.telegramTransport,
@@ -587,6 +588,7 @@ describe("TelegramPollingSession", () => {
);
expect(mockObjectArg(createTelegramBotMock, "createTelegramBot").updateOffset).toEqual({
lastUpdateId: null,
persistenceFloorUpdateId: null,
onUpdateId: expect.any(Function),
});
expect(init).toHaveBeenCalledBefore(handleUpdate);
@@ -596,6 +598,70 @@ describe("TelegramPollingSession", () => {
}
});
it("drains existing isolated ingress spool entries below the persisted offset", async () => {
const abort = new AbortController();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
const handleUpdate = vi.fn(async () => undefined);
createTelegramBotMock.mockReturnValueOnce({
api: {
deleteWebhook: vi.fn(async () => true),
config: { use: vi.fn() },
},
init: vi.fn(async () => undefined),
handleUpdate,
stop: vi.fn(async () => undefined),
});
await writeTelegramSpooledUpdate({
spoolDir: tempDir,
update: { update_id: 42, message: { text: "pre-upgrade pending" } },
});
let stopWorker: (() => void) | undefined;
const workerDone = new Promise<void>((resolve) => {
stopWorker = resolve;
});
const createWorker = vi.fn(() => ({
onMessage: vi.fn(() => () => undefined),
stop: vi.fn(async () => {
stopWorker?.();
}),
task: vi.fn(async () => {
await workerDone;
}),
}));
try {
const session = createPollingSession({
abortSignal: abort.signal,
getLastUpdateId: () => 42,
isolatedIngress: {
enabled: true,
spoolDir: tempDir,
createWorker,
drainIntervalMs: 10,
},
});
const runPromise = session.runUntilAbort();
await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1));
await vi.waitFor(async () => expect(await fs.readdir(tempDir)).toEqual([]));
abort.abort();
await runPromise;
expect(createWorker).toHaveBeenCalledWith(expect.objectContaining({ initialUpdateId: 42 }));
expect(mockObjectArg(createTelegramBotMock, "createTelegramBot").updateOffset).toEqual({
lastUpdateId: null,
persistenceFloorUpdateId: 42,
onUpdateId: expect.any(Function),
});
expect(handleUpdate).toHaveBeenCalledWith({
update_id: 42,
message: { text: "pre-upgrade pending" },
});
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("drains Telegram delivery queue after isolated ingress reports poll success", async () => {
const abort = new AbortController();
const init = vi.fn(async () => undefined);

View File

@@ -279,8 +279,11 @@ export class TelegramPollingSession {
const fetchAbortController = new AbortController();
this.#activeFetchAbort = fetchAbortController;
const telegramTransport = this.#transportState.acquireForNextCycle();
const persistedLastUpdateId = this.opts.getLastUpdateId();
const lastUpdateId = this.opts.isolatedIngress?.enabled ? null : persistedLastUpdateId;
const updateOffset = {
lastUpdateId: this.opts.getLastUpdateId(),
lastUpdateId,
persistenceFloorUpdateId: persistedLastUpdateId,
onUpdateId: this.opts.persistUpdateId,
};
try {

View File

@@ -123,6 +123,7 @@ type TelegramReactionOpts = {
remove?: boolean;
verbose?: boolean;
retry?: RetryConfig;
gatewayClientScopes?: readonly string[];
};
type TelegramTypingOpts = {
@@ -1040,6 +1041,7 @@ export async function reactMessageTelegram(
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const messageId = normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
@@ -1080,6 +1082,7 @@ type TelegramDeleteOpts = {
verbose?: boolean;
api?: TelegramApiOverride;
retry?: RetryConfig;
gatewayClientScopes?: readonly string[];
};
export async function deleteMessageTelegram(
@@ -1095,6 +1098,7 @@ export async function deleteMessageTelegram(
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const messageId = normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
@@ -1136,6 +1140,7 @@ export async function pinMessageTelegram(
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const messageId = normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
@@ -1168,6 +1173,7 @@ export async function unpinMessageTelegram(
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const messageId = messageIdInput === undefined ? undefined : normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
@@ -1229,6 +1235,7 @@ export async function editForumTopicTelegram(
lookupTarget: target.chatId,
persistTarget: rawTarget,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const messageThreadId = normalizeMessageId(messageThreadIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
@@ -1279,6 +1286,7 @@ type TelegramEditOpts = {
verbose?: boolean;
api?: TelegramApiOverride;
retry?: RetryConfig;
gatewayClientScopes?: readonly string[];
textMode?: "markdown" | "html";
/** Controls whether link previews are shown in the edited message. */
linkPreview?: boolean;
@@ -1294,6 +1302,7 @@ type TelegramEditReplyMarkupOpts = {
verbose?: boolean;
api?: TelegramApiOverride;
retry?: RetryConfig;
gatewayClientScopes?: readonly string[];
/** Inline keyboard buttons (reply markup). Pass empty array to remove buttons. */
buttons?: TelegramInlineButtons;
/** Resolved runtime config from the command or gateway boundary. */
@@ -1317,6 +1326,7 @@ export async function editMessageReplyMarkupTelegram(
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const messageId = normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
@@ -1360,6 +1370,7 @@ export async function editMessageTelegram(
lookupTarget: rawTarget,
persistTarget: rawTarget,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const messageId = normalizeMessageId(messageIdInput);
const requestWithDiag = createTelegramRequestWithDiag({
@@ -1463,6 +1474,7 @@ type TelegramStickerOpts = {
verbose?: boolean;
api?: TelegramApiOverride;
retry?: RetryConfig;
gatewayClientScopes?: readonly string[];
/** Message ID to reply to (for threading) */
replyToMessageId?: number;
/** Forum topic thread ID (for forum supergroups) */
@@ -1492,6 +1504,7 @@ export async function sendStickerTelegram(
lookupTarget: target.chatId,
persistTarget: to,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const threadParams = buildTelegramThreadReplyParams({
@@ -1664,6 +1677,7 @@ type TelegramCreateForumTopicOpts = {
api?: TelegramApiOverride;
verbose?: boolean;
retry?: RetryConfig;
gatewayClientScopes?: readonly string[];
/** Icon color for the topic (must be one of 0x6FB9F0, 0xFFD67E, 0xCB86DB, 0x8EEE98, 0xFF93B2, 0xFB6F5F). */
iconColor?: TelegramCreateForumTopicParams["icon_color"];
/** Custom emoji ID for the topic icon. */
@@ -1707,6 +1721,7 @@ export async function createForumTopicTelegram(
lookupTarget: target.chatId,
persistTarget: chatId,
verbose: opts.verbose,
gatewayClientScopes: opts.gatewayClientScopes,
});
const requestWithDiag = createTelegramNonIdempotentRequestWithDiag({