fix(zalo): make replay retries explicit

This commit is contained in:
Vincent Koc
2026-04-13 15:36:35 +01:00
parent 2c22a15719
commit 785b9b1bc0
2 changed files with 120 additions and 31 deletions

View File

@@ -12,11 +12,15 @@ import {
postWebhookReplay,
} from "../test-support/lifecycle-test-support.js";
import { handleZaloWebhookRequest } from "./monitor.js";
import type { ZaloRuntimeEnv } from "./monitor.types.js";
import {
clearZaloWebhookSecurityStateForTest,
getZaloWebhookRateLimitStateSizeForTest,
getZaloWebhookStatusCounterSizeForTest,
handleZaloWebhookRequest as handleZaloWebhookRequestInternal,
registerZaloWebhookTarget,
type ZaloWebhookProcessUpdate,
ZaloRetryableWebhookError,
} from "./monitor.webhook.js";
import type { ResolvedZaloAccount } from "./types.js";
const DEFAULT_ACCOUNT: ResolvedZaloAccount = {
@@ -27,13 +31,19 @@ const DEFAULT_ACCOUNT: ResolvedZaloAccount = {
config: {},
};
const webhookRequestHandler: RequestListener = async (req, res) => {
const handled = await handleZaloWebhookRequest(req, res);
if (!handled) {
res.statusCode = 404;
res.end("not found");
}
};
function createWebhookRequestHandler(processUpdate?: ZaloWebhookProcessUpdate): RequestListener {
return async (req, res) => {
const handled = processUpdate
? await handleZaloWebhookRequestInternal(req, res, processUpdate)
: await handleZaloWebhookRequest(req, res);
if (!handled) {
res.statusCode = 404;
res.end("not found");
}
};
}
const webhookRequestHandler = createWebhookRequestHandler();
function registerTarget(params: {
path: string;
@@ -42,12 +52,13 @@ function registerTarget(params: {
account?: ResolvedZaloAccount;
config?: OpenClawConfig;
core?: PluginRuntime;
runtime?: Partial<ZaloRuntimeEnv>;
}): () => void {
return registerZaloWebhookTarget({
token: "tok",
account: params.account ?? DEFAULT_ACCOUNT,
config: params.config ?? ({} as OpenClawConfig),
runtime: {},
runtime: (params.runtime ?? {}) as ZaloRuntimeEnv,
core: params.core ?? ({} as PluginRuntime),
secret: params.secret ?? "secret",
path: params.path,
@@ -253,6 +264,55 @@ describe("handleZaloWebhookRequest", () => {
unregister();
}
});
it("allows a retry after processUpdate throws a retryable replay error", async () => {
const error = vi.fn();
const unregister = registerTarget({
path: "/hook-retry-after-failure",
runtime: { error },
});
const payload = createTextUpdate({
messageId: "msg-retry-after-failure-1",
userId: "123",
userName: "",
chatId: "123",
text: "hello",
});
let attempts = 0;
const processUpdate = vi.fn<ZaloWebhookProcessUpdate>(async () => {
attempts += 1;
if (attempts === 1) {
throw new ZaloRetryableWebhookError("boom");
}
});
try {
await withServer(createWebhookRequestHandler(processUpdate), async (baseUrl) => {
const first = await postWebhookJson({
baseUrl,
path: "/hook-retry-after-failure",
secret: "secret",
payload,
});
expect(first.status).toBe(200);
await vi.waitFor(() => expect(error).toHaveBeenCalledTimes(1));
const second = await postWebhookJson({
baseUrl,
path: "/hook-retry-after-failure",
secret: "secret",
payload,
});
expect(second.status).toBe(200);
await vi.waitFor(() => expect(processUpdate).toHaveBeenCalledTimes(2));
});
} finally {
unregister();
}
});
it("keeps replay dedupe isolated per authenticated target", async () => {
const sinkA = vi.fn();
const sinkB = vi.fn();

View File

@@ -1,10 +1,10 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import { safeEqualSecret } from "openclaw/plugin-sdk/browser-security-runtime";
import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import type { ResolvedZaloAccount } from "./accounts.js";
import type { ZaloFetch, ZaloUpdate } from "./api.js";
import type { ZaloRuntimeEnv } from "./monitor.types.js";
import {
createDedupeCache,
createFixedWindowRateLimiter,
createWebhookAnomalyTracker,
readJsonWebhookBodyOrReject,
@@ -47,9 +47,9 @@ const webhookRateLimiter = createFixedWindowRateLimiter({
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
});
const recentWebhookEvents = createDedupeCache({
const recentWebhookEvents = createClaimableDedupe({
ttlMs: ZALO_WEBHOOK_REPLAY_WINDOW_MS,
maxSize: 5000,
memoryMaxSize: 5000,
});
const webhookAnomalyTracker = createWebhookAnomalyTracker({
maxTrackedKeys: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.maxTrackedKeys,
@@ -59,7 +59,7 @@ const webhookAnomalyTracker = createWebhookAnomalyTracker({
export function clearZaloWebhookSecurityStateForTest(): void {
webhookRateLimiter.clear();
recentWebhookEvents.clear();
recentWebhookEvents.clearMemory();
webhookAnomalyTracker.clear();
}
@@ -75,11 +75,11 @@ function timingSafeEquals(left: string, right: string): boolean {
return safeEqualSecret(left, right);
}
function buildReplayEventCacheKey(
target: ZaloWebhookTarget,
update: ZaloUpdate,
messageId: string,
): string {
function buildReplayEventCacheKey(target: ZaloWebhookTarget, update: ZaloUpdate): string | null {
const messageId = update.message?.message_id;
if (!messageId) {
return null;
}
const chatId = update.message?.chat?.id ?? "";
const senderId = update.message?.from?.id ?? "";
return JSON.stringify([
@@ -92,13 +92,44 @@ function buildReplayEventCacheKey(
]);
}
function isReplayEvent(target: ZaloWebhookTarget, update: ZaloUpdate, nowMs: number): boolean {
const messageId = update.message?.message_id;
if (!messageId) {
return false;
export class ZaloRetryableWebhookError extends Error {
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = "ZaloRetryableWebhookError";
}
}
export async function processZaloReplayGuardedUpdate(params: {
target: ZaloWebhookTarget;
update: ZaloUpdate;
processUpdate: ZaloWebhookProcessUpdate;
nowMs?: number;
}): Promise<"processed" | "duplicate"> {
const replayEventKey = buildReplayEventCacheKey(params.target, params.update);
if (replayEventKey) {
const replayClaim = await recentWebhookEvents.claim(replayEventKey, { now: params.nowMs });
if (replayClaim.kind !== "claimed") {
return "duplicate";
}
}
params.target.statusSink?.({ lastInboundAt: Date.now() });
try {
await params.processUpdate({ update: params.update, target: params.target });
if (replayEventKey) {
await recentWebhookEvents.commit(replayEventKey);
}
return "processed";
} catch (error) {
if (replayEventKey) {
if (error instanceof ZaloRetryableWebhookError) {
recentWebhookEvents.release(replayEventKey, { error });
} else {
await recentWebhookEvents.commit(replayEventKey);
}
}
throw error;
}
const key = buildReplayEventCacheKey(target, update, messageId);
return recentWebhookEvents.check(key, nowMs);
}
function recordWebhookStatus(
@@ -227,14 +258,12 @@ export async function handleZaloWebhookRequest(
return true;
}
if (isReplayEvent(target, update, nowMs)) {
res.statusCode = 200;
res.end("ok");
return true;
}
target.statusSink?.({ lastInboundAt: Date.now() });
processUpdate({ update, target }).catch((err) => {
void processZaloReplayGuardedUpdate({
target,
update,
processUpdate,
nowMs,
}).catch((err) => {
target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`);
});