import type { IncomingMessage, ServerResponse } from "node:http"; import { safeEqualSecret } from "openclaw/plugin-sdk/browser-security-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import { z } from "zod"; import type { PluginRuntime } from "../api.js"; import { createFixedWindowRateLimiter, createWebhookInFlightLimiter, readJsonWebhookBodyOrReject, resolveRequestClientIp, resolveWebhookTargetWithAuthOrRejectSync, withResolvedWebhookRequestPipeline, WEBHOOK_IN_FLIGHT_DEFAULTS, WEBHOOK_RATE_LIMIT_DEFAULTS, type OpenClawConfig, type WebhookInFlightLimiter, } from "../runtime-api.js"; type BoundTaskFlowRuntime = ReturnType; type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue }; const jsonValueSchema: z.ZodType = z.lazy(() => z.union([ z.null(), z.boolean(), z.number().finite(), z.string(), z.array(jsonValueSchema), z.record(z.string(), jsonValueSchema), ]), ); const nullableStringSchema = z.string().trim().min(1).nullable().optional(); const createFlowRequestSchema = z .object({ action: z.literal("create_flow"), controllerId: z.string().trim().min(1).optional(), goal: z.string().trim().min(1), status: z.enum(["queued", "running", "waiting", "blocked"]).optional(), notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(), currentStep: nullableStringSchema, stateJson: jsonValueSchema.nullable().optional(), waitJson: jsonValueSchema.nullable().optional(), }) .strict(); const getFlowRequestSchema = z .object({ action: z.literal("get_flow"), flowId: z.string().trim().min(1) }) .strict(); const listFlowsRequestSchema = z.object({ action: z.literal("list_flows") }).strict(); const findLatestFlowRequestSchema = z.object({ action: z.literal("find_latest_flow") }).strict(); const resolveFlowRequestSchema = z .object({ action: z.literal("resolve_flow"), token: z.string().trim().min(1) }) .strict(); const getTaskSummaryRequestSchema = z .object({ action: z.literal("get_task_summary"), flowId: z.string().trim().min(1) }) .strict(); const setWaitingRequestSchema = z .object({ action: z.literal("set_waiting"), flowId: z.string().trim().min(1), expectedRevision: z.number().int().nonnegative(), currentStep: nullableStringSchema, stateJson: jsonValueSchema.nullable().optional(), waitJson: jsonValueSchema.nullable().optional(), blockedTaskId: nullableStringSchema, blockedSummary: nullableStringSchema, }) .strict(); const resumeFlowRequestSchema = z .object({ action: z.literal("resume_flow"), flowId: z.string().trim().min(1), expectedRevision: z.number().int().nonnegative(), status: z.enum(["queued", "running"]).optional(), currentStep: nullableStringSchema, stateJson: jsonValueSchema.nullable().optional(), }) .strict(); const finishFlowRequestSchema = z .object({ action: z.literal("finish_flow"), flowId: z.string().trim().min(1), expectedRevision: z.number().int().nonnegative(), stateJson: jsonValueSchema.nullable().optional(), }) .strict(); const failFlowRequestSchema = z .object({ action: z.literal("fail_flow"), flowId: z.string().trim().min(1), expectedRevision: z.number().int().nonnegative(), stateJson: jsonValueSchema.nullable().optional(), blockedTaskId: nullableStringSchema, blockedSummary: nullableStringSchema, }) .strict(); const requestCancelRequestSchema = z .object({ action: z.literal("request_cancel"), flowId: z.string().trim().min(1), expectedRevision: z.number().int().nonnegative(), }) .strict(); const cancelFlowRequestSchema = z .object({ action: z.literal("cancel_flow"), flowId: z.string().trim().min(1), }) .strict(); const runTaskRequestSchema = z .object({ action: z.literal("run_task"), flowId: z.string().trim().min(1), runtime: z.enum(["subagent", "acp"]), sourceId: z.string().trim().min(1).optional(), childSessionKey: z.string().trim().min(1).optional(), parentTaskId: z.string().trim().min(1).optional(), agentId: z.string().trim().min(1).optional(), runId: z.string().trim().min(1).optional(), label: z.string().trim().min(1).optional(), task: z.string().trim().min(1), preferMetadata: z.boolean().optional(), notifyPolicy: z.enum(["done_only", "state_changes", "silent"]).optional(), status: z.enum(["queued", "running"]).optional(), startedAt: z.number().int().nonnegative().optional(), lastEventAt: z.number().int().nonnegative().optional(), progressSummary: nullableStringSchema, }) .strict() .superRefine((value, ctx) => { if ( value.status !== "running" && (value.startedAt !== undefined || value.lastEventAt !== undefined || value.progressSummary !== undefined) ) { ctx.addIssue({ code: z.ZodIssueCode.custom, message: "status must be running when startedAt, lastEventAt, or progressSummary is provided", path: ["status"], }); } }); const webhookActionSchema = z.discriminatedUnion("action", [ createFlowRequestSchema, getFlowRequestSchema, listFlowsRequestSchema, findLatestFlowRequestSchema, resolveFlowRequestSchema, getTaskSummaryRequestSchema, setWaitingRequestSchema, resumeFlowRequestSchema, finishFlowRequestSchema, failFlowRequestSchema, requestCancelRequestSchema, cancelFlowRequestSchema, runTaskRequestSchema, ]); type WebhookAction = z.infer; export type TaskFlowWebhookTarget = { routeId: string; path: string; secret: string; defaultControllerId: string; taskFlow: BoundTaskFlowRuntime; }; type FlowView = { flowId: string; syncMode: "task_mirrored" | "managed"; controllerId?: string; revision: number; status: string; notifyPolicy: string; goal: string; currentStep?: string; blockedTaskId?: string; blockedSummary?: string; stateJson?: JsonValue; waitJson?: JsonValue; cancelRequestedAt?: number; createdAt: number; updatedAt: number; endedAt?: number; }; type TaskView = { taskId: string; runtime: string; sourceId?: string; scopeKind: string; childSessionKey?: string; parentFlowId?: string; parentTaskId?: string; agentId?: string; runId?: string; label?: string; task: string; status: string; deliveryStatus: string; notifyPolicy: string; createdAt: number; startedAt?: number; endedAt?: number; lastEventAt?: number; cleanupAfter?: number; error?: string; progressSummary?: string; terminalSummary?: string; terminalOutcome?: string; }; function pickOptionalFields( source: T, keys: readonly TKey[], ): Partial> { const result: Partial> = {}; for (const key of keys) { const value = source[key]; if (value !== undefined) { result[key] = value; } } return result; } function pickOptionalTruthyStringFields( source: T, keys: readonly TKey[], ): Partial> { const result: Partial> = {}; for (const key of keys) { const value = source[key]; if (typeof value === "string" && value) { result[key] = value as T[TKey]; } } return result; } function toFlowView(flow: FlowView): FlowView { return { flowId: flow.flowId, syncMode: flow.syncMode, ...pickOptionalTruthyStringFields(flow, [ "controllerId", "currentStep", "blockedTaskId", "blockedSummary", ]), revision: flow.revision, status: flow.status, notifyPolicy: flow.notifyPolicy, goal: flow.goal, ...pickOptionalFields(flow, ["stateJson", "waitJson", "cancelRequestedAt"]), createdAt: flow.createdAt, updatedAt: flow.updatedAt, ...pickOptionalFields(flow, ["endedAt"]), }; } function toTaskView(task: TaskView): TaskView { return { taskId: task.taskId, runtime: task.runtime, ...pickOptionalTruthyStringFields(task, [ "sourceId", "childSessionKey", "parentFlowId", "parentTaskId", "agentId", "runId", "label", "error", "progressSummary", "terminalSummary", "terminalOutcome", ]), scopeKind: task.scopeKind, task: task.task, status: task.status, deliveryStatus: task.deliveryStatus, notifyPolicy: task.notifyPolicy, createdAt: task.createdAt, ...pickOptionalFields(task, ["startedAt", "endedAt", "lastEventAt", "cleanupAfter"]), }; } function writeJson(res: ServerResponse, statusCode: number, body: unknown): void { res.statusCode = statusCode; res.setHeader("Content-Type", "application/json; charset=utf-8"); res.end(JSON.stringify(body)); } function extractSharedSecret(req: IncomingMessage): string { const authHeader = Array.isArray(req.headers.authorization) ? String(req.headers.authorization[0] ?? "") : String(req.headers.authorization ?? ""); if (normalizeLowercaseStringOrEmpty(authHeader).startsWith("bearer ")) { return authHeader.slice("bearer ".length).trim(); } const sharedHeader = req.headers["x-openclaw-webhook-secret"]; return Array.isArray(sharedHeader) ? String(sharedHeader[0] ?? "").trim() : String(sharedHeader ?? "").trim(); } function timingSafeEquals(left: string, right: string): boolean { // Reuse the shared helper so webhook auth semantics stay aligned across plugins. return safeEqualSecret(left, right); } function formatZodError(error: z.ZodError): string { const firstIssue = error.issues[0]; if (!firstIssue) { return "invalid request"; } const path = firstIssue.path.length > 0 ? `${firstIssue.path.join(".")}: ` : ""; return `${path}${firstIssue.message}`; } function mapMutationResult( result: | { applied: true; flow: FlowView; } | { applied: false; code: string; current?: FlowView; }, ): unknown { return result; } function mapFlowMutationResult( result: | { applied: true; flow: Parameters[0]; } | { applied: false; code: string; current?: Parameters[0]; }, ): unknown { return mapMutationResult( result.applied ? { applied: true, flow: toFlowView(result.flow) } : { applied: false, code: result.code, ...(result.current ? { current: toFlowView(result.current) } : {}), }, ); } function mapMutationStatus(result: { applied: boolean; code?: "not_found" | "not_managed" | "revision_conflict"; }): { statusCode: number; code?: string; error?: string } { if (result.applied) { return { statusCode: 200 }; } switch (result.code) { case "not_found": return { statusCode: 404, code: "not_found", error: "TaskFlow not found.", }; case "not_managed": return { statusCode: 409, code: "not_managed", error: "TaskFlow is not managed by this webhook surface.", }; case "revision_conflict": return { statusCode: 409, code: "revision_conflict", error: "TaskFlow changed since the caller's expected revision.", }; default: return { statusCode: 409, code: "mutation_rejected", error: "TaskFlow mutation was rejected.", }; } } function mapRunTaskStatus(result: { created: boolean; found: boolean; reason?: string }): { statusCode: number; code?: string; error?: string; } { if (result.created) { return { statusCode: 200 }; } if (!result.found) { return { statusCode: 404, code: "not_found", error: "TaskFlow not found.", }; } if (result.reason === "Flow cancellation has already been requested.") { return { statusCode: 409, code: "cancel_requested", error: result.reason, }; } if (result.reason === "Flow does not accept managed child tasks.") { return { statusCode: 409, code: "not_managed", error: result.reason, }; } if (result.reason?.startsWith("Flow is already ")) { return { statusCode: 409, code: "terminal", error: result.reason, }; } return { statusCode: 409, code: "task_not_created", error: result.reason ?? "TaskFlow task was not created.", }; } function mapCancelStatus(result: { found: boolean; cancelled: boolean; reason?: string }): { statusCode: number; code?: string; error?: string; } { if (result.cancelled) { return { statusCode: 200 }; } if (!result.found) { return { statusCode: 404, code: "not_found", error: "TaskFlow not found.", }; } if (result.reason === "One or more child tasks are still active.") { return { statusCode: 202, code: "cancel_pending", error: result.reason, }; } if (result.reason === "Flow changed while cancellation was in progress.") { return { statusCode: 409, code: "revision_conflict", error: result.reason, }; } if (result.reason?.startsWith("Flow is already ")) { return { statusCode: 409, code: "terminal", error: result.reason, }; } return { statusCode: 409, code: "cancel_rejected", error: result.reason ?? "TaskFlow cancellation was rejected.", }; } function describeWebhookOutcome(params: { action: WebhookAction; result: unknown }): { statusCode: number; code?: string; error?: string; } { switch (params.action.action) { case "set_waiting": case "resume_flow": case "finish_flow": case "fail_flow": case "request_cancel": return mapMutationStatus( params.result as { applied: boolean; code?: "not_found" | "not_managed" | "revision_conflict"; }, ); case "cancel_flow": return mapCancelStatus( params.result as { found: boolean; cancelled: boolean; reason?: string; }, ); case "run_task": return mapRunTaskStatus( params.result as { created: boolean; found: boolean; reason?: string; }, ); default: return { statusCode: 200 }; } } async function executeWebhookAction(params: { action: WebhookAction; target: TaskFlowWebhookTarget; cfg: OpenClawConfig; }): Promise { const { action, target } = params; switch (action.action) { case "create_flow": { const flow = target.taskFlow.createManaged({ controllerId: action.controllerId ?? target.defaultControllerId, goal: action.goal, status: action.status, notifyPolicy: action.notifyPolicy, currentStep: action.currentStep ?? undefined, stateJson: action.stateJson, waitJson: action.waitJson, }); return { flow: toFlowView(flow) }; } case "get_flow": { const flow = target.taskFlow.get(action.flowId); return { flow: flow ? toFlowView(flow) : null }; } case "list_flows": return { flows: target.taskFlow.list().map(toFlowView) }; case "find_latest_flow": { const flow = target.taskFlow.findLatest(); return { flow: flow ? toFlowView(flow) : null }; } case "resolve_flow": { const flow = target.taskFlow.resolve(action.token); return { flow: flow ? toFlowView(flow) : null }; } case "get_task_summary": return { summary: target.taskFlow.getTaskSummary(action.flowId) ?? null }; case "set_waiting": { const result = target.taskFlow.setWaiting({ flowId: action.flowId, expectedRevision: action.expectedRevision, currentStep: action.currentStep, stateJson: action.stateJson, waitJson: action.waitJson, blockedTaskId: action.blockedTaskId, blockedSummary: action.blockedSummary, }); return mapFlowMutationResult(result); } case "resume_flow": { const result = target.taskFlow.resume({ flowId: action.flowId, expectedRevision: action.expectedRevision, status: action.status, currentStep: action.currentStep, stateJson: action.stateJson, }); return mapFlowMutationResult(result); } case "finish_flow": { const result = target.taskFlow.finish({ flowId: action.flowId, expectedRevision: action.expectedRevision, stateJson: action.stateJson, }); return mapFlowMutationResult(result); } case "fail_flow": { const result = target.taskFlow.fail({ flowId: action.flowId, expectedRevision: action.expectedRevision, stateJson: action.stateJson, blockedTaskId: action.blockedTaskId, blockedSummary: action.blockedSummary, }); return mapFlowMutationResult(result); } case "request_cancel": { const result = target.taskFlow.requestCancel({ flowId: action.flowId, expectedRevision: action.expectedRevision, }); return mapFlowMutationResult(result); } case "cancel_flow": { const result = await target.taskFlow.cancel({ flowId: action.flowId, cfg: params.cfg, }); return { found: result.found, cancelled: result.cancelled, ...(result.reason ? { reason: result.reason } : {}), ...(result.flow ? { flow: toFlowView(result.flow) } : {}), ...(result.tasks ? { tasks: result.tasks.map(toTaskView) } : {}), }; } case "run_task": { const result = target.taskFlow.runTask({ flowId: action.flowId, runtime: action.runtime, sourceId: action.sourceId, childSessionKey: action.childSessionKey, parentTaskId: action.parentTaskId, agentId: action.agentId, runId: action.runId, label: action.label, task: action.task, preferMetadata: action.preferMetadata, notifyPolicy: action.notifyPolicy, status: action.status, startedAt: action.startedAt, lastEventAt: action.lastEventAt, progressSummary: action.progressSummary, }); if (result.created) { return { created: true, flow: toFlowView(result.flow), task: toTaskView(result.task), }; } return { found: result.found, created: false, reason: result.reason, ...(result.flow ? { flow: toFlowView(result.flow) } : {}), }; } } throw new Error("Unsupported webhook action"); } export function createTaskFlowWebhookRequestHandler(params: { cfg: OpenClawConfig; targetsByPath: Map; inFlightLimiter?: WebhookInFlightLimiter; }): (req: IncomingMessage, res: ServerResponse) => Promise { const rateLimiter = createFixedWindowRateLimiter({ windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs, maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys, }); const inFlightLimiter = params.inFlightLimiter ?? createWebhookInFlightLimiter({ maxInFlightPerKey: WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey, maxTrackedKeys: WEBHOOK_IN_FLIGHT_DEFAULTS.maxTrackedKeys, }); return async (req: IncomingMessage, res: ServerResponse): Promise => { return await withResolvedWebhookRequestPipeline({ req, res, targetsByPath: params.targetsByPath, allowMethods: ["POST"], requireJsonContentType: true, rateLimiter, rateLimitKey: (() => { const clientIp = resolveRequestClientIp( req, params.cfg.gateway?.trustedProxies, params.cfg.gateway?.allowRealIpFallback === true, ) ?? req.socket.remoteAddress ?? "unknown"; return `${new URL(req.url ?? "/", "http://localhost").pathname}:${clientIp}`; })(), inFlightLimiter, handle: async ({ targets }) => { const presentedSecret = extractSharedSecret(req); const target = resolveWebhookTargetWithAuthOrRejectSync({ targets, res, isMatch: (candidate) => presentedSecret.length > 0 && timingSafeEquals(candidate.secret, presentedSecret), }); if (!target) { return true; } const body = await readJsonWebhookBodyOrReject({ req, res, maxBytes: 256 * 1024, timeoutMs: 15_000, emptyObjectOnEmpty: false, invalidJsonMessage: "invalid request body", }); if (!body.ok) { return true; } const parsed = webhookActionSchema.safeParse(body.value); if (!parsed.success) { writeJson(res, 400, { ok: false, code: "invalid_request", error: formatZodError(parsed.error), }); return true; } const result = await executeWebhookAction({ action: parsed.data, target, cfg: params.cfg, }); const outcome = describeWebhookOutcome({ action: parsed.data, result, }); writeJson( res, outcome.statusCode, outcome.statusCode < 400 ? { ok: true, routeId: target.routeId, ...(outcome.code ? { code: outcome.code } : {}), result, } : { ok: false, routeId: target.routeId, code: outcome.code ?? "request_rejected", error: outcome.error ?? "request rejected", result, }, ); return true; }, }); }; }