mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 12:30:44 +00:00
fix(line): make webhook replay retries explicit
This commit is contained in:
@@ -237,6 +237,7 @@ vi.mock("./bot-message-context.js", () => ({
|
||||
|
||||
let handleLineWebhookEvents: typeof import("./bot-handlers.js").handleLineWebhookEvents;
|
||||
let createLineWebhookReplayCache: typeof import("./bot-handlers.js").createLineWebhookReplayCache;
|
||||
let LineRetryableWebhookError: typeof import("./bot-handlers.js").LineRetryableWebhookError;
|
||||
type LineWebhookContext = Parameters<typeof import("./bot-handlers.js").handleLineWebhookEvents>[1];
|
||||
|
||||
const createRuntime = () => ({ log: vi.fn(), error: vi.fn(), exit: vi.fn() });
|
||||
@@ -367,7 +368,8 @@ async function startInflightReplayDuplicate(params: {
|
||||
|
||||
describe("handleLineWebhookEvents", () => {
|
||||
beforeAll(async () => {
|
||||
({ handleLineWebhookEvents, createLineWebhookReplayCache } = await import("./bot-handlers.js"));
|
||||
({ handleLineWebhookEvents, createLineWebhookReplayCache, LineRetryableWebhookError } =
|
||||
await import("./bot-handlers.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
@@ -733,7 +735,7 @@ describe("handleLineWebhookEvents", () => {
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("mirrors in-flight replay failures so concurrent duplicates also fail", async () => {
|
||||
it("mirrors in-flight retryable replay failures so concurrent duplicates also fail", async () => {
|
||||
let rejectFirst: ((err: Error) => void) | undefined;
|
||||
const firstDone = new Promise<void>((_, reject) => {
|
||||
rejectFirst = reject;
|
||||
@@ -751,7 +753,7 @@ describe("handleLineWebhookEvents", () => {
|
||||
const { firstRun, secondRun } = await startInflightReplayDuplicate({ event, processMessage });
|
||||
const firstFailure = expect(firstRun).rejects.toThrow("transient inflight failure");
|
||||
const secondFailure = expect(secondRun).rejects.toThrow("transient inflight failure");
|
||||
rejectFirst?.(new Error("transient inflight failure"));
|
||||
rejectFirst?.(new LineRetryableWebhookError("transient inflight failure"));
|
||||
|
||||
await Promise.all([firstFailure, secondFailure]);
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
@@ -1039,7 +1041,7 @@ describe("handleLineWebhookEvents", () => {
|
||||
expect(processMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not mark replay cache when event processing fails", async () => {
|
||||
it("keeps replay cache committed after a non-retryable event failure", async () => {
|
||||
const processMessage = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new Error("transient failure"))
|
||||
@@ -1056,10 +1058,31 @@ describe("handleLineWebhookEvents", () => {
|
||||
await expect(handleLineWebhookEvents([event], context)).rejects.toThrow("transient failure");
|
||||
await handleLineWebhookEvents([event], context);
|
||||
|
||||
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(2);
|
||||
expect(processMessage).toHaveBeenCalledTimes(2);
|
||||
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1);
|
||||
expect(processMessage).toHaveBeenCalledTimes(1);
|
||||
expect(context.runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("line: event handler failed: Error: transient failure"),
|
||||
);
|
||||
});
|
||||
|
||||
it("reopens replay after an explicit retryable event failure", async () => {
|
||||
const processMessage = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new LineRetryableWebhookError("retry me"))
|
||||
.mockResolvedValueOnce(undefined);
|
||||
const event = createReplayMessageEvent({
|
||||
messageId: "m-fail-then-retryable",
|
||||
groupId: "group-retry",
|
||||
userId: "user-retry",
|
||||
webhookEventId: "evt-fail-then-retryable",
|
||||
isRedelivery: false,
|
||||
});
|
||||
const context = createOpenGroupReplayContext(processMessage, createLineWebhookReplayCache());
|
||||
|
||||
await expect(handleLineWebhookEvents([event], context)).rejects.toThrow("retry me");
|
||||
await handleLineWebhookEvents([event], context);
|
||||
|
||||
expect(buildLineMessageContextMock).toHaveBeenCalledTimes(2);
|
||||
expect(processMessage).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -87,6 +87,13 @@ const LINE_WEBHOOK_REPLAY_WINDOW_MS = 10 * 60 * 1000;
|
||||
const LINE_WEBHOOK_REPLAY_MAX_ENTRIES = 4096;
|
||||
export type LineWebhookReplayCache = ClaimableDedupe;
|
||||
|
||||
export class LineRetryableWebhookError extends Error {
|
||||
constructor(message: string, options?: ErrorOptions) {
|
||||
super(message, options);
|
||||
this.name = "LineRetryableWebhookError";
|
||||
}
|
||||
}
|
||||
|
||||
export function createLineWebhookReplayCache(): LineWebhookReplayCache {
|
||||
return createClaimableDedupe({
|
||||
ttlMs: LINE_WEBHOOK_REPLAY_WINDOW_MS,
|
||||
@@ -617,7 +624,11 @@ export async function handleLineWebhookEvents(
|
||||
}
|
||||
} catch (err) {
|
||||
if (replayCandidate) {
|
||||
replayCandidate.cache.release(replayCandidate.key, { error: err });
|
||||
if (err instanceof LineRetryableWebhookError) {
|
||||
replayCandidate.cache.release(replayCandidate.key, { error: err });
|
||||
} else {
|
||||
await replayCandidate.cache.commit(replayCandidate.key);
|
||||
}
|
||||
}
|
||||
context.runtime.error?.(danger(`line: event handler failed: ${String(err)}`));
|
||||
firstError ??= err;
|
||||
|
||||
Reference in New Issue
Block a user