From 4b61779a463fa9da0720eb7c59a80e7ebec6f943 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 7 Mar 2026 21:56:47 +0000 Subject: [PATCH] refactor: unify extension webhook request lifecycle scaffolding --- extensions/bluebubbles/src/monitor.ts | 282 +++++++++---------- extensions/googlechat/src/monitor-webhook.ts | 185 ++++++------ extensions/zalo/src/monitor.webhook.ts | 165 +++++------ src/plugin-sdk/bluebubbles.ts | 1 + src/plugin-sdk/googlechat.ts | 1 + src/plugin-sdk/index.ts | 1 + src/plugin-sdk/webhook-targets.test.ts | 74 +++++ src/plugin-sdk/webhook-targets.ts | 54 ++++ src/plugin-sdk/zalo.ts | 2 + 9 files changed, 429 insertions(+), 336 deletions(-) diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index 8c7aa9e17c0..1dc503e5340 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -1,12 +1,11 @@ import { timingSafeEqual } from "node:crypto"; import type { IncomingMessage, ServerResponse } from "node:http"; import { - beginWebhookRequestPipelineOrReject, createWebhookInFlightLimiter, registerWebhookTargetWithPluginRoute, readWebhookBodyOrReject, resolveWebhookTargetWithAuthOrRejectSync, - resolveWebhookTargets, + withResolvedWebhookRequestPipeline, } from "openclaw/plugin-sdk/bluebubbles"; import { createBlueBubblesDebounceRegistry } from "./monitor-debounce.js"; import { normalizeWebhookMessage, normalizeWebhookReaction } from "./monitor-normalize.js"; @@ -122,156 +121,145 @@ export async function handleBlueBubblesWebhookRequest( req: IncomingMessage, res: ServerResponse, ): Promise { - const resolved = resolveWebhookTargets(req, webhookTargets); - if (!resolved) { - return false; - } - const { path, targets } = resolved; - const url = new URL(req.url ?? "/", "http://localhost"); - const requestLifecycle = beginWebhookRequestPipelineOrReject({ + return await withResolvedWebhookRequestPipeline({ req, res, + targetsByPath: webhookTargets, allowMethods: ["POST"], inFlightLimiter: webhookInFlightLimiter, - inFlightKey: `${path}:${req.socket.remoteAddress ?? "unknown"}`, + handle: async ({ path, targets }) => { + const url = new URL(req.url ?? "/", "http://localhost"); + const guidParam = url.searchParams.get("guid") ?? url.searchParams.get("password"); + const headerToken = + req.headers["x-guid"] ?? + req.headers["x-password"] ?? + req.headers["x-bluebubbles-guid"] ?? + req.headers["authorization"]; + const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? ""; + const target = resolveWebhookTargetWithAuthOrRejectSync({ + targets, + res, + isMatch: (target) => { + const token = target.account.config.password?.trim() ?? ""; + return safeEqualSecret(guid, token); + }, + }); + if (!target) { + console.warn( + `[bluebubbles] webhook rejected: status=${res.statusCode} path=${path} guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`, + ); + return true; + } + const body = await readWebhookBodyOrReject({ + req, + res, + profile: "post-auth", + invalidBodyMessage: "invalid payload", + }); + if (!body.ok) { + console.warn(`[bluebubbles] webhook rejected: status=${res.statusCode}`); + return true; + } + + const parsed = parseBlueBubblesWebhookPayload(body.value); + if (!parsed.ok) { + res.statusCode = 400; + res.end(parsed.error); + console.warn(`[bluebubbles] webhook rejected: ${parsed.error}`); + return true; + } + + const payload = asRecord(parsed.value) ?? {}; + const firstTarget = targets[0]; + if (firstTarget) { + logVerbose( + firstTarget.core, + firstTarget.runtime, + `webhook received path=${path} keys=${Object.keys(payload).join(",") || "none"}`, + ); + } + const eventTypeRaw = payload.type; + const eventType = typeof eventTypeRaw === "string" ? eventTypeRaw.trim() : ""; + const allowedEventTypes = new Set([ + "new-message", + "updated-message", + "message-reaction", + "reaction", + ]); + if (eventType && !allowedEventTypes.has(eventType)) { + res.statusCode = 200; + res.end("ok"); + if (firstTarget) { + logVerbose(firstTarget.core, firstTarget.runtime, `webhook ignored type=${eventType}`); + } + return true; + } + const reaction = normalizeWebhookReaction(payload); + if ( + (eventType === "updated-message" || + eventType === "message-reaction" || + eventType === "reaction") && + !reaction + ) { + res.statusCode = 200; + res.end("ok"); + if (firstTarget) { + logVerbose( + firstTarget.core, + firstTarget.runtime, + `webhook ignored ${eventType || "event"} without reaction`, + ); + } + return true; + } + const message = reaction ? null : normalizeWebhookMessage(payload); + if (!message && !reaction) { + res.statusCode = 400; + res.end("invalid payload"); + console.warn("[bluebubbles] webhook rejected: unable to parse message payload"); + return true; + } + + target.statusSink?.({ lastInboundAt: Date.now() }); + if (reaction) { + processReaction(reaction, target).catch((err) => { + target.runtime.error?.( + `[${target.account.accountId}] BlueBubbles reaction failed: ${String(err)}`, + ); + }); + } else if (message) { + // Route messages through debouncer to coalesce rapid-fire events + // (e.g., text message + URL balloon arriving as separate webhooks) + const debouncer = debounceRegistry.getOrCreateDebouncer(target); + debouncer.enqueue({ message, target }).catch((err) => { + target.runtime.error?.( + `[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`, + ); + }); + } + + res.statusCode = 200; + res.end("ok"); + if (reaction) { + if (firstTarget) { + logVerbose( + firstTarget.core, + firstTarget.runtime, + `webhook accepted reaction sender=${reaction.senderId} msg=${reaction.messageId} action=${reaction.action}`, + ); + } + } else if (message) { + if (firstTarget) { + logVerbose( + firstTarget.core, + firstTarget.runtime, + `webhook accepted sender=${message.senderId} group=${message.isGroup} chatGuid=${message.chatGuid ?? ""} chatId=${message.chatId ?? ""}`, + ); + } + } + return true; + }, }); - if (!requestLifecycle.ok) { - return true; - } - - try { - const guidParam = url.searchParams.get("guid") ?? url.searchParams.get("password"); - const headerToken = - req.headers["x-guid"] ?? - req.headers["x-password"] ?? - req.headers["x-bluebubbles-guid"] ?? - req.headers["authorization"]; - const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? ""; - const target = resolveWebhookTargetWithAuthOrRejectSync({ - targets, - res, - isMatch: (target) => { - const token = target.account.config.password?.trim() ?? ""; - return safeEqualSecret(guid, token); - }, - }); - if (!target) { - console.warn( - `[bluebubbles] webhook rejected: status=${res.statusCode} path=${path} guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`, - ); - return true; - } - const body = await readWebhookBodyOrReject({ - req, - res, - profile: "post-auth", - invalidBodyMessage: "invalid payload", - }); - if (!body.ok) { - console.warn(`[bluebubbles] webhook rejected: status=${res.statusCode}`); - return true; - } - - const parsed = parseBlueBubblesWebhookPayload(body.value); - if (!parsed.ok) { - res.statusCode = 400; - res.end(parsed.error); - console.warn(`[bluebubbles] webhook rejected: ${parsed.error}`); - return true; - } - - const payload = asRecord(parsed.value) ?? {}; - const firstTarget = targets[0]; - if (firstTarget) { - logVerbose( - firstTarget.core, - firstTarget.runtime, - `webhook received path=${path} keys=${Object.keys(payload).join(",") || "none"}`, - ); - } - const eventTypeRaw = payload.type; - const eventType = typeof eventTypeRaw === "string" ? eventTypeRaw.trim() : ""; - const allowedEventTypes = new Set([ - "new-message", - "updated-message", - "message-reaction", - "reaction", - ]); - if (eventType && !allowedEventTypes.has(eventType)) { - res.statusCode = 200; - res.end("ok"); - if (firstTarget) { - logVerbose(firstTarget.core, firstTarget.runtime, `webhook ignored type=${eventType}`); - } - return true; - } - const reaction = normalizeWebhookReaction(payload); - if ( - (eventType === "updated-message" || - eventType === "message-reaction" || - eventType === "reaction") && - !reaction - ) { - res.statusCode = 200; - res.end("ok"); - if (firstTarget) { - logVerbose( - firstTarget.core, - firstTarget.runtime, - `webhook ignored ${eventType || "event"} without reaction`, - ); - } - return true; - } - const message = reaction ? null : normalizeWebhookMessage(payload); - if (!message && !reaction) { - res.statusCode = 400; - res.end("invalid payload"); - console.warn("[bluebubbles] webhook rejected: unable to parse message payload"); - return true; - } - - target.statusSink?.({ lastInboundAt: Date.now() }); - if (reaction) { - processReaction(reaction, target).catch((err) => { - target.runtime.error?.( - `[${target.account.accountId}] BlueBubbles reaction failed: ${String(err)}`, - ); - }); - } else if (message) { - // Route messages through debouncer to coalesce rapid-fire events - // (e.g., text message + URL balloon arriving as separate webhooks) - const debouncer = debounceRegistry.getOrCreateDebouncer(target); - debouncer.enqueue({ message, target }).catch((err) => { - target.runtime.error?.( - `[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`, - ); - }); - } - - res.statusCode = 200; - res.end("ok"); - if (reaction) { - if (firstTarget) { - logVerbose( - firstTarget.core, - firstTarget.runtime, - `webhook accepted reaction sender=${reaction.senderId} msg=${reaction.messageId} action=${reaction.action}`, - ); - } - } else if (message) { - if (firstTarget) { - logVerbose( - firstTarget.core, - firstTarget.runtime, - `webhook accepted sender=${message.senderId} group=${message.isGroup} chatGuid=${message.chatGuid ?? ""} chatId=${message.chatId ?? ""}`, - ); - } - } - return true; - } finally { - requestLifecycle.release(); - } } export async function monitorBlueBubblesProvider( diff --git a/extensions/googlechat/src/monitor-webhook.ts b/extensions/googlechat/src/monitor-webhook.ts index 5f380722267..cde54214575 100644 --- a/extensions/googlechat/src/monitor-webhook.ts +++ b/extensions/googlechat/src/monitor-webhook.ts @@ -1,9 +1,8 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import { - beginWebhookRequestPipelineOrReject, readJsonWebhookBodyOrReject, resolveWebhookTargetWithAuthOrReject, - resolveWebhookTargets, + withResolvedWebhookRequestPipeline, type WebhookInFlightLimiter, } from "openclaw/plugin-sdk/googlechat"; import { verifyGoogleChatRequest } from "./auth.js"; @@ -95,118 +94,106 @@ export function createGoogleChatWebhookRequestHandler(params: { processEvent: (event: GoogleChatEvent, target: WebhookTarget) => Promise; }): (req: IncomingMessage, res: ServerResponse) => Promise { return async (req: IncomingMessage, res: ServerResponse): Promise => { - const resolved = resolveWebhookTargets(req, params.webhookTargets); - if (!resolved) { - return false; - } - const { path, targets } = resolved; - - const requestLifecycle = beginWebhookRequestPipelineOrReject({ + return await withResolvedWebhookRequestPipeline({ req, res, + targetsByPath: params.webhookTargets, allowMethods: ["POST"], requireJsonContentType: true, inFlightLimiter: params.webhookInFlightLimiter, - inFlightKey: `${path}:${req.socket?.remoteAddress ?? "unknown"}`, - }); - if (!requestLifecycle.ok) { - return true; - } + handle: async ({ targets }) => { + const headerBearer = extractBearerToken(req.headers.authorization); + let selectedTarget: WebhookTarget | null = null; + let parsedEvent: GoogleChatEvent | null = null; + const readAndParseEvent = async ( + profile: "pre-auth" | "post-auth", + ): Promise => { + const body = await readJsonWebhookBodyOrReject({ + req, + res, + profile, + emptyObjectOnEmpty: false, + invalidJsonMessage: "invalid payload", + }); + if (!body.ok) { + return null; + } - try { - const headerBearer = extractBearerToken(req.headers.authorization); - let selectedTarget: WebhookTarget | null = null; - let parsedEvent: GoogleChatEvent | null = null; - const readAndParseEvent = async ( - profile: "pre-auth" | "post-auth", - ): Promise => { - const body = await readJsonWebhookBodyOrReject({ - req, - res, - profile, - emptyObjectOnEmpty: false, - invalidJsonMessage: "invalid payload", - }); - if (!body.ok) { - return null; + const parsed = parseGoogleChatInboundPayload(body.value, res); + return parsed.ok ? parsed : null; + }; + + if (headerBearer) { + selectedTarget = await resolveWebhookTargetWithAuthOrReject({ + targets, + res, + isMatch: async (target) => { + const verification = await verifyGoogleChatRequest({ + bearer: headerBearer, + audienceType: target.audienceType, + audience: target.audience, + }); + return verification.ok; + }, + }); + if (!selectedTarget) { + return true; + } + + const parsed = await readAndParseEvent("post-auth"); + if (!parsed) { + return true; + } + parsedEvent = parsed.event; + } else { + const parsed = await readAndParseEvent("pre-auth"); + if (!parsed) { + return true; + } + parsedEvent = parsed.event; + + if (!parsed.addOnBearerToken) { + res.statusCode = 401; + res.end("unauthorized"); + return true; + } + + selectedTarget = await resolveWebhookTargetWithAuthOrReject({ + targets, + res, + isMatch: async (target) => { + const verification = await verifyGoogleChatRequest({ + bearer: parsed.addOnBearerToken, + audienceType: target.audienceType, + audience: target.audience, + }); + return verification.ok; + }, + }); + if (!selectedTarget) { + return true; + } } - const parsed = parseGoogleChatInboundPayload(body.value, res); - return parsed.ok ? parsed : null; - }; - - if (headerBearer) { - selectedTarget = await resolveWebhookTargetWithAuthOrReject({ - targets, - res, - isMatch: async (target) => { - const verification = await verifyGoogleChatRequest({ - bearer: headerBearer, - audienceType: target.audienceType, - audience: target.audience, - }); - return verification.ok; - }, - }); - if (!selectedTarget) { - return true; - } - - const parsed = await readAndParseEvent("post-auth"); - if (!parsed) { - return true; - } - parsedEvent = parsed.event; - } else { - const parsed = await readAndParseEvent("pre-auth"); - if (!parsed) { - return true; - } - parsedEvent = parsed.event; - - if (!parsed.addOnBearerToken) { + if (!selectedTarget || !parsedEvent) { res.statusCode = 401; res.end("unauthorized"); return true; } - selectedTarget = await resolveWebhookTargetWithAuthOrReject({ - targets, - res, - isMatch: async (target) => { - const verification = await verifyGoogleChatRequest({ - bearer: parsed.addOnBearerToken, - audienceType: target.audienceType, - audience: target.audience, - }); - return verification.ok; - }, + const dispatchTarget = selectedTarget; + dispatchTarget.statusSink?.({ lastInboundAt: Date.now() }); + params.processEvent(parsedEvent, dispatchTarget).catch((err) => { + dispatchTarget.runtime.error?.( + `[${dispatchTarget.account.accountId}] Google Chat webhook failed: ${String(err)}`, + ); }); - if (!selectedTarget) { - return true; - } - } - if (!selectedTarget || !parsedEvent) { - res.statusCode = 401; - res.end("unauthorized"); + res.statusCode = 200; + res.setHeader("Content-Type", "application/json"); + res.end("{}"); return true; - } - - const dispatchTarget = selectedTarget; - dispatchTarget.statusSink?.({ lastInboundAt: Date.now() }); - params.processEvent(parsedEvent, dispatchTarget).catch((err) => { - dispatchTarget.runtime.error?.( - `[${dispatchTarget.account.accountId}] Google Chat webhook failed: ${String(err)}`, - ); - }); - - res.statusCode = 200; - res.setHeader("Content-Type", "application/json"); - res.end("{}"); - return true; - } finally { - requestLifecycle.release(); - } + }, + }); }; } diff --git a/extensions/zalo/src/monitor.webhook.ts b/extensions/zalo/src/monitor.webhook.ts index 3bcc35aa43c..8fad827fddc 100644 --- a/extensions/zalo/src/monitor.webhook.ts +++ b/extensions/zalo/src/monitor.webhook.ts @@ -11,8 +11,8 @@ import { type RegisterWebhookTargetOptions, type RegisterWebhookPluginRouteOptions, registerWebhookTarget, - resolveSingleWebhookTarget, - resolveWebhookTargets, + resolveWebhookTargetWithAuthOrRejectSync, + withResolvedWebhookRequestPipeline, WEBHOOK_ANOMALY_COUNTER_DEFAULTS, WEBHOOK_RATE_LIMIT_DEFAULTS, } from "openclaw/plugin-sdk/zalo"; @@ -134,95 +134,80 @@ export async function handleZaloWebhookRequest( res: ServerResponse, processUpdate: ZaloWebhookProcessUpdate, ): Promise { - const resolved = resolveWebhookTargets(req, webhookTargets); - if (!resolved) { - return false; - } - const { targets, path } = resolved; - - if ( - !applyBasicWebhookRequestGuards({ - req, - res, - allowMethods: ["POST"], - }) - ) { - return true; - } - - const headerToken = String(req.headers["x-bot-api-secret-token"] ?? ""); - const matchedTarget = resolveSingleWebhookTarget(targets, (entry) => - timingSafeEquals(entry.secret, headerToken), - ); - if (matchedTarget.kind === "none") { - res.statusCode = 401; - res.end("unauthorized"); - recordWebhookStatus(targets[0]?.runtime, path, res.statusCode); - return true; - } - if (matchedTarget.kind === "ambiguous") { - res.statusCode = 401; - res.end("ambiguous webhook target"); - recordWebhookStatus(targets[0]?.runtime, path, res.statusCode); - return true; - } - const target = matchedTarget.target; - const rateLimitKey = `${path}:${req.socket.remoteAddress ?? "unknown"}`; - const nowMs = Date.now(); - - if ( - !applyBasicWebhookRequestGuards({ - req, - res, - rateLimiter: webhookRateLimiter, - rateLimitKey, - nowMs, - requireJsonContentType: true, - }) - ) { - recordWebhookStatus(target.runtime, path, res.statusCode); - return true; - } - const body = await readJsonWebhookBodyOrReject({ + return await withResolvedWebhookRequestPipeline({ req, res, - maxBytes: 1024 * 1024, - timeoutMs: 30_000, - emptyObjectOnEmpty: false, - invalidJsonMessage: "Bad Request", + targetsByPath: webhookTargets, + allowMethods: ["POST"], + handle: async ({ targets, path }) => { + const headerToken = String(req.headers["x-bot-api-secret-token"] ?? ""); + const target = resolveWebhookTargetWithAuthOrRejectSync({ + targets, + res, + isMatch: (entry) => timingSafeEquals(entry.secret, headerToken), + }); + if (!target) { + recordWebhookStatus(targets[0]?.runtime, path, res.statusCode); + return true; + } + const rateLimitKey = `${path}:${req.socket.remoteAddress ?? "unknown"}`; + const nowMs = Date.now(); + + if ( + !applyBasicWebhookRequestGuards({ + req, + res, + rateLimiter: webhookRateLimiter, + rateLimitKey, + nowMs, + requireJsonContentType: true, + }) + ) { + recordWebhookStatus(target.runtime, path, res.statusCode); + return true; + } + const body = await readJsonWebhookBodyOrReject({ + req, + res, + maxBytes: 1024 * 1024, + timeoutMs: 30_000, + emptyObjectOnEmpty: false, + invalidJsonMessage: "Bad Request", + }); + if (!body.ok) { + recordWebhookStatus(target.runtime, path, res.statusCode); + return true; + } + const raw = body.value; + + // Zalo sends updates directly as { event_name, message, ... }, not wrapped in { ok, result }. + const record = raw && typeof raw === "object" ? (raw as Record) : null; + const update: ZaloUpdate | undefined = + record && record.ok === true && record.result + ? (record.result as ZaloUpdate) + : ((record as ZaloUpdate | null) ?? undefined); + + if (!update?.event_name) { + res.statusCode = 400; + res.end("Bad Request"); + recordWebhookStatus(target.runtime, path, res.statusCode); + return true; + } + + if (isReplayEvent(update, nowMs)) { + res.statusCode = 200; + res.end("ok"); + return true; + } + + target.statusSink?.({ lastInboundAt: Date.now() }); + processUpdate({ update, target }).catch((err) => { + target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`); + }); + + res.statusCode = 200; + res.end("ok"); + return true; + }, }); - if (!body.ok) { - recordWebhookStatus(target.runtime, path, res.statusCode); - return true; - } - const raw = body.value; - - // Zalo sends updates directly as { event_name, message, ... }, not wrapped in { ok, result }. - const record = raw && typeof raw === "object" ? (raw as Record) : null; - const update: ZaloUpdate | undefined = - record && record.ok === true && record.result - ? (record.result as ZaloUpdate) - : ((record as ZaloUpdate | null) ?? undefined); - - if (!update?.event_name) { - res.statusCode = 400; - res.end("Bad Request"); - recordWebhookStatus(target.runtime, path, res.statusCode); - return true; - } - - if (isReplayEvent(update, nowMs)) { - res.statusCode = 200; - res.end("ok"); - return true; - } - - target.statusSink?.({ lastInboundAt: Date.now() }); - processUpdate({ update, target }).catch((err) => { - target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`); - }); - - res.statusCode = 200; - res.end("ok"); - return true; } diff --git a/src/plugin-sdk/bluebubbles.ts b/src/plugin-sdk/bluebubbles.ts index 9af0103b59c..5a1180a5770 100644 --- a/src/plugin-sdk/bluebubbles.ts +++ b/src/plugin-sdk/bluebubbles.ts @@ -106,4 +106,5 @@ export { registerWebhookTargetWithPluginRoute, resolveWebhookTargets, resolveWebhookTargetWithAuthOrRejectSync, + withResolvedWebhookRequestPipeline, } from "./webhook-targets.js"; diff --git a/src/plugin-sdk/googlechat.ts b/src/plugin-sdk/googlechat.ts index ff0fe9281bf..72c834eb9b5 100644 --- a/src/plugin-sdk/googlechat.ts +++ b/src/plugin-sdk/googlechat.ts @@ -85,4 +85,5 @@ export { registerWebhookTargetWithPluginRoute, resolveWebhookTargets, resolveWebhookTargetWithAuthOrReject, + withResolvedWebhookRequestPipeline, } from "./webhook-targets.js"; diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index feb61839ef9..bf50d3d4d8e 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -154,6 +154,7 @@ export { resolveSingleWebhookTarget, resolveSingleWebhookTargetAsync, resolveWebhookTargets, + withResolvedWebhookRequestPipeline, } from "./webhook-targets.js"; export type { RegisterWebhookPluginRouteOptions, diff --git a/src/plugin-sdk/webhook-targets.test.ts b/src/plugin-sdk/webhook-targets.test.ts index 4f428f5b477..02ad40b1f1c 100644 --- a/src/plugin-sdk/webhook-targets.test.ts +++ b/src/plugin-sdk/webhook-targets.test.ts @@ -3,6 +3,7 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import { afterEach, describe, expect, it, vi } from "vitest"; import { createEmptyPluginRegistry } from "../plugins/registry.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { createWebhookInFlightLimiter } from "./webhook-request-guards.js"; import { registerWebhookTarget, registerWebhookTargetWithPluginRoute, @@ -12,6 +13,7 @@ import { resolveWebhookTargetWithAuthOrReject, resolveWebhookTargetWithAuthOrRejectSync, resolveWebhookTargets, + withResolvedWebhookRequestPipeline, } from "./webhook-targets.js"; function createRequest(method: string, url: string): IncomingMessage { @@ -155,6 +157,78 @@ describe("resolveWebhookTargets", () => { }); }); +describe("withResolvedWebhookRequestPipeline", () => { + it("returns false when request path has no registered targets", async () => { + const req = createRequest("POST", "/missing"); + req.headers = {}; + const res = { + statusCode: 200, + setHeader: vi.fn(), + end: vi.fn(), + } as unknown as ServerResponse; + const handled = await withResolvedWebhookRequestPipeline({ + req, + res, + targetsByPath: new Map>(), + allowMethods: ["POST"], + handle: vi.fn(), + }); + expect(handled).toBe(false); + }); + + it("runs handler when targets resolve and method passes", async () => { + const req = createRequest("POST", "/hook"); + req.headers = {}; + (req as unknown as { socket: { remoteAddress: string } }).socket = { + remoteAddress: "127.0.0.1", + }; + const res = { + statusCode: 200, + setHeader: vi.fn(), + end: vi.fn(), + } as unknown as ServerResponse; + const handle = vi.fn(async () => {}); + const handled = await withResolvedWebhookRequestPipeline({ + req, + res, + targetsByPath: new Map([["/hook", [{ id: "A" }]]]), + allowMethods: ["POST"], + handle, + }); + expect(handled).toBe(true); + expect(handle).toHaveBeenCalledWith({ path: "/hook", targets: [{ id: "A" }] }); + }); + + it("releases in-flight slot when handler throws", async () => { + const req = createRequest("POST", "/hook"); + req.headers = {}; + (req as unknown as { socket: { remoteAddress: string } }).socket = { + remoteAddress: "127.0.0.1", + }; + const res = { + statusCode: 200, + setHeader: vi.fn(), + end: vi.fn(), + } as unknown as ServerResponse; + const limiter = createWebhookInFlightLimiter(); + + await expect( + withResolvedWebhookRequestPipeline({ + req, + res, + targetsByPath: new Map([["/hook", [{ id: "A" }]]]), + allowMethods: ["POST"], + inFlightLimiter: limiter, + handle: async () => { + throw new Error("boom"); + }, + }), + ).rejects.toThrow("boom"); + + expect(limiter.size()).toBe(0); + }); +}); + describe("rejectNonPostWebhookRequest", () => { it("sets 405 for non-POST requests", () => { const setHeaderMock = vi.fn(); diff --git a/src/plugin-sdk/webhook-targets.ts b/src/plugin-sdk/webhook-targets.ts index 298b3d14974..791f4591101 100644 --- a/src/plugin-sdk/webhook-targets.ts +++ b/src/plugin-sdk/webhook-targets.ts @@ -1,6 +1,11 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import { registerPluginHttpRoute } from "../plugins/http-registry.js"; +import type { FixedWindowRateLimiter } from "./webhook-memory-guards.js"; import { normalizeWebhookPath } from "./webhook-path.js"; +import { + beginWebhookRequestPipelineOrReject, + type WebhookInFlightLimiter, +} from "./webhook-request-guards.js"; export type RegisteredWebhookTarget = { target: T; @@ -107,6 +112,55 @@ export function resolveWebhookTargets( return { path, targets }; } +export async function withResolvedWebhookRequestPipeline(params: { + req: IncomingMessage; + res: ServerResponse; + targetsByPath: Map; + allowMethods?: readonly string[]; + rateLimiter?: FixedWindowRateLimiter; + rateLimitKey?: string; + nowMs?: number; + requireJsonContentType?: boolean; + inFlightLimiter?: WebhookInFlightLimiter; + inFlightKey?: string | ((args: { req: IncomingMessage; path: string; targets: T[] }) => string); + inFlightLimitStatusCode?: number; + inFlightLimitMessage?: string; + handle: (args: { path: string; targets: T[] }) => Promise | boolean | void; +}): Promise { + const resolved = resolveWebhookTargets(params.req, params.targetsByPath); + if (!resolved) { + return false; + } + + const inFlightKey = + typeof params.inFlightKey === "function" + ? params.inFlightKey({ req: params.req, path: resolved.path, targets: resolved.targets }) + : (params.inFlightKey ?? `${resolved.path}:${params.req.socket?.remoteAddress ?? "unknown"}`); + const requestLifecycle = beginWebhookRequestPipelineOrReject({ + req: params.req, + res: params.res, + allowMethods: params.allowMethods, + rateLimiter: params.rateLimiter, + rateLimitKey: params.rateLimitKey, + nowMs: params.nowMs, + requireJsonContentType: params.requireJsonContentType, + inFlightLimiter: params.inFlightLimiter, + inFlightKey, + inFlightLimitStatusCode: params.inFlightLimitStatusCode, + inFlightLimitMessage: params.inFlightLimitMessage, + }); + if (!requestLifecycle.ok) { + return true; + } + + try { + await params.handle(resolved); + return true; + } finally { + requestLifecycle.release(); + } +} + export type WebhookTargetMatchResult = | { kind: "none" } | { kind: "single"; target: T } diff --git a/src/plugin-sdk/zalo.ts b/src/plugin-sdk/zalo.ts index b6c2d17cab4..82f484b4877 100644 --- a/src/plugin-sdk/zalo.ts +++ b/src/plugin-sdk/zalo.ts @@ -106,6 +106,8 @@ export type { export { registerWebhookTarget, registerWebhookTargetWithPluginRoute, + resolveWebhookTargetWithAuthOrRejectSync, resolveSingleWebhookTarget, resolveWebhookTargets, + withResolvedWebhookRequestPipeline, } from "./webhook-targets.js";