refactor: unify extension webhook request lifecycle scaffolding

This commit is contained in:
Peter Steinberger
2026-03-07 21:56:47 +00:00
parent 27dad962fe
commit 4b61779a46
9 changed files with 429 additions and 336 deletions

View File

@@ -1,12 +1,11 @@
import { timingSafeEqual } from "node:crypto";
import type { IncomingMessage, ServerResponse } from "node:http";
import {
beginWebhookRequestPipelineOrReject,
createWebhookInFlightLimiter,
registerWebhookTargetWithPluginRoute,
readWebhookBodyOrReject,
resolveWebhookTargetWithAuthOrRejectSync,
resolveWebhookTargets,
withResolvedWebhookRequestPipeline,
} from "openclaw/plugin-sdk/bluebubbles";
import { createBlueBubblesDebounceRegistry } from "./monitor-debounce.js";
import { normalizeWebhookMessage, normalizeWebhookReaction } from "./monitor-normalize.js";
@@ -122,156 +121,145 @@ export async function handleBlueBubblesWebhookRequest(
req: IncomingMessage,
res: ServerResponse,
): Promise<boolean> {
const resolved = resolveWebhookTargets(req, webhookTargets);
if (!resolved) {
return false;
}
const { path, targets } = resolved;
const url = new URL(req.url ?? "/", "http://localhost");
const requestLifecycle = beginWebhookRequestPipelineOrReject({
return await withResolvedWebhookRequestPipeline({
req,
res,
targetsByPath: webhookTargets,
allowMethods: ["POST"],
inFlightLimiter: webhookInFlightLimiter,
inFlightKey: `${path}:${req.socket.remoteAddress ?? "unknown"}`,
handle: async ({ path, targets }) => {
const url = new URL(req.url ?? "/", "http://localhost");
const guidParam = url.searchParams.get("guid") ?? url.searchParams.get("password");
const headerToken =
req.headers["x-guid"] ??
req.headers["x-password"] ??
req.headers["x-bluebubbles-guid"] ??
req.headers["authorization"];
const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? "";
const target = resolveWebhookTargetWithAuthOrRejectSync({
targets,
res,
isMatch: (target) => {
const token = target.account.config.password?.trim() ?? "";
return safeEqualSecret(guid, token);
},
});
if (!target) {
console.warn(
`[bluebubbles] webhook rejected: status=${res.statusCode} path=${path} guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`,
);
return true;
}
const body = await readWebhookBodyOrReject({
req,
res,
profile: "post-auth",
invalidBodyMessage: "invalid payload",
});
if (!body.ok) {
console.warn(`[bluebubbles] webhook rejected: status=${res.statusCode}`);
return true;
}
const parsed = parseBlueBubblesWebhookPayload(body.value);
if (!parsed.ok) {
res.statusCode = 400;
res.end(parsed.error);
console.warn(`[bluebubbles] webhook rejected: ${parsed.error}`);
return true;
}
const payload = asRecord(parsed.value) ?? {};
const firstTarget = targets[0];
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook received path=${path} keys=${Object.keys(payload).join(",") || "none"}`,
);
}
const eventTypeRaw = payload.type;
const eventType = typeof eventTypeRaw === "string" ? eventTypeRaw.trim() : "";
const allowedEventTypes = new Set([
"new-message",
"updated-message",
"message-reaction",
"reaction",
]);
if (eventType && !allowedEventTypes.has(eventType)) {
res.statusCode = 200;
res.end("ok");
if (firstTarget) {
logVerbose(firstTarget.core, firstTarget.runtime, `webhook ignored type=${eventType}`);
}
return true;
}
const reaction = normalizeWebhookReaction(payload);
if (
(eventType === "updated-message" ||
eventType === "message-reaction" ||
eventType === "reaction") &&
!reaction
) {
res.statusCode = 200;
res.end("ok");
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook ignored ${eventType || "event"} without reaction`,
);
}
return true;
}
const message = reaction ? null : normalizeWebhookMessage(payload);
if (!message && !reaction) {
res.statusCode = 400;
res.end("invalid payload");
console.warn("[bluebubbles] webhook rejected: unable to parse message payload");
return true;
}
target.statusSink?.({ lastInboundAt: Date.now() });
if (reaction) {
processReaction(reaction, target).catch((err) => {
target.runtime.error?.(
`[${target.account.accountId}] BlueBubbles reaction failed: ${String(err)}`,
);
});
} else if (message) {
// Route messages through debouncer to coalesce rapid-fire events
// (e.g., text message + URL balloon arriving as separate webhooks)
const debouncer = debounceRegistry.getOrCreateDebouncer(target);
debouncer.enqueue({ message, target }).catch((err) => {
target.runtime.error?.(
`[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`,
);
});
}
res.statusCode = 200;
res.end("ok");
if (reaction) {
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook accepted reaction sender=${reaction.senderId} msg=${reaction.messageId} action=${reaction.action}`,
);
}
} else if (message) {
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook accepted sender=${message.senderId} group=${message.isGroup} chatGuid=${message.chatGuid ?? ""} chatId=${message.chatId ?? ""}`,
);
}
}
return true;
},
});
if (!requestLifecycle.ok) {
return true;
}
try {
const guidParam = url.searchParams.get("guid") ?? url.searchParams.get("password");
const headerToken =
req.headers["x-guid"] ??
req.headers["x-password"] ??
req.headers["x-bluebubbles-guid"] ??
req.headers["authorization"];
const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? "";
const target = resolveWebhookTargetWithAuthOrRejectSync({
targets,
res,
isMatch: (target) => {
const token = target.account.config.password?.trim() ?? "";
return safeEqualSecret(guid, token);
},
});
if (!target) {
console.warn(
`[bluebubbles] webhook rejected: status=${res.statusCode} path=${path} guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`,
);
return true;
}
const body = await readWebhookBodyOrReject({
req,
res,
profile: "post-auth",
invalidBodyMessage: "invalid payload",
});
if (!body.ok) {
console.warn(`[bluebubbles] webhook rejected: status=${res.statusCode}`);
return true;
}
const parsed = parseBlueBubblesWebhookPayload(body.value);
if (!parsed.ok) {
res.statusCode = 400;
res.end(parsed.error);
console.warn(`[bluebubbles] webhook rejected: ${parsed.error}`);
return true;
}
const payload = asRecord(parsed.value) ?? {};
const firstTarget = targets[0];
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook received path=${path} keys=${Object.keys(payload).join(",") || "none"}`,
);
}
const eventTypeRaw = payload.type;
const eventType = typeof eventTypeRaw === "string" ? eventTypeRaw.trim() : "";
const allowedEventTypes = new Set([
"new-message",
"updated-message",
"message-reaction",
"reaction",
]);
if (eventType && !allowedEventTypes.has(eventType)) {
res.statusCode = 200;
res.end("ok");
if (firstTarget) {
logVerbose(firstTarget.core, firstTarget.runtime, `webhook ignored type=${eventType}`);
}
return true;
}
const reaction = normalizeWebhookReaction(payload);
if (
(eventType === "updated-message" ||
eventType === "message-reaction" ||
eventType === "reaction") &&
!reaction
) {
res.statusCode = 200;
res.end("ok");
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook ignored ${eventType || "event"} without reaction`,
);
}
return true;
}
const message = reaction ? null : normalizeWebhookMessage(payload);
if (!message && !reaction) {
res.statusCode = 400;
res.end("invalid payload");
console.warn("[bluebubbles] webhook rejected: unable to parse message payload");
return true;
}
target.statusSink?.({ lastInboundAt: Date.now() });
if (reaction) {
processReaction(reaction, target).catch((err) => {
target.runtime.error?.(
`[${target.account.accountId}] BlueBubbles reaction failed: ${String(err)}`,
);
});
} else if (message) {
// Route messages through debouncer to coalesce rapid-fire events
// (e.g., text message + URL balloon arriving as separate webhooks)
const debouncer = debounceRegistry.getOrCreateDebouncer(target);
debouncer.enqueue({ message, target }).catch((err) => {
target.runtime.error?.(
`[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`,
);
});
}
res.statusCode = 200;
res.end("ok");
if (reaction) {
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook accepted reaction sender=${reaction.senderId} msg=${reaction.messageId} action=${reaction.action}`,
);
}
} else if (message) {
if (firstTarget) {
logVerbose(
firstTarget.core,
firstTarget.runtime,
`webhook accepted sender=${message.senderId} group=${message.isGroup} chatGuid=${message.chatGuid ?? ""} chatId=${message.chatId ?? ""}`,
);
}
}
return true;
} finally {
requestLifecycle.release();
}
}
export async function monitorBlueBubblesProvider(

View File

@@ -1,9 +1,8 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import {
beginWebhookRequestPipelineOrReject,
readJsonWebhookBodyOrReject,
resolveWebhookTargetWithAuthOrReject,
resolveWebhookTargets,
withResolvedWebhookRequestPipeline,
type WebhookInFlightLimiter,
} from "openclaw/plugin-sdk/googlechat";
import { verifyGoogleChatRequest } from "./auth.js";
@@ -95,118 +94,106 @@ export function createGoogleChatWebhookRequestHandler(params: {
processEvent: (event: GoogleChatEvent, target: WebhookTarget) => Promise<void>;
}): (req: IncomingMessage, res: ServerResponse) => Promise<boolean> {
return async (req: IncomingMessage, res: ServerResponse): Promise<boolean> => {
const resolved = resolveWebhookTargets(req, params.webhookTargets);
if (!resolved) {
return false;
}
const { path, targets } = resolved;
const requestLifecycle = beginWebhookRequestPipelineOrReject({
return await withResolvedWebhookRequestPipeline({
req,
res,
targetsByPath: params.webhookTargets,
allowMethods: ["POST"],
requireJsonContentType: true,
inFlightLimiter: params.webhookInFlightLimiter,
inFlightKey: `${path}:${req.socket?.remoteAddress ?? "unknown"}`,
});
if (!requestLifecycle.ok) {
return true;
}
handle: async ({ targets }) => {
const headerBearer = extractBearerToken(req.headers.authorization);
let selectedTarget: WebhookTarget | null = null;
let parsedEvent: GoogleChatEvent | null = null;
const readAndParseEvent = async (
profile: "pre-auth" | "post-auth",
): Promise<ParsedGoogleChatInboundSuccess | null> => {
const body = await readJsonWebhookBodyOrReject({
req,
res,
profile,
emptyObjectOnEmpty: false,
invalidJsonMessage: "invalid payload",
});
if (!body.ok) {
return null;
}
try {
const headerBearer = extractBearerToken(req.headers.authorization);
let selectedTarget: WebhookTarget | null = null;
let parsedEvent: GoogleChatEvent | null = null;
const readAndParseEvent = async (
profile: "pre-auth" | "post-auth",
): Promise<ParsedGoogleChatInboundSuccess | null> => {
const body = await readJsonWebhookBodyOrReject({
req,
res,
profile,
emptyObjectOnEmpty: false,
invalidJsonMessage: "invalid payload",
});
if (!body.ok) {
return null;
const parsed = parseGoogleChatInboundPayload(body.value, res);
return parsed.ok ? parsed : null;
};
if (headerBearer) {
selectedTarget = await resolveWebhookTargetWithAuthOrReject({
targets,
res,
isMatch: async (target) => {
const verification = await verifyGoogleChatRequest({
bearer: headerBearer,
audienceType: target.audienceType,
audience: target.audience,
});
return verification.ok;
},
});
if (!selectedTarget) {
return true;
}
const parsed = await readAndParseEvent("post-auth");
if (!parsed) {
return true;
}
parsedEvent = parsed.event;
} else {
const parsed = await readAndParseEvent("pre-auth");
if (!parsed) {
return true;
}
parsedEvent = parsed.event;
if (!parsed.addOnBearerToken) {
res.statusCode = 401;
res.end("unauthorized");
return true;
}
selectedTarget = await resolveWebhookTargetWithAuthOrReject({
targets,
res,
isMatch: async (target) => {
const verification = await verifyGoogleChatRequest({
bearer: parsed.addOnBearerToken,
audienceType: target.audienceType,
audience: target.audience,
});
return verification.ok;
},
});
if (!selectedTarget) {
return true;
}
}
const parsed = parseGoogleChatInboundPayload(body.value, res);
return parsed.ok ? parsed : null;
};
if (headerBearer) {
selectedTarget = await resolveWebhookTargetWithAuthOrReject({
targets,
res,
isMatch: async (target) => {
const verification = await verifyGoogleChatRequest({
bearer: headerBearer,
audienceType: target.audienceType,
audience: target.audience,
});
return verification.ok;
},
});
if (!selectedTarget) {
return true;
}
const parsed = await readAndParseEvent("post-auth");
if (!parsed) {
return true;
}
parsedEvent = parsed.event;
} else {
const parsed = await readAndParseEvent("pre-auth");
if (!parsed) {
return true;
}
parsedEvent = parsed.event;
if (!parsed.addOnBearerToken) {
if (!selectedTarget || !parsedEvent) {
res.statusCode = 401;
res.end("unauthorized");
return true;
}
selectedTarget = await resolveWebhookTargetWithAuthOrReject({
targets,
res,
isMatch: async (target) => {
const verification = await verifyGoogleChatRequest({
bearer: parsed.addOnBearerToken,
audienceType: target.audienceType,
audience: target.audience,
});
return verification.ok;
},
const dispatchTarget = selectedTarget;
dispatchTarget.statusSink?.({ lastInboundAt: Date.now() });
params.processEvent(parsedEvent, dispatchTarget).catch((err) => {
dispatchTarget.runtime.error?.(
`[${dispatchTarget.account.accountId}] Google Chat webhook failed: ${String(err)}`,
);
});
if (!selectedTarget) {
return true;
}
}
if (!selectedTarget || !parsedEvent) {
res.statusCode = 401;
res.end("unauthorized");
res.statusCode = 200;
res.setHeader("Content-Type", "application/json");
res.end("{}");
return true;
}
const dispatchTarget = selectedTarget;
dispatchTarget.statusSink?.({ lastInboundAt: Date.now() });
params.processEvent(parsedEvent, dispatchTarget).catch((err) => {
dispatchTarget.runtime.error?.(
`[${dispatchTarget.account.accountId}] Google Chat webhook failed: ${String(err)}`,
);
});
res.statusCode = 200;
res.setHeader("Content-Type", "application/json");
res.end("{}");
return true;
} finally {
requestLifecycle.release();
}
},
});
};
}

View File

@@ -11,8 +11,8 @@ import {
type RegisterWebhookTargetOptions,
type RegisterWebhookPluginRouteOptions,
registerWebhookTarget,
resolveSingleWebhookTarget,
resolveWebhookTargets,
resolveWebhookTargetWithAuthOrRejectSync,
withResolvedWebhookRequestPipeline,
WEBHOOK_ANOMALY_COUNTER_DEFAULTS,
WEBHOOK_RATE_LIMIT_DEFAULTS,
} from "openclaw/plugin-sdk/zalo";
@@ -134,95 +134,80 @@ export async function handleZaloWebhookRequest(
res: ServerResponse,
processUpdate: ZaloWebhookProcessUpdate,
): Promise<boolean> {
const resolved = resolveWebhookTargets(req, webhookTargets);
if (!resolved) {
return false;
}
const { targets, path } = resolved;
if (
!applyBasicWebhookRequestGuards({
req,
res,
allowMethods: ["POST"],
})
) {
return true;
}
const headerToken = String(req.headers["x-bot-api-secret-token"] ?? "");
const matchedTarget = resolveSingleWebhookTarget(targets, (entry) =>
timingSafeEquals(entry.secret, headerToken),
);
if (matchedTarget.kind === "none") {
res.statusCode = 401;
res.end("unauthorized");
recordWebhookStatus(targets[0]?.runtime, path, res.statusCode);
return true;
}
if (matchedTarget.kind === "ambiguous") {
res.statusCode = 401;
res.end("ambiguous webhook target");
recordWebhookStatus(targets[0]?.runtime, path, res.statusCode);
return true;
}
const target = matchedTarget.target;
const rateLimitKey = `${path}:${req.socket.remoteAddress ?? "unknown"}`;
const nowMs = Date.now();
if (
!applyBasicWebhookRequestGuards({
req,
res,
rateLimiter: webhookRateLimiter,
rateLimitKey,
nowMs,
requireJsonContentType: true,
})
) {
recordWebhookStatus(target.runtime, path, res.statusCode);
return true;
}
const body = await readJsonWebhookBodyOrReject({
return await withResolvedWebhookRequestPipeline({
req,
res,
maxBytes: 1024 * 1024,
timeoutMs: 30_000,
emptyObjectOnEmpty: false,
invalidJsonMessage: "Bad Request",
targetsByPath: webhookTargets,
allowMethods: ["POST"],
handle: async ({ targets, path }) => {
const headerToken = String(req.headers["x-bot-api-secret-token"] ?? "");
const target = resolveWebhookTargetWithAuthOrRejectSync({
targets,
res,
isMatch: (entry) => timingSafeEquals(entry.secret, headerToken),
});
if (!target) {
recordWebhookStatus(targets[0]?.runtime, path, res.statusCode);
return true;
}
const rateLimitKey = `${path}:${req.socket.remoteAddress ?? "unknown"}`;
const nowMs = Date.now();
if (
!applyBasicWebhookRequestGuards({
req,
res,
rateLimiter: webhookRateLimiter,
rateLimitKey,
nowMs,
requireJsonContentType: true,
})
) {
recordWebhookStatus(target.runtime, path, res.statusCode);
return true;
}
const body = await readJsonWebhookBodyOrReject({
req,
res,
maxBytes: 1024 * 1024,
timeoutMs: 30_000,
emptyObjectOnEmpty: false,
invalidJsonMessage: "Bad Request",
});
if (!body.ok) {
recordWebhookStatus(target.runtime, path, res.statusCode);
return true;
}
const raw = body.value;
// Zalo sends updates directly as { event_name, message, ... }, not wrapped in { ok, result }.
const record = raw && typeof raw === "object" ? (raw as Record<string, unknown>) : null;
const update: ZaloUpdate | undefined =
record && record.ok === true && record.result
? (record.result as ZaloUpdate)
: ((record as ZaloUpdate | null) ?? undefined);
if (!update?.event_name) {
res.statusCode = 400;
res.end("Bad Request");
recordWebhookStatus(target.runtime, path, res.statusCode);
return true;
}
if (isReplayEvent(update, nowMs)) {
res.statusCode = 200;
res.end("ok");
return true;
}
target.statusSink?.({ lastInboundAt: Date.now() });
processUpdate({ update, target }).catch((err) => {
target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`);
});
res.statusCode = 200;
res.end("ok");
return true;
},
});
if (!body.ok) {
recordWebhookStatus(target.runtime, path, res.statusCode);
return true;
}
const raw = body.value;
// Zalo sends updates directly as { event_name, message, ... }, not wrapped in { ok, result }.
const record = raw && typeof raw === "object" ? (raw as Record<string, unknown>) : null;
const update: ZaloUpdate | undefined =
record && record.ok === true && record.result
? (record.result as ZaloUpdate)
: ((record as ZaloUpdate | null) ?? undefined);
if (!update?.event_name) {
res.statusCode = 400;
res.end("Bad Request");
recordWebhookStatus(target.runtime, path, res.statusCode);
return true;
}
if (isReplayEvent(update, nowMs)) {
res.statusCode = 200;
res.end("ok");
return true;
}
target.statusSink?.({ lastInboundAt: Date.now() });
processUpdate({ update, target }).catch((err) => {
target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`);
});
res.statusCode = 200;
res.end("ok");
return true;
}

View File

@@ -106,4 +106,5 @@ export {
registerWebhookTargetWithPluginRoute,
resolveWebhookTargets,
resolveWebhookTargetWithAuthOrRejectSync,
withResolvedWebhookRequestPipeline,
} from "./webhook-targets.js";

View File

@@ -85,4 +85,5 @@ export {
registerWebhookTargetWithPluginRoute,
resolveWebhookTargets,
resolveWebhookTargetWithAuthOrReject,
withResolvedWebhookRequestPipeline,
} from "./webhook-targets.js";

View File

@@ -154,6 +154,7 @@ export {
resolveSingleWebhookTarget,
resolveSingleWebhookTargetAsync,
resolveWebhookTargets,
withResolvedWebhookRequestPipeline,
} from "./webhook-targets.js";
export type {
RegisterWebhookPluginRouteOptions,

View File

@@ -3,6 +3,7 @@ import type { IncomingMessage, ServerResponse } from "node:http";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createEmptyPluginRegistry } from "../plugins/registry.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createWebhookInFlightLimiter } from "./webhook-request-guards.js";
import {
registerWebhookTarget,
registerWebhookTargetWithPluginRoute,
@@ -12,6 +13,7 @@ import {
resolveWebhookTargetWithAuthOrReject,
resolveWebhookTargetWithAuthOrRejectSync,
resolveWebhookTargets,
withResolvedWebhookRequestPipeline,
} from "./webhook-targets.js";
function createRequest(method: string, url: string): IncomingMessage {
@@ -155,6 +157,78 @@ describe("resolveWebhookTargets", () => {
});
});
describe("withResolvedWebhookRequestPipeline", () => {
it("returns false when request path has no registered targets", async () => {
const req = createRequest("POST", "/missing");
req.headers = {};
const res = {
statusCode: 200,
setHeader: vi.fn(),
end: vi.fn(),
} as unknown as ServerResponse;
const handled = await withResolvedWebhookRequestPipeline({
req,
res,
targetsByPath: new Map<string, Array<{ id: string }>>(),
allowMethods: ["POST"],
handle: vi.fn(),
});
expect(handled).toBe(false);
});
it("runs handler when targets resolve and method passes", async () => {
const req = createRequest("POST", "/hook");
req.headers = {};
(req as unknown as { socket: { remoteAddress: string } }).socket = {
remoteAddress: "127.0.0.1",
};
const res = {
statusCode: 200,
setHeader: vi.fn(),
end: vi.fn(),
} as unknown as ServerResponse;
const handle = vi.fn(async () => {});
const handled = await withResolvedWebhookRequestPipeline({
req,
res,
targetsByPath: new Map([["/hook", [{ id: "A" }]]]),
allowMethods: ["POST"],
handle,
});
expect(handled).toBe(true);
expect(handle).toHaveBeenCalledWith({ path: "/hook", targets: [{ id: "A" }] });
});
it("releases in-flight slot when handler throws", async () => {
const req = createRequest("POST", "/hook");
req.headers = {};
(req as unknown as { socket: { remoteAddress: string } }).socket = {
remoteAddress: "127.0.0.1",
};
const res = {
statusCode: 200,
setHeader: vi.fn(),
end: vi.fn(),
} as unknown as ServerResponse;
const limiter = createWebhookInFlightLimiter();
await expect(
withResolvedWebhookRequestPipeline({
req,
res,
targetsByPath: new Map([["/hook", [{ id: "A" }]]]),
allowMethods: ["POST"],
inFlightLimiter: limiter,
handle: async () => {
throw new Error("boom");
},
}),
).rejects.toThrow("boom");
expect(limiter.size()).toBe(0);
});
});
describe("rejectNonPostWebhookRequest", () => {
it("sets 405 for non-POST requests", () => {
const setHeaderMock = vi.fn();

View File

@@ -1,6 +1,11 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import { registerPluginHttpRoute } from "../plugins/http-registry.js";
import type { FixedWindowRateLimiter } from "./webhook-memory-guards.js";
import { normalizeWebhookPath } from "./webhook-path.js";
import {
beginWebhookRequestPipelineOrReject,
type WebhookInFlightLimiter,
} from "./webhook-request-guards.js";
export type RegisteredWebhookTarget<T> = {
target: T;
@@ -107,6 +112,55 @@ export function resolveWebhookTargets<T>(
return { path, targets };
}
export async function withResolvedWebhookRequestPipeline<T>(params: {
req: IncomingMessage;
res: ServerResponse;
targetsByPath: Map<string, T[]>;
allowMethods?: readonly string[];
rateLimiter?: FixedWindowRateLimiter;
rateLimitKey?: string;
nowMs?: number;
requireJsonContentType?: boolean;
inFlightLimiter?: WebhookInFlightLimiter;
inFlightKey?: string | ((args: { req: IncomingMessage; path: string; targets: T[] }) => string);
inFlightLimitStatusCode?: number;
inFlightLimitMessage?: string;
handle: (args: { path: string; targets: T[] }) => Promise<boolean | void> | boolean | void;
}): Promise<boolean> {
const resolved = resolveWebhookTargets(params.req, params.targetsByPath);
if (!resolved) {
return false;
}
const inFlightKey =
typeof params.inFlightKey === "function"
? params.inFlightKey({ req: params.req, path: resolved.path, targets: resolved.targets })
: (params.inFlightKey ?? `${resolved.path}:${params.req.socket?.remoteAddress ?? "unknown"}`);
const requestLifecycle = beginWebhookRequestPipelineOrReject({
req: params.req,
res: params.res,
allowMethods: params.allowMethods,
rateLimiter: params.rateLimiter,
rateLimitKey: params.rateLimitKey,
nowMs: params.nowMs,
requireJsonContentType: params.requireJsonContentType,
inFlightLimiter: params.inFlightLimiter,
inFlightKey,
inFlightLimitStatusCode: params.inFlightLimitStatusCode,
inFlightLimitMessage: params.inFlightLimitMessage,
});
if (!requestLifecycle.ok) {
return true;
}
try {
await params.handle(resolved);
return true;
} finally {
requestLifecycle.release();
}
}
export type WebhookTargetMatchResult<T> =
| { kind: "none" }
| { kind: "single"; target: T }

View File

@@ -106,6 +106,8 @@ export type {
export {
registerWebhookTarget,
registerWebhookTargetWithPluginRoute,
resolveWebhookTargetWithAuthOrRejectSync,
resolveSingleWebhookTarget,
resolveWebhookTargets,
withResolvedWebhookRequestPipeline,
} from "./webhook-targets.js";