fix(telegram): swallow update watermark persistence failures

This commit is contained in:
Vincent Koc
2026-04-13 16:31:01 +01:00
parent 2bc031c357
commit 9fc36837b4
2 changed files with 78 additions and 2 deletions

View File

@@ -977,6 +977,78 @@ describe("createTelegramBot", () => {
persistedAfterDrain.length > 0 ? Math.max(...persistedAfterDrain) : -Infinity;
expect(maxPersistedAfterDrain).toBe(102);
});
it("logs and swallows update watermark persistence failures", async () => {
sequentializeSpy.mockImplementationOnce(
() => async (_ctx: unknown, next: () => Promise<void>) => {
await next();
},
);
const onUpdateId = vi.fn().mockRejectedValueOnce(new Error("disk boom"));
const runtime = {
log: vi.fn(),
error: vi.fn(),
writeStdout: vi.fn(),
writeJson: vi.fn(),
exit: vi.fn(),
};
createTelegramBot({
token: "tok",
runtime,
updateOffset: {
lastUpdateId: 13_099,
onUpdateId,
},
});
type Middleware = (
ctx: Record<string, unknown>,
next: () => Promise<void>,
) => Promise<void> | void;
const middlewares = middlewareUseSpy.mock.calls
.map((call) => call[0])
.filter((fn): fn is Middleware => typeof fn === "function");
const runMiddlewareChain = async (
ctx: Record<string, unknown>,
finalNext: () => Promise<void>,
) => {
let idx = -1;
const dispatch = async (i: number): Promise<void> => {
if (i <= idx) {
throw new Error("middleware dispatch called multiple times");
}
idx = i;
const fn = middlewares[i];
if (!fn) {
await finalNext();
return;
}
await fn(ctx, async () => dispatch(i + 1));
};
await dispatch(0);
};
const unhandled: unknown[] = [];
const onUnhandledRejection = (reason: unknown) => {
unhandled.push(reason);
};
process.on("unhandledRejection", onUnhandledRejection);
try {
await runMiddlewareChain({ update: { update_id: 13_100 } }, async () => {});
await vi.waitFor(() => {
expect(onUpdateId).toHaveBeenCalledWith(13_100);
});
expect(unhandled).toEqual([]);
} finally {
process.off("unhandledRejection", onUnhandledRejection);
}
});
it("allows distinct callback_query ids without update_id", async () => {
loadConfig.mockReturnValue({
channels: {

View File

@@ -12,7 +12,7 @@ import {
resolveThreadBindingMaxAgeMsForChannel,
resolveThreadBindingSpawnPolicy,
} from "openclaw/plugin-sdk/conversation-runtime";
import { formatUncaughtError } from "openclaw/plugin-sdk/error-runtime";
import { formatErrorMessage, formatUncaughtError } from "openclaw/plugin-sdk/error-runtime";
import { resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-chunking";
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env";
@@ -289,7 +289,11 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance
return;
}
highestPersistedUpdateId = safe;
void opts.updateOffset.onUpdateId(safe);
void Promise.resolve()
.then(() => opts.updateOffset?.onUpdateId?.(safe))
.catch((err) => {
runtime.error?.(`telegram: failed to persist update watermark: ${formatErrorMessage(err)}`);
});
};
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {