From 785b9b1bc0b6857d369a36b066b3117ec88322ce Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 15:36:35 +0100 Subject: [PATCH] fix(zalo): make replay retries explicit --- extensions/zalo/src/monitor.webhook.test.ts | 76 ++++++++++++++++++--- extensions/zalo/src/monitor.webhook.ts | 75 +++++++++++++------- 2 files changed, 120 insertions(+), 31 deletions(-) diff --git a/extensions/zalo/src/monitor.webhook.test.ts b/extensions/zalo/src/monitor.webhook.test.ts index 27ac082124e..d7f02064fe1 100644 --- a/extensions/zalo/src/monitor.webhook.test.ts +++ b/extensions/zalo/src/monitor.webhook.test.ts @@ -12,11 +12,15 @@ import { postWebhookReplay, } from "../test-support/lifecycle-test-support.js"; import { handleZaloWebhookRequest } from "./monitor.js"; +import type { ZaloRuntimeEnv } from "./monitor.types.js"; import { clearZaloWebhookSecurityStateForTest, getZaloWebhookRateLimitStateSizeForTest, getZaloWebhookStatusCounterSizeForTest, + handleZaloWebhookRequest as handleZaloWebhookRequestInternal, registerZaloWebhookTarget, + type ZaloWebhookProcessUpdate, + ZaloRetryableWebhookError, } from "./monitor.webhook.js"; import type { ResolvedZaloAccount } from "./types.js"; const DEFAULT_ACCOUNT: ResolvedZaloAccount = { @@ -27,13 +31,19 @@ const DEFAULT_ACCOUNT: ResolvedZaloAccount = { config: {}, }; -const webhookRequestHandler: RequestListener = async (req, res) => { - const handled = await handleZaloWebhookRequest(req, res); - if (!handled) { - res.statusCode = 404; - res.end("not found"); - } -}; +function createWebhookRequestHandler(processUpdate?: ZaloWebhookProcessUpdate): RequestListener { + return async (req, res) => { + const handled = processUpdate + ? await handleZaloWebhookRequestInternal(req, res, processUpdate) + : await handleZaloWebhookRequest(req, res); + if (!handled) { + res.statusCode = 404; + res.end("not found"); + } + }; +} + +const webhookRequestHandler = createWebhookRequestHandler(); function registerTarget(params: { path: string; @@ -42,12 +52,13 @@ function registerTarget(params: { account?: ResolvedZaloAccount; config?: OpenClawConfig; core?: PluginRuntime; + runtime?: Partial; }): () => void { return registerZaloWebhookTarget({ token: "tok", account: params.account ?? DEFAULT_ACCOUNT, config: params.config ?? ({} as OpenClawConfig), - runtime: {}, + runtime: (params.runtime ?? {}) as ZaloRuntimeEnv, core: params.core ?? ({} as PluginRuntime), secret: params.secret ?? "secret", path: params.path, @@ -253,6 +264,55 @@ describe("handleZaloWebhookRequest", () => { unregister(); } }); + + it("allows a retry after processUpdate throws a retryable replay error", async () => { + const error = vi.fn(); + const unregister = registerTarget({ + path: "/hook-retry-after-failure", + runtime: { error }, + }); + const payload = createTextUpdate({ + messageId: "msg-retry-after-failure-1", + userId: "123", + userName: "", + chatId: "123", + text: "hello", + }); + let attempts = 0; + const processUpdate = vi.fn(async () => { + attempts += 1; + if (attempts === 1) { + throw new ZaloRetryableWebhookError("boom"); + } + }); + + try { + await withServer(createWebhookRequestHandler(processUpdate), async (baseUrl) => { + const first = await postWebhookJson({ + baseUrl, + path: "/hook-retry-after-failure", + secret: "secret", + payload, + }); + + expect(first.status).toBe(200); + await vi.waitFor(() => expect(error).toHaveBeenCalledTimes(1)); + + const second = await postWebhookJson({ + baseUrl, + path: "/hook-retry-after-failure", + secret: "secret", + payload, + }); + + expect(second.status).toBe(200); + await vi.waitFor(() => expect(processUpdate).toHaveBeenCalledTimes(2)); + }); + } finally { + unregister(); + } + }); + it("keeps replay dedupe isolated per authenticated target", async () => { const sinkA = vi.fn(); const sinkB = vi.fn(); diff --git a/extensions/zalo/src/monitor.webhook.ts b/extensions/zalo/src/monitor.webhook.ts index de7d75094de..45ab6696869 100644 --- a/extensions/zalo/src/monitor.webhook.ts +++ b/extensions/zalo/src/monitor.webhook.ts @@ -1,10 +1,10 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import { safeEqualSecret } from "openclaw/plugin-sdk/browser-security-runtime"; +import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import type { ResolvedZaloAccount } from "./accounts.js"; import type { ZaloFetch, ZaloUpdate } from "./api.js"; import type { ZaloRuntimeEnv } from "./monitor.types.js"; import { - createDedupeCache, createFixedWindowRateLimiter, createWebhookAnomalyTracker, readJsonWebhookBodyOrReject, @@ -47,9 +47,9 @@ const webhookRateLimiter = createFixedWindowRateLimiter({ maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys, }); -const recentWebhookEvents = createDedupeCache({ +const recentWebhookEvents = createClaimableDedupe({ ttlMs: ZALO_WEBHOOK_REPLAY_WINDOW_MS, - maxSize: 5000, + memoryMaxSize: 5000, }); const webhookAnomalyTracker = createWebhookAnomalyTracker({ maxTrackedKeys: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.maxTrackedKeys, @@ -59,7 +59,7 @@ const webhookAnomalyTracker = createWebhookAnomalyTracker({ export function clearZaloWebhookSecurityStateForTest(): void { webhookRateLimiter.clear(); - recentWebhookEvents.clear(); + recentWebhookEvents.clearMemory(); webhookAnomalyTracker.clear(); } @@ -75,11 +75,11 @@ function timingSafeEquals(left: string, right: string): boolean { return safeEqualSecret(left, right); } -function buildReplayEventCacheKey( - target: ZaloWebhookTarget, - update: ZaloUpdate, - messageId: string, -): string { +function buildReplayEventCacheKey(target: ZaloWebhookTarget, update: ZaloUpdate): string | null { + const messageId = update.message?.message_id; + if (!messageId) { + return null; + } const chatId = update.message?.chat?.id ?? ""; const senderId = update.message?.from?.id ?? ""; return JSON.stringify([ @@ -92,13 +92,44 @@ function buildReplayEventCacheKey( ]); } -function isReplayEvent(target: ZaloWebhookTarget, update: ZaloUpdate, nowMs: number): boolean { - const messageId = update.message?.message_id; - if (!messageId) { - return false; +export class ZaloRetryableWebhookError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "ZaloRetryableWebhookError"; + } +} + +export async function processZaloReplayGuardedUpdate(params: { + target: ZaloWebhookTarget; + update: ZaloUpdate; + processUpdate: ZaloWebhookProcessUpdate; + nowMs?: number; +}): Promise<"processed" | "duplicate"> { + const replayEventKey = buildReplayEventCacheKey(params.target, params.update); + if (replayEventKey) { + const replayClaim = await recentWebhookEvents.claim(replayEventKey, { now: params.nowMs }); + if (replayClaim.kind !== "claimed") { + return "duplicate"; + } + } + + params.target.statusSink?.({ lastInboundAt: Date.now() }); + try { + await params.processUpdate({ update: params.update, target: params.target }); + if (replayEventKey) { + await recentWebhookEvents.commit(replayEventKey); + } + return "processed"; + } catch (error) { + if (replayEventKey) { + if (error instanceof ZaloRetryableWebhookError) { + recentWebhookEvents.release(replayEventKey, { error }); + } else { + await recentWebhookEvents.commit(replayEventKey); + } + } + throw error; } - const key = buildReplayEventCacheKey(target, update, messageId); - return recentWebhookEvents.check(key, nowMs); } function recordWebhookStatus( @@ -227,14 +258,12 @@ export async function handleZaloWebhookRequest( return true; } - if (isReplayEvent(target, update, nowMs)) { - res.statusCode = 200; - res.end("ok"); - return true; - } - - target.statusSink?.({ lastInboundAt: Date.now() }); - processUpdate({ update, target }).catch((err) => { + void processZaloReplayGuardedUpdate({ + target, + update, + processUpdate, + nowMs, + }).catch((err) => { target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`); });