From f881f086bb75efb171de0372951bf9913e3e2668 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 16:05:50 +0100 Subject: [PATCH] fix(line): make webhook replay retries explicit --- extensions/line/src/bot-handlers.test.ts | 35 ++++++++++++++++++++---- extensions/line/src/bot-handlers.ts | 13 ++++++++- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/extensions/line/src/bot-handlers.test.ts b/extensions/line/src/bot-handlers.test.ts index b9122666d9a..71a0221f537 100644 --- a/extensions/line/src/bot-handlers.test.ts +++ b/extensions/line/src/bot-handlers.test.ts @@ -237,6 +237,7 @@ vi.mock("./bot-message-context.js", () => ({ let handleLineWebhookEvents: typeof import("./bot-handlers.js").handleLineWebhookEvents; let createLineWebhookReplayCache: typeof import("./bot-handlers.js").createLineWebhookReplayCache; +let LineRetryableWebhookError: typeof import("./bot-handlers.js").LineRetryableWebhookError; type LineWebhookContext = Parameters[1]; const createRuntime = () => ({ log: vi.fn(), error: vi.fn(), exit: vi.fn() }); @@ -367,7 +368,8 @@ async function startInflightReplayDuplicate(params: { describe("handleLineWebhookEvents", () => { beforeAll(async () => { - ({ handleLineWebhookEvents, createLineWebhookReplayCache } = await import("./bot-handlers.js")); + ({ handleLineWebhookEvents, createLineWebhookReplayCache, LineRetryableWebhookError } = + await import("./bot-handlers.js")); }); beforeEach(() => { @@ -733,7 +735,7 @@ describe("handleLineWebhookEvents", () => { expect(processMessage).toHaveBeenCalledTimes(1); }); - it("mirrors in-flight replay failures so concurrent duplicates also fail", async () => { + it("mirrors in-flight retryable replay failures so concurrent duplicates also fail", async () => { let rejectFirst: ((err: Error) => void) | undefined; const firstDone = new Promise((_, reject) => { rejectFirst = reject; @@ -751,7 +753,7 @@ describe("handleLineWebhookEvents", () => { const { firstRun, secondRun } = await startInflightReplayDuplicate({ event, processMessage }); const firstFailure = expect(firstRun).rejects.toThrow("transient inflight failure"); const secondFailure = expect(secondRun).rejects.toThrow("transient inflight failure"); - rejectFirst?.(new Error("transient inflight failure")); + rejectFirst?.(new LineRetryableWebhookError("transient inflight failure")); await Promise.all([firstFailure, secondFailure]); expect(processMessage).toHaveBeenCalledTimes(1); @@ -1039,7 +1041,7 @@ describe("handleLineWebhookEvents", () => { expect(processMessage).not.toHaveBeenCalled(); }); - it("does not mark replay cache when event processing fails", async () => { + it("keeps replay cache committed after a non-retryable event failure", async () => { const processMessage = vi .fn() .mockRejectedValueOnce(new Error("transient failure")) @@ -1056,10 +1058,31 @@ describe("handleLineWebhookEvents", () => { await expect(handleLineWebhookEvents([event], context)).rejects.toThrow("transient failure"); await handleLineWebhookEvents([event], context); - expect(buildLineMessageContextMock).toHaveBeenCalledTimes(2); - expect(processMessage).toHaveBeenCalledTimes(2); + expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1); + expect(processMessage).toHaveBeenCalledTimes(1); expect(context.runtime.error).toHaveBeenCalledWith( expect.stringContaining("line: event handler failed: Error: transient failure"), ); }); + + it("reopens replay after an explicit retryable event failure", async () => { + const processMessage = vi + .fn() + .mockRejectedValueOnce(new LineRetryableWebhookError("retry me")) + .mockResolvedValueOnce(undefined); + const event = createReplayMessageEvent({ + messageId: "m-fail-then-retryable", + groupId: "group-retry", + userId: "user-retry", + webhookEventId: "evt-fail-then-retryable", + isRedelivery: false, + }); + const context = createOpenGroupReplayContext(processMessage, createLineWebhookReplayCache()); + + await expect(handleLineWebhookEvents([event], context)).rejects.toThrow("retry me"); + await handleLineWebhookEvents([event], context); + + expect(buildLineMessageContextMock).toHaveBeenCalledTimes(2); + expect(processMessage).toHaveBeenCalledTimes(2); + }); }); diff --git a/extensions/line/src/bot-handlers.ts b/extensions/line/src/bot-handlers.ts index 98917868ab0..df58515b6ff 100644 --- a/extensions/line/src/bot-handlers.ts +++ b/extensions/line/src/bot-handlers.ts @@ -87,6 +87,13 @@ const LINE_WEBHOOK_REPLAY_WINDOW_MS = 10 * 60 * 1000; const LINE_WEBHOOK_REPLAY_MAX_ENTRIES = 4096; export type LineWebhookReplayCache = ClaimableDedupe; +export class LineRetryableWebhookError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "LineRetryableWebhookError"; + } +} + export function createLineWebhookReplayCache(): LineWebhookReplayCache { return createClaimableDedupe({ ttlMs: LINE_WEBHOOK_REPLAY_WINDOW_MS, @@ -617,7 +624,11 @@ export async function handleLineWebhookEvents( } } catch (err) { if (replayCandidate) { - replayCandidate.cache.release(replayCandidate.key, { error: err }); + if (err instanceof LineRetryableWebhookError) { + replayCandidate.cache.release(replayCandidate.key, { error: err }); + } else { + await replayCandidate.cache.commit(replayCandidate.key); + } } context.runtime.error?.(danger(`line: event handler failed: ${String(err)}`)); firstError ??= err;