From 42a9b7d64b28b4ebe4b5737d062dd50f40edfee1 Mon Sep 17 00:00:00 2001 From: Mariano Belinky Date: Mon, 6 Apr 2026 15:19:33 +0200 Subject: [PATCH] plugins: add bundled webhooks TaskFlow bridge --- .github/labeler.yml | 4 + CHANGELOG.md | 4 + extensions/webhooks/api.ts | 6 + extensions/webhooks/index.ts | 51 ++ extensions/webhooks/openclaw.plugin.json | 48 ++ extensions/webhooks/package.json | 18 + extensions/webhooks/runtime-api.ts | 16 + extensions/webhooks/src/config.test.ts | 86 ++ extensions/webhooks/src/config.ts | 95 +++ extensions/webhooks/src/http.test.ts | 375 +++++++++ extensions/webhooks/src/http.ts | 795 ++++++++++++++++++ pnpm-lock.yaml | 6 + .../lib/bundled-runtime-sidecar-paths.json | 1 + 13 files changed, 1505 insertions(+) create mode 100644 extensions/webhooks/api.ts create mode 100644 extensions/webhooks/index.ts create mode 100644 extensions/webhooks/openclaw.plugin.json create mode 100644 extensions/webhooks/package.json create mode 100644 extensions/webhooks/runtime-api.ts create mode 100644 extensions/webhooks/src/config.test.ts create mode 100644 extensions/webhooks/src/config.ts create mode 100644 extensions/webhooks/src/http.test.ts create mode 100644 extensions/webhooks/src/http.ts diff --git a/.github/labeler.yml b/.github/labeler.yml index 2ee9c600432..70898371a57 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -241,6 +241,10 @@ - changed-files: - any-glob-to-any-file: - "extensions/open-prose/**" +"extensions: webhooks": + - changed-files: + - any-glob-to-any-file: + - "extensions/webhooks/**" "extensions: device-pair": - changed-files: - any-glob-to-any-file: diff --git a/CHANGELOG.md b/CHANGELOG.md index d3c76723242..57fddbdb278 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ Docs: https://docs.openclaw.ai ## Unreleased +### Changes + +- Plugins/webhooks: add a bundled webhook ingress plugin so external automation can create and drive bound TaskFlows through per-route shared-secret endpoints. Thanks @mbelinky. + ### Fixes - Providers/Google: recognize Gemma model ids in native Google forward-compat resolution, keep the requested provider when cloning fallback templates, and force Gemma reasoning off so Gemma 4 routes stop failing through the Google catalog fallback. (#61507) Thanks @eyjohn. diff --git a/extensions/webhooks/api.ts b/extensions/webhooks/api.ts new file mode 100644 index 00000000000..89c896a40cb --- /dev/null +++ b/extensions/webhooks/api.ts @@ -0,0 +1,6 @@ +export { + definePluginEntry, + type OpenClawPluginApi, + type PluginLogger, + type PluginRuntime, +} from "openclaw/plugin-sdk/core"; diff --git a/extensions/webhooks/index.ts b/extensions/webhooks/index.ts new file mode 100644 index 00000000000..9d838dc219b --- /dev/null +++ b/extensions/webhooks/index.ts @@ -0,0 +1,51 @@ +import { definePluginEntry, type OpenClawPluginApi } from "./api.js"; +import { resolveWebhooksPluginConfig } from "./src/config.js"; +import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./src/http.js"; + +export default definePluginEntry({ + id: "webhooks", + name: "Webhooks", + description: + "Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.", + async register(api: OpenClawPluginApi) { + const routes = await resolveWebhooksPluginConfig({ + pluginConfig: api.pluginConfig, + cfg: api.config, + env: process.env, + logger: api.logger, + }); + if (routes.length === 0) { + return; + } + + const targetsByPath = new Map(); + const handler = createTaskFlowWebhookRequestHandler({ + cfg: api.config, + targetsByPath, + }); + + for (const route of routes) { + const taskFlow = api.runtime.taskFlow.bindSession({ + sessionKey: route.sessionKey, + }); + const target: TaskFlowWebhookTarget = { + routeId: route.routeId, + path: route.path, + secret: route.secret, + defaultControllerId: route.controllerId, + taskFlow, + }; + targetsByPath.set(target.path, [...(targetsByPath.get(target.path) ?? []), target]); + api.registerHttpRoute({ + path: target.path, + auth: "plugin", + match: "exact", + replaceExisting: true, + handler, + }); + api.logger.info?.( + `[webhooks] registered route ${route.routeId} on ${route.path} for session ${route.sessionKey}`, + ); + } + }, +}); diff --git a/extensions/webhooks/openclaw.plugin.json b/extensions/webhooks/openclaw.plugin.json new file mode 100644 index 00000000000..739baa8b5c9 --- /dev/null +++ b/extensions/webhooks/openclaw.plugin.json @@ -0,0 +1,48 @@ +{ + "id": "webhooks", + "name": "Webhooks", + "description": "Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.", + "configSchema": { + "type": "object", + "additionalProperties": false, + "$defs": { + "secretRef": { + "type": "object", + "additionalProperties": false, + "properties": { + "source": { + "type": "string", + "enum": ["env", "file", "exec"] + }, + "provider": { "type": "string" }, + "id": { "type": "string" } + }, + "required": ["source", "provider", "id"] + }, + "secretInput": { + "anyOf": [{ "type": "string", "minLength": 1 }, { "$ref": "#/$defs/secretRef" }] + }, + "route": { + "type": "object", + "additionalProperties": false, + "properties": { + "enabled": { "type": "boolean" }, + "path": { "type": "string", "minLength": 1 }, + "sessionKey": { "type": "string", "minLength": 1 }, + "secret": { "$ref": "#/$defs/secretInput" }, + "controllerId": { "type": "string", "minLength": 1 }, + "description": { "type": "string" } + }, + "required": ["sessionKey", "secret"] + } + }, + "properties": { + "routes": { + "type": "object", + "additionalProperties": { + "$ref": "#/$defs/route" + } + } + } + } +} diff --git a/extensions/webhooks/package.json b/extensions/webhooks/package.json new file mode 100644 index 00000000000..899a19a34d9 --- /dev/null +++ b/extensions/webhooks/package.json @@ -0,0 +1,18 @@ +{ + "name": "@openclaw/webhooks", + "version": "2026.4.6", + "private": true, + "description": "OpenClaw webhook bridge plugin", + "type": "module", + "dependencies": { + "zod": "^4.3.6" + }, + "openclaw": { + "bundle": { + "stageRuntimeDependencies": true + }, + "extensions": [ + "./index.ts" + ] + } +} diff --git a/extensions/webhooks/runtime-api.ts b/extensions/webhooks/runtime-api.ts new file mode 100644 index 00000000000..3eec785cb32 --- /dev/null +++ b/extensions/webhooks/runtime-api.ts @@ -0,0 +1,16 @@ +export { + createFixedWindowRateLimiter, + createWebhookInFlightLimiter, + normalizeWebhookPath, + readJsonWebhookBodyOrReject, + resolveRequestClientIp, + resolveWebhookTargetWithAuthOrRejectSync, + withResolvedWebhookRequestPipeline, + WEBHOOK_IN_FLIGHT_DEFAULTS, + WEBHOOK_RATE_LIMIT_DEFAULTS, + type WebhookInFlightLimiter, +} from "openclaw/plugin-sdk/webhook-ingress"; +export { + resolveConfiguredSecretInputString, + type OpenClawConfig, +} from "openclaw/plugin-sdk/config-runtime"; diff --git a/extensions/webhooks/src/config.test.ts b/extensions/webhooks/src/config.test.ts new file mode 100644 index 00000000000..946583208db --- /dev/null +++ b/extensions/webhooks/src/config.test.ts @@ -0,0 +1,86 @@ +import { describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../runtime-api.js"; +import { resolveWebhooksPluginConfig } from "./config.js"; + +describe("resolveWebhooksPluginConfig", () => { + it("resolves default paths and SecretRef-backed secrets", async () => { + const routes = await resolveWebhooksPluginConfig({ + pluginConfig: { + routes: { + zapier: { + sessionKey: "agent:main:main", + secret: { + source: "env", + provider: "default", + id: "OPENCLAW_WEBHOOK_SECRET", + }, + }, + }, + }, + cfg: {} as OpenClawConfig, + env: { + OPENCLAW_WEBHOOK_SECRET: "shared-secret", + }, + }); + + expect(routes).toEqual([ + { + routeId: "zapier", + path: "/plugins/webhooks/zapier", + sessionKey: "agent:main:main", + secret: "shared-secret", + controllerId: "webhooks/zapier", + }, + ]); + }); + + it("skips routes whose secret cannot be resolved", async () => { + const warn = vi.fn(); + + const routes = await resolveWebhooksPluginConfig({ + pluginConfig: { + routes: { + missing: { + sessionKey: "agent:main:main", + secret: { + source: "env", + provider: "default", + id: "MISSING_SECRET", + }, + }, + }, + }, + cfg: {} as OpenClawConfig, + env: {}, + logger: { warn } as never, + }); + + expect(routes).toEqual([]); + expect(warn).toHaveBeenCalledWith( + expect.stringContaining("[webhooks] skipping route missing:"), + ); + }); + + it("rejects duplicate normalized paths", async () => { + await expect( + resolveWebhooksPluginConfig({ + pluginConfig: { + routes: { + first: { + path: "/plugins/webhooks/shared", + sessionKey: "agent:main:main", + secret: "a", + }, + second: { + path: "/plugins/webhooks/shared/", + sessionKey: "agent:main:other", + secret: "b", + }, + }, + }, + cfg: {} as OpenClawConfig, + env: {}, + }), + ).rejects.toThrow(/conflicts with routes\.first\.path/i); + }); +}); diff --git a/extensions/webhooks/src/config.ts b/extensions/webhooks/src/config.ts new file mode 100644 index 00000000000..3c14802fb4b --- /dev/null +++ b/extensions/webhooks/src/config.ts @@ -0,0 +1,95 @@ +import { z } from "zod"; +import type { PluginLogger } from "../api.js"; +import { + normalizeWebhookPath, + resolveConfiguredSecretInputString, + type OpenClawConfig, +} from "../runtime-api.js"; + +const secretRefSchema = z + .object({ + source: z.enum(["env", "file", "exec"]), + provider: z.string().trim().min(1), + id: z.string().trim().min(1), + }) + .strict(); + +const secretInputSchema = z.union([z.string().trim().min(1), secretRefSchema]); + +const webhookRouteConfigSchema = z + .object({ + enabled: z.boolean().optional().default(true), + path: z.string().trim().min(1).optional(), + sessionKey: z.string().trim().min(1), + secret: secretInputSchema, + controllerId: z.string().trim().min(1).optional(), + description: z.string().trim().min(1).optional(), + }) + .strict(); + +const webhooksPluginConfigSchema = z + .object({ + routes: z.record(z.string().trim().min(1), webhookRouteConfigSchema).default({}), + }) + .strict(); + +export type ResolvedWebhookRouteConfig = { + routeId: string; + path: string; + sessionKey: string; + secret: string; + controllerId: string; + description?: string; +}; + +export async function resolveWebhooksPluginConfig(params: { + pluginConfig: unknown; + cfg: OpenClawConfig; + env: NodeJS.ProcessEnv; + logger?: PluginLogger; +}): Promise { + const parsed = webhooksPluginConfigSchema.parse(params.pluginConfig ?? {}); + const resolvedRoutes: ResolvedWebhookRouteConfig[] = []; + const seenPaths = new Map(); + + for (const [routeId, route] of Object.entries(parsed.routes)) { + if (route.enabled === false) { + continue; + } + const path = normalizeWebhookPath(route.path ?? `/plugins/webhooks/${routeId}`); + const existingRouteId = seenPaths.get(path); + if (existingRouteId) { + throw new Error( + `webhooks.routes.${routeId}.path conflicts with routes.${existingRouteId}.path (${path}).`, + ); + } + + const secretResolution = await resolveConfiguredSecretInputString({ + config: params.cfg, + env: params.env, + value: route.secret, + path: `plugins.entries.webhooks.routes.${routeId}.secret`, + }); + const secret = secretResolution.value?.trim(); + if (!secret) { + params.logger?.warn?.( + `[webhooks] skipping route ${routeId}: ${ + secretResolution.unresolvedRefReason ?? "secret is empty or unresolved" + }`, + ); + continue; + } + + seenPaths.set(path, routeId); + resolvedRoutes.push({ + routeId, + path, + sessionKey: route.sessionKey, + secret, + controllerId: route.controllerId ?? `webhooks/${routeId}`, + ...(route.description ? { description: route.description } : {}), + }); + } + + return resolvedRoutes; +} diff --git a/extensions/webhooks/src/http.test.ts b/extensions/webhooks/src/http.test.ts new file mode 100644 index 00000000000..e2ee560ff00 --- /dev/null +++ b/extensions/webhooks/src/http.test.ts @@ -0,0 +1,375 @@ +import { EventEmitter } from "node:events"; +import type { IncomingMessage } from "node:http"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createRuntimeTaskFlow } from "../../../src/plugins/runtime/runtime-taskflow.js"; +import { createMockServerResponse } from "../../../src/test-utils/mock-http-response.js"; +import type { OpenClawConfig } from "../runtime-api.js"; +import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./http.js"; + +const hoisted = vi.hoisted(() => { + const sendMessageMock = vi.fn(); + const cancelSessionMock = vi.fn(); + const killSubagentRunAdminMock = vi.fn(); + return { + sendMessageMock, + cancelSessionMock, + killSubagentRunAdminMock, + }; +}); + +vi.mock("../../../src/tasks/task-registry-delivery-runtime.js", () => ({ + sendMessage: hoisted.sendMessageMock, +})); + +vi.mock("../../../src/acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => ({ + cancelSession: hoisted.cancelSessionMock, + }), +})); + +vi.mock("../../../src/agents/subagent-control.js", () => ({ + killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params), +})); + +type MockIncomingMessage = IncomingMessage & { + destroyed?: boolean; + destroy: () => MockIncomingMessage; + socket: { remoteAddress: string }; +}; + +let nextSessionId = 0; + +function createJsonRequest(params: { + path: string; + secret?: string; + body: unknown; +}): MockIncomingMessage { + const req = new EventEmitter() as MockIncomingMessage; + req.method = "POST"; + req.url = params.path; + req.headers = { + "content-type": "application/json", + ...(params.secret ? { "x-openclaw-webhook-secret": params.secret } : {}), + }; + req.socket = { remoteAddress: "127.0.0.1" } as MockIncomingMessage["socket"]; + req.destroyed = false; + req.destroy = (() => { + req.destroyed = true; + return req; + }) as MockIncomingMessage["destroy"]; + + void Promise.resolve().then(() => { + req.emit("data", Buffer.from(JSON.stringify(params.body), "utf8")); + req.emit("end"); + }); + + return req; +} + +function createHandler(): { + handler: ReturnType; + target: TaskFlowWebhookTarget; +} { + const runtime = createRuntimeTaskFlow(); + nextSessionId += 1; + const target: TaskFlowWebhookTarget = { + routeId: "zapier", + path: "/plugins/webhooks/zapier", + secret: "shared-secret", + defaultControllerId: "webhooks/zapier", + taskFlow: runtime.bindSession({ + sessionKey: `agent:main:webhook-test-${String(nextSessionId)}`, + }), + }; + const targetsByPath = new Map([[target.path, [target]]]); + return { + handler: createTaskFlowWebhookRequestHandler({ + cfg: {} as OpenClawConfig, + targetsByPath, + }), + target, + }; +} + +async function dispatchJsonRequest(params: { + handler: ReturnType; + path: string; + secret?: string; + body: unknown; +}) { + const req = createJsonRequest({ + path: params.path, + secret: params.secret, + body: params.body, + }); + const res = createMockServerResponse(); + await params.handler(req, res); + return res; +} + +function parseJsonBody(res: { body?: string | Buffer | null }) { + return JSON.parse(String(res.body ?? "")); +} + +afterEach(() => { + vi.clearAllMocks(); +}); + +describe("createTaskFlowWebhookRequestHandler", () => { + it("rejects requests with the wrong secret", async () => { + const { handler, target } = createHandler(); + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: "wrong-secret", + body: { + action: "list_flows", + }, + }); + + expect(res.statusCode).toBe(401); + expect(res.body).toBe("unauthorized"); + expect(target.taskFlow.list()).toEqual([]); + }); + + it("creates flows through the bound session and scrubs owner metadata from responses", async () => { + const { handler, target } = createHandler(); + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "create_flow", + goal: "Review inbound queue", + }, + }); + + expect(res.statusCode).toBe(200); + const parsed = parseJsonBody(res); + expect(parsed.ok).toBe(true); + expect(parsed.result.flow).toMatchObject({ + syncMode: "managed", + controllerId: "webhooks/zapier", + goal: "Review inbound queue", + }); + expect(parsed.result.flow.ownerKey).toBeUndefined(); + expect(parsed.result.flow.requesterOrigin).toBeUndefined(); + expect(target.taskFlow.get(parsed.result.flow.flowId)?.flowId).toBe(parsed.result.flow.flowId); + }); + + it("runs child tasks and scrubs task ownership fields from responses", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Triage inbox", + }); + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:main:subagent:child", + task: "Inspect the next message batch", + status: "running", + startedAt: 10, + lastEventAt: 10, + }, + }); + + expect(res.statusCode).toBe(200); + const parsed = parseJsonBody(res); + expect(parsed.ok).toBe(true); + expect(parsed.result.created).toBe(true); + expect(parsed.result.task).toMatchObject({ + parentFlowId: flow.flowId, + childSessionKey: "agent:main:subagent:child", + runtime: "acp", + }); + expect(parsed.result.task.ownerKey).toBeUndefined(); + expect(parsed.result.task.requesterSessionKey).toBeUndefined(); + }); + + it("returns 404 for missing flow mutations", async () => { + const { handler, target } = createHandler(); + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "set_waiting", + flowId: "flow-missing", + expectedRevision: 0, + }, + }); + + expect(res.statusCode).toBe(404); + const parsed = parseJsonBody(res); + expect(parsed).toMatchObject({ + ok: false, + code: "not_found", + error: "TaskFlow not found.", + result: { + applied: false, + code: "not_found", + }, + }); + }); + + it("returns 409 for revision conflicts", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Review inbox", + }); + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "set_waiting", + flowId: flow.flowId, + expectedRevision: flow.revision + 1, + }, + }); + + expect(res.statusCode).toBe(409); + const parsed = parseJsonBody(res); + expect(parsed).toMatchObject({ + ok: false, + code: "revision_conflict", + result: { + applied: false, + code: "revision_conflict", + current: { + flowId: flow.flowId, + revision: flow.revision, + }, + }, + }); + }); + + it("rejects internal runtimes and running-only metadata from external callers", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Review inbox", + }); + + const runtimeRes = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "cli", + task: "Inspect queue", + }, + }); + expect(runtimeRes.statusCode).toBe(400); + expect(parseJsonBody(runtimeRes)).toMatchObject({ + ok: false, + code: "invalid_request", + }); + + const queuedMetadataRes = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "acp", + task: "Inspect queue", + startedAt: 10, + }, + }); + expect(queuedMetadataRes.statusCode).toBe(400); + expect(parseJsonBody(queuedMetadataRes)).toMatchObject({ + ok: false, + code: "invalid_request", + error: + "status: status must be running when startedAt, lastEventAt, or progressSummary is provided", + }); + }); + + it("reuses the same task record when retried with the same runId", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Triage inbox", + }); + + const first = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:main:subagent:child", + runId: "retry-me", + task: "Inspect the next message batch", + }, + }); + const second = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "run_task", + flowId: flow.flowId, + runtime: "acp", + childSessionKey: "agent:main:subagent:child", + runId: "retry-me", + task: "Inspect the next message batch", + }, + }); + + expect(first.statusCode).toBe(200); + expect(second.statusCode).toBe(200); + const firstParsed = parseJsonBody(first); + const secondParsed = parseJsonBody(second); + expect(firstParsed.result.task.taskId).toBe(secondParsed.result.task.taskId); + expect(target.taskFlow.getTaskSummary(flow.flowId)?.total).toBe(1); + }); + + it("returns 409 when cancellation targets a terminal flow", async () => { + const { handler, target } = createHandler(); + const flow = target.taskFlow.createManaged({ + controllerId: "webhooks/zapier", + goal: "Review inbox", + }); + const finished = target.taskFlow.finish({ + flowId: flow.flowId, + expectedRevision: flow.revision, + }); + expect(finished.applied).toBe(true); + + const res = await dispatchJsonRequest({ + handler, + path: target.path, + secret: target.secret, + body: { + action: "cancel_flow", + flowId: flow.flowId, + }, + }); + + expect(res.statusCode).toBe(409); + expect(parseJsonBody(res)).toMatchObject({ + ok: false, + code: "terminal", + error: "Flow is already succeeded.", + result: { + found: true, + cancelled: false, + reason: "Flow is already succeeded.", + }, + }); + }); +}); diff --git a/extensions/webhooks/src/http.ts b/extensions/webhooks/src/http.ts new file mode 100644 index 00000000000..6be3343e1c0 --- /dev/null +++ b/extensions/webhooks/src/http.ts @@ -0,0 +1,795 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; +import { safeEqualSecret } from "openclaw/plugin-sdk/browser-security-runtime"; +import { z } from "zod"; +import type { PluginRuntime } from "../api.js"; +import { + createFixedWindowRateLimiter, + createWebhookInFlightLimiter, + readJsonWebhookBodyOrReject, + resolveRequestClientIp, + resolveWebhookTargetWithAuthOrRejectSync, + withResolvedWebhookRequestPipeline, + WEBHOOK_IN_FLIGHT_DEFAULTS, + WEBHOOK_RATE_LIMIT_DEFAULTS, + type OpenClawConfig, + type WebhookInFlightLimiter, +} from "../runtime-api.js"; + +type BoundTaskFlowRuntime = ReturnType; + +type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue }; + +const jsonValueSchema: z.ZodType = z.lazy(() => + z.union([ + z.null(), + z.boolean(), + z.number().finite(), + z.string(), + z.array(jsonValueSchema), + z.record(z.string(), jsonValueSchema), + ]), +); + +const nullableStringSchema = z.string().trim().min(1).nullable().optional(); + +const createFlowRequestSchema = z + .object({ + action: z.literal("create_flow"), + controllerId: z.string().trim().min(1).optional(), + goal: z.string().trim().min(1), + status: z.enum(["queued", "running", "waiting", "blocked"]).optional(), + notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(), + currentStep: nullableStringSchema, + stateJson: jsonValueSchema.nullable().optional(), + waitJson: jsonValueSchema.nullable().optional(), + }) + .strict(); + +const getFlowRequestSchema = z + .object({ action: z.literal("get_flow"), flowId: z.string().trim().min(1) }) + .strict(); +const listFlowsRequestSchema = z.object({ action: z.literal("list_flows") }).strict(); +const findLatestFlowRequestSchema = z.object({ action: z.literal("find_latest_flow") }).strict(); +const resolveFlowRequestSchema = z + .object({ action: z.literal("resolve_flow"), token: z.string().trim().min(1) }) + .strict(); +const getTaskSummaryRequestSchema = z + .object({ action: z.literal("get_task_summary"), flowId: z.string().trim().min(1) }) + .strict(); + +const setWaitingRequestSchema = z + .object({ + action: z.literal("set_waiting"), + flowId: z.string().trim().min(1), + expectedRevision: z.number().int().nonnegative(), + currentStep: nullableStringSchema, + stateJson: jsonValueSchema.nullable().optional(), + waitJson: jsonValueSchema.nullable().optional(), + blockedTaskId: nullableStringSchema, + blockedSummary: nullableStringSchema, + }) + .strict(); + +const resumeFlowRequestSchema = z + .object({ + action: z.literal("resume_flow"), + flowId: z.string().trim().min(1), + expectedRevision: z.number().int().nonnegative(), + status: z.enum(["queued", "running"]).optional(), + currentStep: nullableStringSchema, + stateJson: jsonValueSchema.nullable().optional(), + }) + .strict(); + +const finishFlowRequestSchema = z + .object({ + action: z.literal("finish_flow"), + flowId: z.string().trim().min(1), + expectedRevision: z.number().int().nonnegative(), + stateJson: jsonValueSchema.nullable().optional(), + }) + .strict(); + +const failFlowRequestSchema = z + .object({ + action: z.literal("fail_flow"), + flowId: z.string().trim().min(1), + expectedRevision: z.number().int().nonnegative(), + stateJson: jsonValueSchema.nullable().optional(), + blockedTaskId: nullableStringSchema, + blockedSummary: nullableStringSchema, + }) + .strict(); + +const requestCancelRequestSchema = z + .object({ + action: z.literal("request_cancel"), + flowId: z.string().trim().min(1), + expectedRevision: z.number().int().nonnegative(), + }) + .strict(); + +const cancelFlowRequestSchema = z + .object({ + action: z.literal("cancel_flow"), + flowId: z.string().trim().min(1), + }) + .strict(); + +const runTaskRequestSchema = z + .object({ + action: z.literal("run_task"), + flowId: z.string().trim().min(1), + runtime: z.enum(["subagent", "acp"]), + sourceId: z.string().trim().min(1).optional(), + childSessionKey: z.string().trim().min(1).optional(), + parentTaskId: z.string().trim().min(1).optional(), + agentId: z.string().trim().min(1).optional(), + runId: z.string().trim().min(1).optional(), + label: z.string().trim().min(1).optional(), + task: z.string().trim().min(1), + preferMetadata: z.boolean().optional(), + notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(), + status: z.enum(["queued", "running"]).optional(), + startedAt: z.number().int().nonnegative().optional(), + lastEventAt: z.number().int().nonnegative().optional(), + progressSummary: nullableStringSchema, + }) + .strict() + .superRefine((value, ctx) => { + if ( + value.status !== "running" && + (value.startedAt !== undefined || + value.lastEventAt !== undefined || + value.progressSummary !== undefined) + ) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: + "status must be running when startedAt, lastEventAt, or progressSummary is provided", + path: ["status"], + }); + } + }); + +const webhookActionSchema = z.discriminatedUnion("action", [ + createFlowRequestSchema, + getFlowRequestSchema, + listFlowsRequestSchema, + findLatestFlowRequestSchema, + resolveFlowRequestSchema, + getTaskSummaryRequestSchema, + setWaitingRequestSchema, + resumeFlowRequestSchema, + finishFlowRequestSchema, + failFlowRequestSchema, + requestCancelRequestSchema, + cancelFlowRequestSchema, + runTaskRequestSchema, +]); + +type WebhookAction = z.infer; + +export type TaskFlowWebhookTarget = { + routeId: string; + path: string; + secret: string; + defaultControllerId: string; + taskFlow: BoundTaskFlowRuntime; +}; + +type FlowView = { + flowId: string; + syncMode: "task_mirrored" | "managed"; + controllerId?: string; + revision: number; + status: string; + notifyPolicy: string; + goal: string; + currentStep?: string; + blockedTaskId?: string; + blockedSummary?: string; + stateJson?: JsonValue; + waitJson?: JsonValue; + cancelRequestedAt?: number; + createdAt: number; + updatedAt: number; + endedAt?: number; +}; + +type TaskView = { + taskId: string; + runtime: string; + sourceId?: string; + scopeKind: string; + childSessionKey?: string; + parentFlowId?: string; + parentTaskId?: string; + agentId?: string; + runId?: string; + label?: string; + task: string; + status: string; + deliveryStatus: string; + notifyPolicy: string; + createdAt: number; + startedAt?: number; + endedAt?: number; + lastEventAt?: number; + cleanupAfter?: number; + error?: string; + progressSummary?: string; + terminalSummary?: string; + terminalOutcome?: string; +}; + +function toFlowView(flow: { + flowId: string; + syncMode: "task_mirrored" | "managed"; + controllerId?: string; + revision: number; + status: string; + notifyPolicy: string; + goal: string; + currentStep?: string; + blockedTaskId?: string; + blockedSummary?: string; + stateJson?: JsonValue; + waitJson?: JsonValue; + cancelRequestedAt?: number; + createdAt: number; + updatedAt: number; + endedAt?: number; +}): FlowView { + return { + flowId: flow.flowId, + syncMode: flow.syncMode, + ...(flow.controllerId ? { controllerId: flow.controllerId } : {}), + revision: flow.revision, + status: flow.status, + notifyPolicy: flow.notifyPolicy, + goal: flow.goal, + ...(flow.currentStep ? { currentStep: flow.currentStep } : {}), + ...(flow.blockedTaskId ? { blockedTaskId: flow.blockedTaskId } : {}), + ...(flow.blockedSummary ? { blockedSummary: flow.blockedSummary } : {}), + ...(flow.stateJson !== undefined ? { stateJson: flow.stateJson } : {}), + ...(flow.waitJson !== undefined ? { waitJson: flow.waitJson } : {}), + ...(flow.cancelRequestedAt !== undefined ? { cancelRequestedAt: flow.cancelRequestedAt } : {}), + createdAt: flow.createdAt, + updatedAt: flow.updatedAt, + ...(flow.endedAt !== undefined ? { endedAt: flow.endedAt } : {}), + }; +} + +function toTaskView(task: { + taskId: string; + runtime: string; + sourceId?: string; + scopeKind: string; + childSessionKey?: string; + parentFlowId?: string; + parentTaskId?: string; + agentId?: string; + runId?: string; + label?: string; + task: string; + status: string; + deliveryStatus: string; + notifyPolicy: string; + createdAt: number; + startedAt?: number; + endedAt?: number; + lastEventAt?: number; + cleanupAfter?: number; + error?: string; + progressSummary?: string; + terminalSummary?: string; + terminalOutcome?: string; +}): TaskView { + return { + taskId: task.taskId, + runtime: task.runtime, + ...(task.sourceId ? { sourceId: task.sourceId } : {}), + scopeKind: task.scopeKind, + ...(task.childSessionKey ? { childSessionKey: task.childSessionKey } : {}), + ...(task.parentFlowId ? { parentFlowId: task.parentFlowId } : {}), + ...(task.parentTaskId ? { parentTaskId: task.parentTaskId } : {}), + ...(task.agentId ? { agentId: task.agentId } : {}), + ...(task.runId ? { runId: task.runId } : {}), + ...(task.label ? { label: task.label } : {}), + task: task.task, + status: task.status, + deliveryStatus: task.deliveryStatus, + notifyPolicy: task.notifyPolicy, + createdAt: task.createdAt, + ...(task.startedAt !== undefined ? { startedAt: task.startedAt } : {}), + ...(task.endedAt !== undefined ? { endedAt: task.endedAt } : {}), + ...(task.lastEventAt !== undefined ? { lastEventAt: task.lastEventAt } : {}), + ...(task.cleanupAfter !== undefined ? { cleanupAfter: task.cleanupAfter } : {}), + ...(task.error ? { error: task.error } : {}), + ...(task.progressSummary ? { progressSummary: task.progressSummary } : {}), + ...(task.terminalSummary ? { terminalSummary: task.terminalSummary } : {}), + ...(task.terminalOutcome ? { terminalOutcome: task.terminalOutcome } : {}), + }; +} + +function writeJson(res: ServerResponse, statusCode: number, body: unknown): void { + res.statusCode = statusCode; + res.setHeader("Content-Type", "application/json; charset=utf-8"); + res.end(JSON.stringify(body)); +} + +function extractSharedSecret(req: IncomingMessage): string { + const authHeader = Array.isArray(req.headers.authorization) + ? String(req.headers.authorization[0] ?? "") + : String(req.headers.authorization ?? ""); + if (authHeader.toLowerCase().startsWith("bearer ")) { + return authHeader.slice("bearer ".length).trim(); + } + const sharedHeader = req.headers["x-openclaw-webhook-secret"]; + return Array.isArray(sharedHeader) + ? String(sharedHeader[0] ?? "").trim() + : String(sharedHeader ?? "").trim(); +} + +function timingSafeEquals(left: string, right: string): boolean { + // Reuse the shared helper so webhook auth semantics stay aligned across plugins. + return safeEqualSecret(left, right); +} + +function formatZodError(error: z.ZodError): string { + const firstIssue = error.issues[0]; + if (!firstIssue) { + return "invalid request"; + } + const path = firstIssue.path.length > 0 ? `${firstIssue.path.join(".")}: ` : ""; + return `${path}${firstIssue.message}`; +} + +function mapMutationResult( + result: + | { + applied: true; + flow: FlowView; + } + | { + applied: false; + code: string; + current?: FlowView; + }, +): unknown { + return result; +} + +function mapMutationStatus(result: { + applied: boolean; + code?: "not_found" | "not_managed" | "revision_conflict"; +}): { statusCode: number; code?: string; error?: string } { + if (result.applied) { + return { statusCode: 200 }; + } + switch (result.code) { + case "not_found": + return { + statusCode: 404, + code: "not_found", + error: "TaskFlow not found.", + }; + case "not_managed": + return { + statusCode: 409, + code: "not_managed", + error: "TaskFlow is not managed by this webhook surface.", + }; + case "revision_conflict": + return { + statusCode: 409, + code: "revision_conflict", + error: "TaskFlow changed since the caller's expected revision.", + }; + default: + return { + statusCode: 409, + code: "mutation_rejected", + error: "TaskFlow mutation was rejected.", + }; + } +} + +function mapRunTaskStatus(result: { created: boolean; found: boolean; reason?: string }): { + statusCode: number; + code?: string; + error?: string; +} { + if (result.created) { + return { statusCode: 200 }; + } + if (!result.found) { + return { + statusCode: 404, + code: "not_found", + error: "TaskFlow not found.", + }; + } + if (result.reason === "Flow cancellation has already been requested.") { + return { + statusCode: 409, + code: "cancel_requested", + error: result.reason, + }; + } + if (result.reason === "Flow does not accept managed child tasks.") { + return { + statusCode: 409, + code: "not_managed", + error: result.reason, + }; + } + if (result.reason?.startsWith("Flow is already ")) { + return { + statusCode: 409, + code: "terminal", + error: result.reason, + }; + } + return { + statusCode: 409, + code: "task_not_created", + error: result.reason ?? "TaskFlow task was not created.", + }; +} + +function mapCancelStatus(result: { found: boolean; cancelled: boolean; reason?: string }): { + statusCode: number; + code?: string; + error?: string; +} { + if (result.cancelled) { + return { statusCode: 200 }; + } + if (!result.found) { + return { + statusCode: 404, + code: "not_found", + error: "TaskFlow not found.", + }; + } + if (result.reason === "One or more child tasks are still active.") { + return { + statusCode: 202, + code: "cancel_pending", + error: result.reason, + }; + } + if (result.reason === "Flow changed while cancellation was in progress.") { + return { + statusCode: 409, + code: "revision_conflict", + error: result.reason, + }; + } + if (result.reason?.startsWith("Flow is already ")) { + return { + statusCode: 409, + code: "terminal", + error: result.reason, + }; + } + return { + statusCode: 409, + code: "cancel_rejected", + error: result.reason ?? "TaskFlow cancellation was rejected.", + }; +} + +function describeWebhookOutcome(params: { action: WebhookAction; result: unknown }): { + statusCode: number; + code?: string; + error?: string; +} { + switch (params.action.action) { + case "set_waiting": + case "resume_flow": + case "finish_flow": + case "fail_flow": + case "request_cancel": + return mapMutationStatus( + params.result as { + applied: boolean; + code?: "not_found" | "not_managed" | "revision_conflict"; + }, + ); + case "cancel_flow": + return mapCancelStatus( + params.result as { + found: boolean; + cancelled: boolean; + reason?: string; + }, + ); + case "run_task": + return mapRunTaskStatus( + params.result as { + created: boolean; + found: boolean; + reason?: string; + }, + ); + default: + return { statusCode: 200 }; + } +} + +async function executeWebhookAction(params: { + action: WebhookAction; + target: TaskFlowWebhookTarget; + cfg: OpenClawConfig; +}): Promise { + const { action, target } = params; + switch (action.action) { + case "create_flow": { + const flow = target.taskFlow.createManaged({ + controllerId: action.controllerId ?? target.defaultControllerId, + goal: action.goal, + status: action.status, + notifyPolicy: action.notifyPolicy, + currentStep: action.currentStep ?? undefined, + stateJson: action.stateJson, + waitJson: action.waitJson, + }); + return { flow: toFlowView(flow) }; + } + case "get_flow": { + const flow = target.taskFlow.get(action.flowId); + return { flow: flow ? toFlowView(flow) : null }; + } + case "list_flows": + return { flows: target.taskFlow.list().map(toFlowView) }; + case "find_latest_flow": { + const flow = target.taskFlow.findLatest(); + return { flow: flow ? toFlowView(flow) : null }; + } + case "resolve_flow": { + const flow = target.taskFlow.resolve(action.token); + return { flow: flow ? toFlowView(flow) : null }; + } + case "get_task_summary": + return { summary: target.taskFlow.getTaskSummary(action.flowId) ?? null }; + case "set_waiting": { + const result = target.taskFlow.setWaiting({ + flowId: action.flowId, + expectedRevision: action.expectedRevision, + currentStep: action.currentStep, + stateJson: action.stateJson, + waitJson: action.waitJson, + blockedTaskId: action.blockedTaskId, + blockedSummary: action.blockedSummary, + }); + return mapMutationResult( + result.applied + ? { applied: true, flow: toFlowView(result.flow) } + : { + applied: false, + code: result.code, + ...(result.current ? { current: toFlowView(result.current) } : {}), + }, + ); + } + case "resume_flow": { + const result = target.taskFlow.resume({ + flowId: action.flowId, + expectedRevision: action.expectedRevision, + status: action.status, + currentStep: action.currentStep, + stateJson: action.stateJson, + }); + return mapMutationResult( + result.applied + ? { applied: true, flow: toFlowView(result.flow) } + : { + applied: false, + code: result.code, + ...(result.current ? { current: toFlowView(result.current) } : {}), + }, + ); + } + case "finish_flow": { + const result = target.taskFlow.finish({ + flowId: action.flowId, + expectedRevision: action.expectedRevision, + stateJson: action.stateJson, + }); + return mapMutationResult( + result.applied + ? { applied: true, flow: toFlowView(result.flow) } + : { + applied: false, + code: result.code, + ...(result.current ? { current: toFlowView(result.current) } : {}), + }, + ); + } + case "fail_flow": { + const result = target.taskFlow.fail({ + flowId: action.flowId, + expectedRevision: action.expectedRevision, + stateJson: action.stateJson, + blockedTaskId: action.blockedTaskId, + blockedSummary: action.blockedSummary, + }); + return mapMutationResult( + result.applied + ? { applied: true, flow: toFlowView(result.flow) } + : { + applied: false, + code: result.code, + ...(result.current ? { current: toFlowView(result.current) } : {}), + }, + ); + } + case "request_cancel": { + const result = target.taskFlow.requestCancel({ + flowId: action.flowId, + expectedRevision: action.expectedRevision, + }); + return mapMutationResult( + result.applied + ? { applied: true, flow: toFlowView(result.flow) } + : { + applied: false, + code: result.code, + ...(result.current ? { current: toFlowView(result.current) } : {}), + }, + ); + } + case "cancel_flow": { + const result = await target.taskFlow.cancel({ + flowId: action.flowId, + cfg: params.cfg, + }); + return { + found: result.found, + cancelled: result.cancelled, + ...(result.reason ? { reason: result.reason } : {}), + ...(result.flow ? { flow: toFlowView(result.flow) } : {}), + ...(result.tasks ? { tasks: result.tasks.map(toTaskView) } : {}), + }; + } + case "run_task": { + const result = target.taskFlow.runTask({ + flowId: action.flowId, + runtime: action.runtime, + sourceId: action.sourceId, + childSessionKey: action.childSessionKey, + parentTaskId: action.parentTaskId, + agentId: action.agentId, + runId: action.runId, + label: action.label, + task: action.task, + preferMetadata: action.preferMetadata, + notifyPolicy: action.notifyPolicy, + status: action.status, + startedAt: action.startedAt, + lastEventAt: action.lastEventAt, + progressSummary: action.progressSummary, + }); + if (result.created) { + return { + created: true, + flow: toFlowView(result.flow), + task: toTaskView(result.task), + }; + } + return { + found: result.found, + created: false, + reason: result.reason, + ...(result.flow ? { flow: toFlowView(result.flow) } : {}), + }; + } + } +} + +export function createTaskFlowWebhookRequestHandler(params: { + cfg: OpenClawConfig; + targetsByPath: Map; + inFlightLimiter?: WebhookInFlightLimiter; +}): (req: IncomingMessage, res: ServerResponse) => Promise { + const rateLimiter = createFixedWindowRateLimiter({ + windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs, + maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, + maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys, + }); + const inFlightLimiter = + params.inFlightLimiter ?? + createWebhookInFlightLimiter({ + maxInFlightPerKey: WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey, + maxTrackedKeys: WEBHOOK_IN_FLIGHT_DEFAULTS.maxTrackedKeys, + }); + + return async (req: IncomingMessage, res: ServerResponse): Promise => { + return await withResolvedWebhookRequestPipeline({ + req, + res, + targetsByPath: params.targetsByPath, + allowMethods: ["POST"], + requireJsonContentType: true, + rateLimiter, + rateLimitKey: (() => { + const clientIp = + resolveRequestClientIp( + req, + params.cfg.gateway?.trustedProxies, + params.cfg.gateway?.allowRealIpFallback === true, + ) ?? + req.socket.remoteAddress ?? + "unknown"; + return `${new URL(req.url ?? "/", "http://localhost").pathname}:${clientIp}`; + })(), + inFlightLimiter, + handle: async ({ targets }) => { + const presentedSecret = extractSharedSecret(req); + const target = resolveWebhookTargetWithAuthOrRejectSync({ + targets, + res, + isMatch: (candidate) => + presentedSecret.length > 0 && timingSafeEquals(candidate.secret, presentedSecret), + }); + if (!target) { + return true; + } + + const body = await readJsonWebhookBodyOrReject({ + req, + res, + maxBytes: 256 * 1024, + timeoutMs: 15_000, + emptyObjectOnEmpty: false, + invalidJsonMessage: "invalid request body", + }); + if (!body.ok) { + return true; + } + + const parsed = webhookActionSchema.safeParse(body.value); + if (!parsed.success) { + writeJson(res, 400, { + ok: false, + code: "invalid_request", + error: formatZodError(parsed.error), + }); + return true; + } + + const result = await executeWebhookAction({ + action: parsed.data, + target, + cfg: params.cfg, + }); + const outcome = describeWebhookOutcome({ + action: parsed.data, + result, + }); + writeJson( + res, + outcome.statusCode, + outcome.statusCode < 400 + ? { + ok: true, + routeId: target.routeId, + ...(outcome.code ? { code: outcome.code } : {}), + result, + } + : { + ok: false, + routeId: target.routeId, + code: outcome.code ?? "request_rejected", + error: outcome.error ?? "request rejected", + result, + }, + ); + return true; + }, + }); + }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 77d2208cc47..7132136a2d0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -739,6 +739,12 @@ importers: extensions/vydra: {} + extensions/webhooks: + dependencies: + zod: + specifier: ^4.3.6 + version: 4.3.6 + extensions/whatsapp: dependencies: '@whiskeysockets/baileys': diff --git a/scripts/lib/bundled-runtime-sidecar-paths.json b/scripts/lib/bundled-runtime-sidecar-paths.json index bcf07ed612f..0140b1caf52 100644 --- a/scripts/lib/bundled-runtime-sidecar-paths.json +++ b/scripts/lib/bundled-runtime-sidecar-paths.json @@ -31,6 +31,7 @@ "dist/extensions/tlon/runtime-api.js", "dist/extensions/twitch/runtime-api.js", "dist/extensions/voice-call/runtime-api.js", + "dist/extensions/webhooks/runtime-api.js", "dist/extensions/whatsapp/light-runtime-api.js", "dist/extensions/whatsapp/runtime-api.js", "dist/extensions/zai/runtime-api.js",