plugins: add bundled webhooks TaskFlow bridge

This commit is contained in:
Mariano Belinky
2026-04-06 15:19:33 +02:00
committed by mbelinky
parent f3cc8d12d6
commit 42a9b7d64b
13 changed files with 1505 additions and 0 deletions

4
.github/labeler.yml vendored
View File

@@ -241,6 +241,10 @@
- changed-files:
- any-glob-to-any-file:
- "extensions/open-prose/**"
"extensions: webhooks":
- changed-files:
- any-glob-to-any-file:
- "extensions/webhooks/**"
"extensions: device-pair":
- changed-files:
- any-glob-to-any-file:

View File

@@ -4,6 +4,10 @@ Docs: https://docs.openclaw.ai
## Unreleased
### Changes
- Plugins/webhooks: add a bundled webhook ingress plugin so external automation can create and drive bound TaskFlows through per-route shared-secret endpoints. Thanks @mbelinky.
### Fixes
- Providers/Google: recognize Gemma model ids in native Google forward-compat resolution, keep the requested provider when cloning fallback templates, and force Gemma reasoning off so Gemma 4 routes stop failing through the Google catalog fallback. (#61507) Thanks @eyjohn.

View File

@@ -0,0 +1,6 @@
export {
definePluginEntry,
type OpenClawPluginApi,
type PluginLogger,
type PluginRuntime,
} from "openclaw/plugin-sdk/core";

View File

@@ -0,0 +1,51 @@
import { definePluginEntry, type OpenClawPluginApi } from "./api.js";
import { resolveWebhooksPluginConfig } from "./src/config.js";
import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./src/http.js";
export default definePluginEntry({
id: "webhooks",
name: "Webhooks",
description:
"Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.",
async register(api: OpenClawPluginApi) {
const routes = await resolveWebhooksPluginConfig({
pluginConfig: api.pluginConfig,
cfg: api.config,
env: process.env,
logger: api.logger,
});
if (routes.length === 0) {
return;
}
const targetsByPath = new Map<string, TaskFlowWebhookTarget[]>();
const handler = createTaskFlowWebhookRequestHandler({
cfg: api.config,
targetsByPath,
});
for (const route of routes) {
const taskFlow = api.runtime.taskFlow.bindSession({
sessionKey: route.sessionKey,
});
const target: TaskFlowWebhookTarget = {
routeId: route.routeId,
path: route.path,
secret: route.secret,
defaultControllerId: route.controllerId,
taskFlow,
};
targetsByPath.set(target.path, [...(targetsByPath.get(target.path) ?? []), target]);
api.registerHttpRoute({
path: target.path,
auth: "plugin",
match: "exact",
replaceExisting: true,
handler,
});
api.logger.info?.(
`[webhooks] registered route ${route.routeId} on ${route.path} for session ${route.sessionKey}`,
);
}
},
});

View File

@@ -0,0 +1,48 @@
{
"id": "webhooks",
"name": "Webhooks",
"description": "Authenticated inbound webhooks that bind external automation to OpenClaw TaskFlows.",
"configSchema": {
"type": "object",
"additionalProperties": false,
"$defs": {
"secretRef": {
"type": "object",
"additionalProperties": false,
"properties": {
"source": {
"type": "string",
"enum": ["env", "file", "exec"]
},
"provider": { "type": "string" },
"id": { "type": "string" }
},
"required": ["source", "provider", "id"]
},
"secretInput": {
"anyOf": [{ "type": "string", "minLength": 1 }, { "$ref": "#/$defs/secretRef" }]
},
"route": {
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": { "type": "boolean" },
"path": { "type": "string", "minLength": 1 },
"sessionKey": { "type": "string", "minLength": 1 },
"secret": { "$ref": "#/$defs/secretInput" },
"controllerId": { "type": "string", "minLength": 1 },
"description": { "type": "string" }
},
"required": ["sessionKey", "secret"]
}
},
"properties": {
"routes": {
"type": "object",
"additionalProperties": {
"$ref": "#/$defs/route"
}
}
}
}
}

View File

@@ -0,0 +1,18 @@
{
"name": "@openclaw/webhooks",
"version": "2026.4.6",
"private": true,
"description": "OpenClaw webhook bridge plugin",
"type": "module",
"dependencies": {
"zod": "^4.3.6"
},
"openclaw": {
"bundle": {
"stageRuntimeDependencies": true
},
"extensions": [
"./index.ts"
]
}
}

View File

@@ -0,0 +1,16 @@
export {
createFixedWindowRateLimiter,
createWebhookInFlightLimiter,
normalizeWebhookPath,
readJsonWebhookBodyOrReject,
resolveRequestClientIp,
resolveWebhookTargetWithAuthOrRejectSync,
withResolvedWebhookRequestPipeline,
WEBHOOK_IN_FLIGHT_DEFAULTS,
WEBHOOK_RATE_LIMIT_DEFAULTS,
type WebhookInFlightLimiter,
} from "openclaw/plugin-sdk/webhook-ingress";
export {
resolveConfiguredSecretInputString,
type OpenClawConfig,
} from "openclaw/plugin-sdk/config-runtime";

View File

@@ -0,0 +1,86 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../runtime-api.js";
import { resolveWebhooksPluginConfig } from "./config.js";
describe("resolveWebhooksPluginConfig", () => {
it("resolves default paths and SecretRef-backed secrets", async () => {
const routes = await resolveWebhooksPluginConfig({
pluginConfig: {
routes: {
zapier: {
sessionKey: "agent:main:main",
secret: {
source: "env",
provider: "default",
id: "OPENCLAW_WEBHOOK_SECRET",
},
},
},
},
cfg: {} as OpenClawConfig,
env: {
OPENCLAW_WEBHOOK_SECRET: "shared-secret",
},
});
expect(routes).toEqual([
{
routeId: "zapier",
path: "/plugins/webhooks/zapier",
sessionKey: "agent:main:main",
secret: "shared-secret",
controllerId: "webhooks/zapier",
},
]);
});
it("skips routes whose secret cannot be resolved", async () => {
const warn = vi.fn();
const routes = await resolveWebhooksPluginConfig({
pluginConfig: {
routes: {
missing: {
sessionKey: "agent:main:main",
secret: {
source: "env",
provider: "default",
id: "MISSING_SECRET",
},
},
},
},
cfg: {} as OpenClawConfig,
env: {},
logger: { warn } as never,
});
expect(routes).toEqual([]);
expect(warn).toHaveBeenCalledWith(
expect.stringContaining("[webhooks] skipping route missing:"),
);
});
it("rejects duplicate normalized paths", async () => {
await expect(
resolveWebhooksPluginConfig({
pluginConfig: {
routes: {
first: {
path: "/plugins/webhooks/shared",
sessionKey: "agent:main:main",
secret: "a",
},
second: {
path: "/plugins/webhooks/shared/",
sessionKey: "agent:main:other",
secret: "b",
},
},
},
cfg: {} as OpenClawConfig,
env: {},
}),
).rejects.toThrow(/conflicts with routes\.first\.path/i);
});
});

View File

@@ -0,0 +1,95 @@
import { z } from "zod";
import type { PluginLogger } from "../api.js";
import {
normalizeWebhookPath,
resolveConfiguredSecretInputString,
type OpenClawConfig,
} from "../runtime-api.js";
const secretRefSchema = z
.object({
source: z.enum(["env", "file", "exec"]),
provider: z.string().trim().min(1),
id: z.string().trim().min(1),
})
.strict();
const secretInputSchema = z.union([z.string().trim().min(1), secretRefSchema]);
const webhookRouteConfigSchema = z
.object({
enabled: z.boolean().optional().default(true),
path: z.string().trim().min(1).optional(),
sessionKey: z.string().trim().min(1),
secret: secretInputSchema,
controllerId: z.string().trim().min(1).optional(),
description: z.string().trim().min(1).optional(),
})
.strict();
const webhooksPluginConfigSchema = z
.object({
routes: z.record(z.string().trim().min(1), webhookRouteConfigSchema).default({}),
})
.strict();
export type ResolvedWebhookRouteConfig = {
routeId: string;
path: string;
sessionKey: string;
secret: string;
controllerId: string;
description?: string;
};
export async function resolveWebhooksPluginConfig(params: {
pluginConfig: unknown;
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
logger?: PluginLogger;
}): Promise<ResolvedWebhookRouteConfig[]> {
const parsed = webhooksPluginConfigSchema.parse(params.pluginConfig ?? {});
const resolvedRoutes: ResolvedWebhookRouteConfig[] = [];
const seenPaths = new Map<string, string>();
for (const [routeId, route] of Object.entries(parsed.routes)) {
if (route.enabled === false) {
continue;
}
const path = normalizeWebhookPath(route.path ?? `/plugins/webhooks/${routeId}`);
const existingRouteId = seenPaths.get(path);
if (existingRouteId) {
throw new Error(
`webhooks.routes.${routeId}.path conflicts with routes.${existingRouteId}.path (${path}).`,
);
}
const secretResolution = await resolveConfiguredSecretInputString({
config: params.cfg,
env: params.env,
value: route.secret,
path: `plugins.entries.webhooks.routes.${routeId}.secret`,
});
const secret = secretResolution.value?.trim();
if (!secret) {
params.logger?.warn?.(
`[webhooks] skipping route ${routeId}: ${
secretResolution.unresolvedRefReason ?? "secret is empty or unresolved"
}`,
);
continue;
}
seenPaths.set(path, routeId);
resolvedRoutes.push({
routeId,
path,
sessionKey: route.sessionKey,
secret,
controllerId: route.controllerId ?? `webhooks/${routeId}`,
...(route.description ? { description: route.description } : {}),
});
}
return resolvedRoutes;
}

View File

@@ -0,0 +1,375 @@
import { EventEmitter } from "node:events";
import type { IncomingMessage } from "node:http";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createRuntimeTaskFlow } from "../../../src/plugins/runtime/runtime-taskflow.js";
import { createMockServerResponse } from "../../../src/test-utils/mock-http-response.js";
import type { OpenClawConfig } from "../runtime-api.js";
import { createTaskFlowWebhookRequestHandler, type TaskFlowWebhookTarget } from "./http.js";
const hoisted = vi.hoisted(() => {
const sendMessageMock = vi.fn();
const cancelSessionMock = vi.fn();
const killSubagentRunAdminMock = vi.fn();
return {
sendMessageMock,
cancelSessionMock,
killSubagentRunAdminMock,
};
});
vi.mock("../../../src/tasks/task-registry-delivery-runtime.js", () => ({
sendMessage: hoisted.sendMessageMock,
}));
vi.mock("../../../src/acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: hoisted.cancelSessionMock,
}),
}));
vi.mock("../../../src/agents/subagent-control.js", () => ({
killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params),
}));
type MockIncomingMessage = IncomingMessage & {
destroyed?: boolean;
destroy: () => MockIncomingMessage;
socket: { remoteAddress: string };
};
let nextSessionId = 0;
function createJsonRequest(params: {
path: string;
secret?: string;
body: unknown;
}): MockIncomingMessage {
const req = new EventEmitter() as MockIncomingMessage;
req.method = "POST";
req.url = params.path;
req.headers = {
"content-type": "application/json",
...(params.secret ? { "x-openclaw-webhook-secret": params.secret } : {}),
};
req.socket = { remoteAddress: "127.0.0.1" } as MockIncomingMessage["socket"];
req.destroyed = false;
req.destroy = (() => {
req.destroyed = true;
return req;
}) as MockIncomingMessage["destroy"];
void Promise.resolve().then(() => {
req.emit("data", Buffer.from(JSON.stringify(params.body), "utf8"));
req.emit("end");
});
return req;
}
function createHandler(): {
handler: ReturnType<typeof createTaskFlowWebhookRequestHandler>;
target: TaskFlowWebhookTarget;
} {
const runtime = createRuntimeTaskFlow();
nextSessionId += 1;
const target: TaskFlowWebhookTarget = {
routeId: "zapier",
path: "/plugins/webhooks/zapier",
secret: "shared-secret",
defaultControllerId: "webhooks/zapier",
taskFlow: runtime.bindSession({
sessionKey: `agent:main:webhook-test-${String(nextSessionId)}`,
}),
};
const targetsByPath = new Map<string, TaskFlowWebhookTarget[]>([[target.path, [target]]]);
return {
handler: createTaskFlowWebhookRequestHandler({
cfg: {} as OpenClawConfig,
targetsByPath,
}),
target,
};
}
async function dispatchJsonRequest(params: {
handler: ReturnType<typeof createTaskFlowWebhookRequestHandler>;
path: string;
secret?: string;
body: unknown;
}) {
const req = createJsonRequest({
path: params.path,
secret: params.secret,
body: params.body,
});
const res = createMockServerResponse();
await params.handler(req, res);
return res;
}
function parseJsonBody(res: { body?: string | Buffer | null }) {
return JSON.parse(String(res.body ?? ""));
}
afterEach(() => {
vi.clearAllMocks();
});
describe("createTaskFlowWebhookRequestHandler", () => {
it("rejects requests with the wrong secret", async () => {
const { handler, target } = createHandler();
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: "wrong-secret",
body: {
action: "list_flows",
},
});
expect(res.statusCode).toBe(401);
expect(res.body).toBe("unauthorized");
expect(target.taskFlow.list()).toEqual([]);
});
it("creates flows through the bound session and scrubs owner metadata from responses", async () => {
const { handler, target } = createHandler();
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "create_flow",
goal: "Review inbound queue",
},
});
expect(res.statusCode).toBe(200);
const parsed = parseJsonBody(res);
expect(parsed.ok).toBe(true);
expect(parsed.result.flow).toMatchObject({
syncMode: "managed",
controllerId: "webhooks/zapier",
goal: "Review inbound queue",
});
expect(parsed.result.flow.ownerKey).toBeUndefined();
expect(parsed.result.flow.requesterOrigin).toBeUndefined();
expect(target.taskFlow.get(parsed.result.flow.flowId)?.flowId).toBe(parsed.result.flow.flowId);
});
it("runs child tasks and scrubs task ownership fields from responses", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Triage inbox",
});
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
task: "Inspect the next message batch",
status: "running",
startedAt: 10,
lastEventAt: 10,
},
});
expect(res.statusCode).toBe(200);
const parsed = parseJsonBody(res);
expect(parsed.ok).toBe(true);
expect(parsed.result.created).toBe(true);
expect(parsed.result.task).toMatchObject({
parentFlowId: flow.flowId,
childSessionKey: "agent:main:subagent:child",
runtime: "acp",
});
expect(parsed.result.task.ownerKey).toBeUndefined();
expect(parsed.result.task.requesterSessionKey).toBeUndefined();
});
it("returns 404 for missing flow mutations", async () => {
const { handler, target } = createHandler();
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "set_waiting",
flowId: "flow-missing",
expectedRevision: 0,
},
});
expect(res.statusCode).toBe(404);
const parsed = parseJsonBody(res);
expect(parsed).toMatchObject({
ok: false,
code: "not_found",
error: "TaskFlow not found.",
result: {
applied: false,
code: "not_found",
},
});
});
it("returns 409 for revision conflicts", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "set_waiting",
flowId: flow.flowId,
expectedRevision: flow.revision + 1,
},
});
expect(res.statusCode).toBe(409);
const parsed = parseJsonBody(res);
expect(parsed).toMatchObject({
ok: false,
code: "revision_conflict",
result: {
applied: false,
code: "revision_conflict",
current: {
flowId: flow.flowId,
revision: flow.revision,
},
},
});
});
it("rejects internal runtimes and running-only metadata from external callers", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const runtimeRes = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "cli",
task: "Inspect queue",
},
});
expect(runtimeRes.statusCode).toBe(400);
expect(parseJsonBody(runtimeRes)).toMatchObject({
ok: false,
code: "invalid_request",
});
const queuedMetadataRes = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
task: "Inspect queue",
startedAt: 10,
},
});
expect(queuedMetadataRes.statusCode).toBe(400);
expect(parseJsonBody(queuedMetadataRes)).toMatchObject({
ok: false,
code: "invalid_request",
error:
"status: status must be running when startedAt, lastEventAt, or progressSummary is provided",
});
});
it("reuses the same task record when retried with the same runId", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Triage inbox",
});
const first = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "retry-me",
task: "Inspect the next message batch",
},
});
const second = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "run_task",
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:main:subagent:child",
runId: "retry-me",
task: "Inspect the next message batch",
},
});
expect(first.statusCode).toBe(200);
expect(second.statusCode).toBe(200);
const firstParsed = parseJsonBody(first);
const secondParsed = parseJsonBody(second);
expect(firstParsed.result.task.taskId).toBe(secondParsed.result.task.taskId);
expect(target.taskFlow.getTaskSummary(flow.flowId)?.total).toBe(1);
});
it("returns 409 when cancellation targets a terminal flow", async () => {
const { handler, target } = createHandler();
const flow = target.taskFlow.createManaged({
controllerId: "webhooks/zapier",
goal: "Review inbox",
});
const finished = target.taskFlow.finish({
flowId: flow.flowId,
expectedRevision: flow.revision,
});
expect(finished.applied).toBe(true);
const res = await dispatchJsonRequest({
handler,
path: target.path,
secret: target.secret,
body: {
action: "cancel_flow",
flowId: flow.flowId,
},
});
expect(res.statusCode).toBe(409);
expect(parseJsonBody(res)).toMatchObject({
ok: false,
code: "terminal",
error: "Flow is already succeeded.",
result: {
found: true,
cancelled: false,
reason: "Flow is already succeeded.",
},
});
});
});

View File

@@ -0,0 +1,795 @@
import type { IncomingMessage, ServerResponse } from "node:http";
import { safeEqualSecret } from "openclaw/plugin-sdk/browser-security-runtime";
import { z } from "zod";
import type { PluginRuntime } from "../api.js";
import {
createFixedWindowRateLimiter,
createWebhookInFlightLimiter,
readJsonWebhookBodyOrReject,
resolveRequestClientIp,
resolveWebhookTargetWithAuthOrRejectSync,
withResolvedWebhookRequestPipeline,
WEBHOOK_IN_FLIGHT_DEFAULTS,
WEBHOOK_RATE_LIMIT_DEFAULTS,
type OpenClawConfig,
type WebhookInFlightLimiter,
} from "../runtime-api.js";
type BoundTaskFlowRuntime = ReturnType<PluginRuntime["taskFlow"]["bindSession"]>;
type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue };
const jsonValueSchema: z.ZodType<JsonValue> = z.lazy(() =>
z.union([
z.null(),
z.boolean(),
z.number().finite(),
z.string(),
z.array(jsonValueSchema),
z.record(z.string(), jsonValueSchema),
]),
);
const nullableStringSchema = z.string().trim().min(1).nullable().optional();
const createFlowRequestSchema = z
.object({
action: z.literal("create_flow"),
controllerId: z.string().trim().min(1).optional(),
goal: z.string().trim().min(1),
status: z.enum(["queued", "running", "waiting", "blocked"]).optional(),
notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(),
currentStep: nullableStringSchema,
stateJson: jsonValueSchema.nullable().optional(),
waitJson: jsonValueSchema.nullable().optional(),
})
.strict();
const getFlowRequestSchema = z
.object({ action: z.literal("get_flow"), flowId: z.string().trim().min(1) })
.strict();
const listFlowsRequestSchema = z.object({ action: z.literal("list_flows") }).strict();
const findLatestFlowRequestSchema = z.object({ action: z.literal("find_latest_flow") }).strict();
const resolveFlowRequestSchema = z
.object({ action: z.literal("resolve_flow"), token: z.string().trim().min(1) })
.strict();
const getTaskSummaryRequestSchema = z
.object({ action: z.literal("get_task_summary"), flowId: z.string().trim().min(1) })
.strict();
const setWaitingRequestSchema = z
.object({
action: z.literal("set_waiting"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
currentStep: nullableStringSchema,
stateJson: jsonValueSchema.nullable().optional(),
waitJson: jsonValueSchema.nullable().optional(),
blockedTaskId: nullableStringSchema,
blockedSummary: nullableStringSchema,
})
.strict();
const resumeFlowRequestSchema = z
.object({
action: z.literal("resume_flow"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
status: z.enum(["queued", "running"]).optional(),
currentStep: nullableStringSchema,
stateJson: jsonValueSchema.nullable().optional(),
})
.strict();
const finishFlowRequestSchema = z
.object({
action: z.literal("finish_flow"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
stateJson: jsonValueSchema.nullable().optional(),
})
.strict();
const failFlowRequestSchema = z
.object({
action: z.literal("fail_flow"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
stateJson: jsonValueSchema.nullable().optional(),
blockedTaskId: nullableStringSchema,
blockedSummary: nullableStringSchema,
})
.strict();
const requestCancelRequestSchema = z
.object({
action: z.literal("request_cancel"),
flowId: z.string().trim().min(1),
expectedRevision: z.number().int().nonnegative(),
})
.strict();
const cancelFlowRequestSchema = z
.object({
action: z.literal("cancel_flow"),
flowId: z.string().trim().min(1),
})
.strict();
const runTaskRequestSchema = z
.object({
action: z.literal("run_task"),
flowId: z.string().trim().min(1),
runtime: z.enum(["subagent", "acp"]),
sourceId: z.string().trim().min(1).optional(),
childSessionKey: z.string().trim().min(1).optional(),
parentTaskId: z.string().trim().min(1).optional(),
agentId: z.string().trim().min(1).optional(),
runId: z.string().trim().min(1).optional(),
label: z.string().trim().min(1).optional(),
task: z.string().trim().min(1),
preferMetadata: z.boolean().optional(),
notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(),
status: z.enum(["queued", "running"]).optional(),
startedAt: z.number().int().nonnegative().optional(),
lastEventAt: z.number().int().nonnegative().optional(),
progressSummary: nullableStringSchema,
})
.strict()
.superRefine((value, ctx) => {
if (
value.status !== "running" &&
(value.startedAt !== undefined ||
value.lastEventAt !== undefined ||
value.progressSummary !== undefined)
) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message:
"status must be running when startedAt, lastEventAt, or progressSummary is provided",
path: ["status"],
});
}
});
const webhookActionSchema = z.discriminatedUnion("action", [
createFlowRequestSchema,
getFlowRequestSchema,
listFlowsRequestSchema,
findLatestFlowRequestSchema,
resolveFlowRequestSchema,
getTaskSummaryRequestSchema,
setWaitingRequestSchema,
resumeFlowRequestSchema,
finishFlowRequestSchema,
failFlowRequestSchema,
requestCancelRequestSchema,
cancelFlowRequestSchema,
runTaskRequestSchema,
]);
type WebhookAction = z.infer<typeof webhookActionSchema>;
export type TaskFlowWebhookTarget = {
routeId: string;
path: string;
secret: string;
defaultControllerId: string;
taskFlow: BoundTaskFlowRuntime;
};
type FlowView = {
flowId: string;
syncMode: "task_mirrored" | "managed";
controllerId?: string;
revision: number;
status: string;
notifyPolicy: string;
goal: string;
currentStep?: string;
blockedTaskId?: string;
blockedSummary?: string;
stateJson?: JsonValue;
waitJson?: JsonValue;
cancelRequestedAt?: number;
createdAt: number;
updatedAt: number;
endedAt?: number;
};
type TaskView = {
taskId: string;
runtime: string;
sourceId?: string;
scopeKind: string;
childSessionKey?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
status: string;
deliveryStatus: string;
notifyPolicy: string;
createdAt: number;
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
cleanupAfter?: number;
error?: string;
progressSummary?: string;
terminalSummary?: string;
terminalOutcome?: string;
};
function toFlowView(flow: {
flowId: string;
syncMode: "task_mirrored" | "managed";
controllerId?: string;
revision: number;
status: string;
notifyPolicy: string;
goal: string;
currentStep?: string;
blockedTaskId?: string;
blockedSummary?: string;
stateJson?: JsonValue;
waitJson?: JsonValue;
cancelRequestedAt?: number;
createdAt: number;
updatedAt: number;
endedAt?: number;
}): FlowView {
return {
flowId: flow.flowId,
syncMode: flow.syncMode,
...(flow.controllerId ? { controllerId: flow.controllerId } : {}),
revision: flow.revision,
status: flow.status,
notifyPolicy: flow.notifyPolicy,
goal: flow.goal,
...(flow.currentStep ? { currentStep: flow.currentStep } : {}),
...(flow.blockedTaskId ? { blockedTaskId: flow.blockedTaskId } : {}),
...(flow.blockedSummary ? { blockedSummary: flow.blockedSummary } : {}),
...(flow.stateJson !== undefined ? { stateJson: flow.stateJson } : {}),
...(flow.waitJson !== undefined ? { waitJson: flow.waitJson } : {}),
...(flow.cancelRequestedAt !== undefined ? { cancelRequestedAt: flow.cancelRequestedAt } : {}),
createdAt: flow.createdAt,
updatedAt: flow.updatedAt,
...(flow.endedAt !== undefined ? { endedAt: flow.endedAt } : {}),
};
}
function toTaskView(task: {
taskId: string;
runtime: string;
sourceId?: string;
scopeKind: string;
childSessionKey?: string;
parentFlowId?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
status: string;
deliveryStatus: string;
notifyPolicy: string;
createdAt: number;
startedAt?: number;
endedAt?: number;
lastEventAt?: number;
cleanupAfter?: number;
error?: string;
progressSummary?: string;
terminalSummary?: string;
terminalOutcome?: string;
}): TaskView {
return {
taskId: task.taskId,
runtime: task.runtime,
...(task.sourceId ? { sourceId: task.sourceId } : {}),
scopeKind: task.scopeKind,
...(task.childSessionKey ? { childSessionKey: task.childSessionKey } : {}),
...(task.parentFlowId ? { parentFlowId: task.parentFlowId } : {}),
...(task.parentTaskId ? { parentTaskId: task.parentTaskId } : {}),
...(task.agentId ? { agentId: task.agentId } : {}),
...(task.runId ? { runId: task.runId } : {}),
...(task.label ? { label: task.label } : {}),
task: task.task,
status: task.status,
deliveryStatus: task.deliveryStatus,
notifyPolicy: task.notifyPolicy,
createdAt: task.createdAt,
...(task.startedAt !== undefined ? { startedAt: task.startedAt } : {}),
...(task.endedAt !== undefined ? { endedAt: task.endedAt } : {}),
...(task.lastEventAt !== undefined ? { lastEventAt: task.lastEventAt } : {}),
...(task.cleanupAfter !== undefined ? { cleanupAfter: task.cleanupAfter } : {}),
...(task.error ? { error: task.error } : {}),
...(task.progressSummary ? { progressSummary: task.progressSummary } : {}),
...(task.terminalSummary ? { terminalSummary: task.terminalSummary } : {}),
...(task.terminalOutcome ? { terminalOutcome: task.terminalOutcome } : {}),
};
}
function writeJson(res: ServerResponse, statusCode: number, body: unknown): void {
res.statusCode = statusCode;
res.setHeader("Content-Type", "application/json; charset=utf-8");
res.end(JSON.stringify(body));
}
function extractSharedSecret(req: IncomingMessage): string {
const authHeader = Array.isArray(req.headers.authorization)
? String(req.headers.authorization[0] ?? "")
: String(req.headers.authorization ?? "");
if (authHeader.toLowerCase().startsWith("bearer ")) {
return authHeader.slice("bearer ".length).trim();
}
const sharedHeader = req.headers["x-openclaw-webhook-secret"];
return Array.isArray(sharedHeader)
? String(sharedHeader[0] ?? "").trim()
: String(sharedHeader ?? "").trim();
}
function timingSafeEquals(left: string, right: string): boolean {
// Reuse the shared helper so webhook auth semantics stay aligned across plugins.
return safeEqualSecret(left, right);
}
function formatZodError(error: z.ZodError): string {
const firstIssue = error.issues[0];
if (!firstIssue) {
return "invalid request";
}
const path = firstIssue.path.length > 0 ? `${firstIssue.path.join(".")}: ` : "";
return `${path}${firstIssue.message}`;
}
function mapMutationResult(
result:
| {
applied: true;
flow: FlowView;
}
| {
applied: false;
code: string;
current?: FlowView;
},
): unknown {
return result;
}
function mapMutationStatus(result: {
applied: boolean;
code?: "not_found" | "not_managed" | "revision_conflict";
}): { statusCode: number; code?: string; error?: string } {
if (result.applied) {
return { statusCode: 200 };
}
switch (result.code) {
case "not_found":
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
case "not_managed":
return {
statusCode: 409,
code: "not_managed",
error: "TaskFlow is not managed by this webhook surface.",
};
case "revision_conflict":
return {
statusCode: 409,
code: "revision_conflict",
error: "TaskFlow changed since the caller's expected revision.",
};
default:
return {
statusCode: 409,
code: "mutation_rejected",
error: "TaskFlow mutation was rejected.",
};
}
}
function mapRunTaskStatus(result: { created: boolean; found: boolean; reason?: string }): {
statusCode: number;
code?: string;
error?: string;
} {
if (result.created) {
return { statusCode: 200 };
}
if (!result.found) {
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
}
if (result.reason === "Flow cancellation has already been requested.") {
return {
statusCode: 409,
code: "cancel_requested",
error: result.reason,
};
}
if (result.reason === "Flow does not accept managed child tasks.") {
return {
statusCode: 409,
code: "not_managed",
error: result.reason,
};
}
if (result.reason?.startsWith("Flow is already ")) {
return {
statusCode: 409,
code: "terminal",
error: result.reason,
};
}
return {
statusCode: 409,
code: "task_not_created",
error: result.reason ?? "TaskFlow task was not created.",
};
}
function mapCancelStatus(result: { found: boolean; cancelled: boolean; reason?: string }): {
statusCode: number;
code?: string;
error?: string;
} {
if (result.cancelled) {
return { statusCode: 200 };
}
if (!result.found) {
return {
statusCode: 404,
code: "not_found",
error: "TaskFlow not found.",
};
}
if (result.reason === "One or more child tasks are still active.") {
return {
statusCode: 202,
code: "cancel_pending",
error: result.reason,
};
}
if (result.reason === "Flow changed while cancellation was in progress.") {
return {
statusCode: 409,
code: "revision_conflict",
error: result.reason,
};
}
if (result.reason?.startsWith("Flow is already ")) {
return {
statusCode: 409,
code: "terminal",
error: result.reason,
};
}
return {
statusCode: 409,
code: "cancel_rejected",
error: result.reason ?? "TaskFlow cancellation was rejected.",
};
}
function describeWebhookOutcome(params: { action: WebhookAction; result: unknown }): {
statusCode: number;
code?: string;
error?: string;
} {
switch (params.action.action) {
case "set_waiting":
case "resume_flow":
case "finish_flow":
case "fail_flow":
case "request_cancel":
return mapMutationStatus(
params.result as {
applied: boolean;
code?: "not_found" | "not_managed" | "revision_conflict";
},
);
case "cancel_flow":
return mapCancelStatus(
params.result as {
found: boolean;
cancelled: boolean;
reason?: string;
},
);
case "run_task":
return mapRunTaskStatus(
params.result as {
created: boolean;
found: boolean;
reason?: string;
},
);
default:
return { statusCode: 200 };
}
}
async function executeWebhookAction(params: {
action: WebhookAction;
target: TaskFlowWebhookTarget;
cfg: OpenClawConfig;
}): Promise<unknown> {
const { action, target } = params;
switch (action.action) {
case "create_flow": {
const flow = target.taskFlow.createManaged({
controllerId: action.controllerId ?? target.defaultControllerId,
goal: action.goal,
status: action.status,
notifyPolicy: action.notifyPolicy,
currentStep: action.currentStep ?? undefined,
stateJson: action.stateJson,
waitJson: action.waitJson,
});
return { flow: toFlowView(flow) };
}
case "get_flow": {
const flow = target.taskFlow.get(action.flowId);
return { flow: flow ? toFlowView(flow) : null };
}
case "list_flows":
return { flows: target.taskFlow.list().map(toFlowView) };
case "find_latest_flow": {
const flow = target.taskFlow.findLatest();
return { flow: flow ? toFlowView(flow) : null };
}
case "resolve_flow": {
const flow = target.taskFlow.resolve(action.token);
return { flow: flow ? toFlowView(flow) : null };
}
case "get_task_summary":
return { summary: target.taskFlow.getTaskSummary(action.flowId) ?? null };
case "set_waiting": {
const result = target.taskFlow.setWaiting({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
currentStep: action.currentStep,
stateJson: action.stateJson,
waitJson: action.waitJson,
blockedTaskId: action.blockedTaskId,
blockedSummary: action.blockedSummary,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "resume_flow": {
const result = target.taskFlow.resume({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
status: action.status,
currentStep: action.currentStep,
stateJson: action.stateJson,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "finish_flow": {
const result = target.taskFlow.finish({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
stateJson: action.stateJson,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "fail_flow": {
const result = target.taskFlow.fail({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
stateJson: action.stateJson,
blockedTaskId: action.blockedTaskId,
blockedSummary: action.blockedSummary,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "request_cancel": {
const result = target.taskFlow.requestCancel({
flowId: action.flowId,
expectedRevision: action.expectedRevision,
});
return mapMutationResult(
result.applied
? { applied: true, flow: toFlowView(result.flow) }
: {
applied: false,
code: result.code,
...(result.current ? { current: toFlowView(result.current) } : {}),
},
);
}
case "cancel_flow": {
const result = await target.taskFlow.cancel({
flowId: action.flowId,
cfg: params.cfg,
});
return {
found: result.found,
cancelled: result.cancelled,
...(result.reason ? { reason: result.reason } : {}),
...(result.flow ? { flow: toFlowView(result.flow) } : {}),
...(result.tasks ? { tasks: result.tasks.map(toTaskView) } : {}),
};
}
case "run_task": {
const result = target.taskFlow.runTask({
flowId: action.flowId,
runtime: action.runtime,
sourceId: action.sourceId,
childSessionKey: action.childSessionKey,
parentTaskId: action.parentTaskId,
agentId: action.agentId,
runId: action.runId,
label: action.label,
task: action.task,
preferMetadata: action.preferMetadata,
notifyPolicy: action.notifyPolicy,
status: action.status,
startedAt: action.startedAt,
lastEventAt: action.lastEventAt,
progressSummary: action.progressSummary,
});
if (result.created) {
return {
created: true,
flow: toFlowView(result.flow),
task: toTaskView(result.task),
};
}
return {
found: result.found,
created: false,
reason: result.reason,
...(result.flow ? { flow: toFlowView(result.flow) } : {}),
};
}
}
}
export function createTaskFlowWebhookRequestHandler(params: {
cfg: OpenClawConfig;
targetsByPath: Map<string, TaskFlowWebhookTarget[]>;
inFlightLimiter?: WebhookInFlightLimiter;
}): (req: IncomingMessage, res: ServerResponse) => Promise<boolean> {
const rateLimiter = createFixedWindowRateLimiter({
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
});
const inFlightLimiter =
params.inFlightLimiter ??
createWebhookInFlightLimiter({
maxInFlightPerKey: WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey,
maxTrackedKeys: WEBHOOK_IN_FLIGHT_DEFAULTS.maxTrackedKeys,
});
return async (req: IncomingMessage, res: ServerResponse): Promise<boolean> => {
return await withResolvedWebhookRequestPipeline({
req,
res,
targetsByPath: params.targetsByPath,
allowMethods: ["POST"],
requireJsonContentType: true,
rateLimiter,
rateLimitKey: (() => {
const clientIp =
resolveRequestClientIp(
req,
params.cfg.gateway?.trustedProxies,
params.cfg.gateway?.allowRealIpFallback === true,
) ??
req.socket.remoteAddress ??
"unknown";
return `${new URL(req.url ?? "/", "http://localhost").pathname}:${clientIp}`;
})(),
inFlightLimiter,
handle: async ({ targets }) => {
const presentedSecret = extractSharedSecret(req);
const target = resolveWebhookTargetWithAuthOrRejectSync({
targets,
res,
isMatch: (candidate) =>
presentedSecret.length > 0 && timingSafeEquals(candidate.secret, presentedSecret),
});
if (!target) {
return true;
}
const body = await readJsonWebhookBodyOrReject({
req,
res,
maxBytes: 256 * 1024,
timeoutMs: 15_000,
emptyObjectOnEmpty: false,
invalidJsonMessage: "invalid request body",
});
if (!body.ok) {
return true;
}
const parsed = webhookActionSchema.safeParse(body.value);
if (!parsed.success) {
writeJson(res, 400, {
ok: false,
code: "invalid_request",
error: formatZodError(parsed.error),
});
return true;
}
const result = await executeWebhookAction({
action: parsed.data,
target,
cfg: params.cfg,
});
const outcome = describeWebhookOutcome({
action: parsed.data,
result,
});
writeJson(
res,
outcome.statusCode,
outcome.statusCode < 400
? {
ok: true,
routeId: target.routeId,
...(outcome.code ? { code: outcome.code } : {}),
result,
}
: {
ok: false,
routeId: target.routeId,
code: outcome.code ?? "request_rejected",
error: outcome.error ?? "request rejected",
result,
},
);
return true;
},
});
};
}

6
pnpm-lock.yaml generated
View File

@@ -739,6 +739,12 @@ importers:
extensions/vydra: {}
extensions/webhooks:
dependencies:
zod:
specifier: ^4.3.6
version: 4.3.6
extensions/whatsapp:
dependencies:
'@whiskeysockets/baileys':

View File

@@ -31,6 +31,7 @@
"dist/extensions/tlon/runtime-api.js",
"dist/extensions/twitch/runtime-api.js",
"dist/extensions/voice-call/runtime-api.js",
"dist/extensions/webhooks/runtime-api.js",
"dist/extensions/whatsapp/light-runtime-api.js",
"dist/extensions/whatsapp/runtime-api.js",
"dist/extensions/zai/runtime-api.js",