mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 17:00:50 +00:00
fix(telegram): defer replay commit until update succeeds
This commit is contained in:
@@ -2680,4 +2680,70 @@ describe("createTelegramBot", () => {
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("retries native command updates after a bubbled handler failure", async () => {
|
||||
loadConfig.mockReturnValue({
|
||||
commands: { native: true },
|
||||
channels: {
|
||||
telegram: {
|
||||
dmPolicy: "open",
|
||||
allowFrom: ["*"],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const verboseHandler = commandSpy.mock.calls.find((call) => call[0] === "verbose")?.[1] as
|
||||
| ((ctx: Record<string, unknown>) => Promise<void>)
|
||||
| undefined;
|
||||
if (!verboseHandler) {
|
||||
throw new Error("verbose command handler missing");
|
||||
}
|
||||
|
||||
const middlewares = middlewareUseSpy.mock.calls
|
||||
.map((call) => call[0])
|
||||
.filter(
|
||||
(fn): fn is (ctx: Record<string, unknown>, next: () => Promise<void>) => Promise<void> =>
|
||||
typeof fn === "function",
|
||||
);
|
||||
const runMiddlewareChain = async (ctx: Record<string, unknown>) => {
|
||||
let idx = -1;
|
||||
const dispatch = async (i: number): Promise<void> => {
|
||||
if (i <= idx) {
|
||||
throw new Error("middleware dispatch called multiple times");
|
||||
}
|
||||
idx = i;
|
||||
const fn = middlewares[i];
|
||||
if (!fn) {
|
||||
await verboseHandler(ctx);
|
||||
return;
|
||||
}
|
||||
await fn(ctx, async () => dispatch(i + 1));
|
||||
};
|
||||
await dispatch(0);
|
||||
};
|
||||
|
||||
const ctx = {
|
||||
update: { update_id: 333 },
|
||||
message: {
|
||||
chat: { id: 12345, type: "private" },
|
||||
from: { id: 12345, username: "testuser" },
|
||||
text: "/verbose on",
|
||||
date: 1736380800,
|
||||
message_id: 42,
|
||||
},
|
||||
match: "on",
|
||||
};
|
||||
|
||||
const loadConfigCallsBeforeRetry = loadConfig.mock.calls.length;
|
||||
loadConfig.mockImplementationOnce(() => {
|
||||
throw new Error("cfg boom");
|
||||
});
|
||||
await expect(runMiddlewareChain(ctx)).rejects.toThrow("cfg boom");
|
||||
const loadConfigCallsAfterFailure = loadConfig.mock.calls.length;
|
||||
await runMiddlewareChain(ctx);
|
||||
|
||||
expect(loadConfigCallsAfterFailure).toBe(loadConfigCallsBeforeRetry + 1);
|
||||
expect(loadConfig.mock.calls.length).toBeGreaterThan(loadConfigCallsAfterFailure);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -256,6 +256,8 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance
|
||||
});
|
||||
|
||||
const recentUpdates = createTelegramUpdateDedupe();
|
||||
const pendingUpdateKeys = new Set<string>();
|
||||
const activeHandledUpdateKeys = new Map<string, boolean>();
|
||||
const initialUpdateId =
|
||||
typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null;
|
||||
|
||||
@@ -308,6 +310,12 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance
|
||||
});
|
||||
};
|
||||
|
||||
const logSkippedUpdate = (key: string) => {
|
||||
if (shouldLogVerbose()) {
|
||||
logVerbose(`telegram dedupe: skipped ${key}`);
|
||||
}
|
||||
};
|
||||
|
||||
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
|
||||
const updateId = resolveTelegramUpdateId(ctx);
|
||||
const skipCutoff = highestPersistedUpdateId ?? initialUpdateId;
|
||||
@@ -315,24 +323,55 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance
|
||||
return true;
|
||||
}
|
||||
const key = buildTelegramUpdateKey(ctx);
|
||||
if (!key) {
|
||||
return false;
|
||||
}
|
||||
const handled = activeHandledUpdateKeys.get(key);
|
||||
if (handled != null) {
|
||||
if (handled) {
|
||||
logSkippedUpdate(key);
|
||||
return true;
|
||||
}
|
||||
activeHandledUpdateKeys.set(key, true);
|
||||
return false;
|
||||
}
|
||||
const skipped = recentUpdates.check(key);
|
||||
if (skipped && key && shouldLogVerbose()) {
|
||||
logVerbose(`telegram dedupe: skipped ${key}`);
|
||||
if (skipped) {
|
||||
logSkippedUpdate(key);
|
||||
}
|
||||
return skipped;
|
||||
};
|
||||
|
||||
bot.use(async (ctx, next) => {
|
||||
const updateId = resolveTelegramUpdateId(ctx);
|
||||
const updateKey = buildTelegramUpdateKey(ctx);
|
||||
let completed = false;
|
||||
if (typeof updateId === "number") {
|
||||
failedUpdateIds.delete(updateId);
|
||||
pendingUpdateIds.add(updateId);
|
||||
}
|
||||
if (updateKey) {
|
||||
if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) {
|
||||
logSkippedUpdate(updateKey);
|
||||
if (typeof updateId === "number") {
|
||||
pendingUpdateIds.delete(updateId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
pendingUpdateKeys.add(updateKey);
|
||||
activeHandledUpdateKeys.set(updateKey, false);
|
||||
}
|
||||
try {
|
||||
await next();
|
||||
completed = true;
|
||||
} finally {
|
||||
if (updateKey) {
|
||||
activeHandledUpdateKeys.delete(updateKey);
|
||||
if (completed) {
|
||||
recentUpdates.check(updateKey);
|
||||
}
|
||||
pendingUpdateKeys.delete(updateKey);
|
||||
}
|
||||
if (typeof updateId === "number") {
|
||||
pendingUpdateIds.delete(updateId);
|
||||
if (completed) {
|
||||
|
||||
Reference in New Issue
Block a user