mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-02 07:10:23 +00:00
refactor(synology-chat): split inbound webhook flow
This commit is contained in:
@@ -17,21 +17,20 @@ import {
|
||||
import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result";
|
||||
import { createEmptyChannelDirectoryAdapter } from "openclaw/plugin-sdk/directory-runtime";
|
||||
import { DEFAULT_ACCOUNT_ID } from "openclaw/plugin-sdk/setup";
|
||||
import { registerPluginHttpRoute } from "openclaw/plugin-sdk/webhook-ingress";
|
||||
import { z } from "zod";
|
||||
import { listAccountIds, resolveAccount } from "./accounts.js";
|
||||
import { sendMessage, sendFileUrl } from "./client.js";
|
||||
import { getSynologyRuntime } from "./runtime.js";
|
||||
import { buildSynologyChatInboundSessionKey } from "./session-key.js";
|
||||
import {
|
||||
registerSynologyWebhookRoute,
|
||||
validateSynologyGatewayAccountStartup,
|
||||
waitUntilAbort,
|
||||
} from "./gateway-runtime.js";
|
||||
import { synologyChatSetupAdapter, synologyChatSetupWizard } from "./setup-surface.js";
|
||||
import type { ResolvedSynologyChatAccount } from "./types.js";
|
||||
import { createWebhookHandler } from "./webhook-handler.js";
|
||||
|
||||
const CHANNEL_ID = "synology-chat";
|
||||
const SynologyChatConfigSchema = buildChannelConfigSchema(z.object({}).passthrough());
|
||||
|
||||
const activeRouteUnregisters = new Map<string, () => void>();
|
||||
|
||||
const resolveSynologyChatDmPolicy = createScopedDmSecurityResolver<ResolvedSynologyChatAccount>({
|
||||
channelKey: CHANNEL_ID,
|
||||
resolvePolicy: (account) => account.dmPolicy,
|
||||
@@ -83,23 +82,6 @@ const collectSynologyChatSecurityWarnings =
|
||||
'- Synology Chat: dmPolicy="allowlist" with empty allowedUserIds blocks all senders. Add users or set dmPolicy="open".',
|
||||
);
|
||||
|
||||
function waitUntilAbort(signal?: AbortSignal, onAbort?: () => void): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const complete = () => {
|
||||
onAbort?.();
|
||||
resolve();
|
||||
};
|
||||
if (!signal) {
|
||||
return;
|
||||
}
|
||||
if (signal.aborted) {
|
||||
complete();
|
||||
return;
|
||||
}
|
||||
signal.addEventListener("abort", complete, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
export function createSynologyChatPlugin() {
|
||||
return {
|
||||
id: CHANNEL_ID,
|
||||
@@ -215,122 +197,14 @@ export function createSynologyChatPlugin() {
|
||||
startAccount: async (ctx: any) => {
|
||||
const { cfg, accountId, log } = ctx;
|
||||
const account = resolveAccount(cfg, accountId);
|
||||
|
||||
if (!account.enabled) {
|
||||
log?.info?.(`Synology Chat account ${accountId} is disabled, skipping`);
|
||||
return waitUntilAbort(ctx.abortSignal);
|
||||
}
|
||||
|
||||
if (!account.token || !account.incomingUrl) {
|
||||
log?.warn?.(
|
||||
`Synology Chat account ${accountId} not fully configured (missing token or incomingUrl)`,
|
||||
);
|
||||
return waitUntilAbort(ctx.abortSignal);
|
||||
}
|
||||
if (account.dmPolicy === "allowlist" && account.allowedUserIds.length === 0) {
|
||||
log?.warn?.(
|
||||
`Synology Chat account ${accountId} has dmPolicy=allowlist but empty allowedUserIds; refusing to start route`,
|
||||
);
|
||||
if (!validateSynologyGatewayAccountStartup({ account, accountId, log }).ok) {
|
||||
return waitUntilAbort(ctx.abortSignal);
|
||||
}
|
||||
|
||||
log?.info?.(
|
||||
`Starting Synology Chat channel (account: ${accountId}, path: ${account.webhookPath})`,
|
||||
);
|
||||
|
||||
const handler = createWebhookHandler({
|
||||
account,
|
||||
deliver: async (msg) => {
|
||||
const rt = getSynologyRuntime();
|
||||
const currentCfg = await rt.config.loadConfig();
|
||||
|
||||
// The Chat API user_id (for sending) may differ from the webhook
|
||||
// user_id (used for sessions/pairing). Use chatUserId for API calls.
|
||||
const sendUserId = msg.chatUserId ?? msg.from;
|
||||
const route = rt.channel.routing.resolveAgentRoute({
|
||||
cfg: currentCfg,
|
||||
channel: CHANNEL_ID,
|
||||
accountId: account.accountId,
|
||||
peer: {
|
||||
kind: "direct",
|
||||
id: msg.from,
|
||||
},
|
||||
});
|
||||
const sessionKey = buildSynologyChatInboundSessionKey({
|
||||
agentId: route.agentId,
|
||||
accountId: account.accountId,
|
||||
userId: msg.from,
|
||||
identityLinks: currentCfg.session?.identityLinks,
|
||||
});
|
||||
|
||||
// Build MsgContext using SDK's finalizeInboundContext for proper normalization
|
||||
const msgCtx = rt.channel.reply.finalizeInboundContext({
|
||||
Body: msg.body,
|
||||
RawBody: msg.body,
|
||||
CommandBody: msg.body,
|
||||
From: `synology-chat:${msg.from}`,
|
||||
To: `synology-chat:${msg.from}`,
|
||||
SessionKey: sessionKey,
|
||||
AccountId: account.accountId,
|
||||
OriginatingChannel: CHANNEL_ID,
|
||||
OriginatingTo: `synology-chat:${msg.from}`,
|
||||
ChatType: msg.chatType,
|
||||
SenderName: msg.senderName,
|
||||
SenderId: msg.from,
|
||||
Provider: CHANNEL_ID,
|
||||
Surface: CHANNEL_ID,
|
||||
ConversationLabel: msg.senderName || msg.from,
|
||||
Timestamp: Date.now(),
|
||||
CommandAuthorized: msg.commandAuthorized,
|
||||
});
|
||||
|
||||
// Dispatch via the SDK's buffered block dispatcher
|
||||
await rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: msgCtx,
|
||||
cfg: currentCfg,
|
||||
dispatcherOptions: {
|
||||
deliver: async (payload: { text?: string; body?: string }) => {
|
||||
const text = payload?.text ?? payload?.body;
|
||||
if (text) {
|
||||
await sendMessage(
|
||||
account.incomingUrl,
|
||||
text,
|
||||
sendUserId,
|
||||
account.allowInsecureSsl,
|
||||
);
|
||||
}
|
||||
},
|
||||
onReplyStart: () => {
|
||||
log?.info?.(`Agent reply started for ${msg.from}`);
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return null;
|
||||
},
|
||||
log,
|
||||
});
|
||||
|
||||
// Deregister any stale route from a previous start (e.g. on auto-restart)
|
||||
// to avoid "already registered" collisions that trigger infinite loops.
|
||||
const routeKey = `${accountId}:${account.webhookPath}`;
|
||||
const prevUnregister = activeRouteUnregisters.get(routeKey);
|
||||
if (prevUnregister) {
|
||||
log?.info?.(`Deregistering stale route before re-registering: ${account.webhookPath}`);
|
||||
prevUnregister();
|
||||
activeRouteUnregisters.delete(routeKey);
|
||||
}
|
||||
|
||||
const unregister = registerPluginHttpRoute({
|
||||
path: account.webhookPath,
|
||||
auth: "plugin",
|
||||
replaceExisting: true,
|
||||
pluginId: CHANNEL_ID,
|
||||
accountId: account.accountId,
|
||||
log: (msg: string) => log?.info?.(msg),
|
||||
handler,
|
||||
});
|
||||
activeRouteUnregisters.set(routeKey, unregister);
|
||||
const unregister = registerSynologyWebhookRoute({ account, accountId, log });
|
||||
|
||||
log?.info?.(`Registered HTTP route: ${account.webhookPath} for Synology Chat`);
|
||||
|
||||
@@ -339,8 +213,7 @@ export function createSynologyChatPlugin() {
|
||||
// Resolving immediately triggers a restart loop.
|
||||
return waitUntilAbort(ctx.abortSignal, () => {
|
||||
log?.info?.(`Stopping Synology Chat channel (account: ${accountId})`);
|
||||
if (typeof unregister === "function") unregister();
|
||||
activeRouteUnregisters.delete(routeKey);
|
||||
unregister();
|
||||
});
|
||||
},
|
||||
|
||||
|
||||
87
extensions/synology-chat/src/gateway-runtime.ts
Normal file
87
extensions/synology-chat/src/gateway-runtime.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
import { registerPluginHttpRoute } from "openclaw/plugin-sdk/webhook-ingress";
|
||||
import { dispatchSynologyChatInboundTurn } from "./inbound-turn.js";
|
||||
import type { ResolvedSynologyChatAccount } from "./types.js";
|
||||
import { createWebhookHandler, type WebhookHandlerDeps } from "./webhook-handler.js";
|
||||
|
||||
const CHANNEL_ID = "synology-chat";
|
||||
|
||||
type SynologyGatewayLog = WebhookHandlerDeps["log"];
|
||||
|
||||
const activeRouteUnregisters = new Map<string, () => void>();
|
||||
|
||||
export function waitUntilAbort(signal?: AbortSignal, onAbort?: () => void): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const complete = () => {
|
||||
onAbort?.();
|
||||
resolve();
|
||||
};
|
||||
if (!signal) {
|
||||
return;
|
||||
}
|
||||
if (signal.aborted) {
|
||||
complete();
|
||||
return;
|
||||
}
|
||||
signal.addEventListener("abort", complete, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
export function validateSynologyGatewayAccountStartup(params: {
|
||||
account: ResolvedSynologyChatAccount;
|
||||
accountId: string;
|
||||
log?: SynologyGatewayLog;
|
||||
}): { ok: true } | { ok: false } {
|
||||
const { accountId, account, log } = params;
|
||||
if (!account.enabled) {
|
||||
log?.info?.(`Synology Chat account ${accountId} is disabled, skipping`);
|
||||
return { ok: false };
|
||||
}
|
||||
if (!account.token || !account.incomingUrl) {
|
||||
log?.warn?.(
|
||||
`Synology Chat account ${accountId} not fully configured (missing token or incomingUrl)`,
|
||||
);
|
||||
return { ok: false };
|
||||
}
|
||||
if (account.dmPolicy === "allowlist" && account.allowedUserIds.length === 0) {
|
||||
log?.warn?.(
|
||||
`Synology Chat account ${accountId} has dmPolicy=allowlist but empty allowedUserIds; refusing to start route`,
|
||||
);
|
||||
return { ok: false };
|
||||
}
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
export function registerSynologyWebhookRoute(params: {
|
||||
account: ResolvedSynologyChatAccount;
|
||||
accountId: string;
|
||||
log?: SynologyGatewayLog;
|
||||
}): () => void {
|
||||
const { account, accountId, log } = params;
|
||||
const routeKey = `${accountId}:${account.webhookPath}`;
|
||||
const prevUnregister = activeRouteUnregisters.get(routeKey);
|
||||
if (prevUnregister) {
|
||||
log?.info?.(`Deregistering stale route before re-registering: ${account.webhookPath}`);
|
||||
prevUnregister();
|
||||
activeRouteUnregisters.delete(routeKey);
|
||||
}
|
||||
|
||||
const handler = createWebhookHandler({
|
||||
account,
|
||||
deliver: async (msg) => await dispatchSynologyChatInboundTurn({ account, msg, log }),
|
||||
log,
|
||||
});
|
||||
const unregister = registerPluginHttpRoute({
|
||||
path: account.webhookPath,
|
||||
auth: "plugin",
|
||||
replaceExisting: true,
|
||||
pluginId: CHANNEL_ID,
|
||||
accountId: account.accountId,
|
||||
log: (msg: string) => log?.info?.(msg),
|
||||
handler,
|
||||
});
|
||||
activeRouteUnregisters.set(routeKey, unregister);
|
||||
return () => {
|
||||
unregister();
|
||||
activeRouteUnregisters.delete(routeKey);
|
||||
};
|
||||
}
|
||||
42
extensions/synology-chat/src/inbound-context.ts
Normal file
42
extensions/synology-chat/src/inbound-context.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import type { ResolvedSynologyChatAccount } from "./types.js";
|
||||
|
||||
const CHANNEL_ID = "synology-chat";
|
||||
|
||||
export type SynologyInboundMessage = {
|
||||
body: string;
|
||||
from: string;
|
||||
senderName: string;
|
||||
provider: string;
|
||||
chatType: string;
|
||||
accountId: string;
|
||||
commandAuthorized: boolean;
|
||||
chatUserId?: string;
|
||||
};
|
||||
|
||||
export function buildSynologyChatInboundContext<TContext>(params: {
|
||||
finalizeInboundContext: (ctx: Record<string, unknown>) => TContext;
|
||||
account: ResolvedSynologyChatAccount;
|
||||
msg: SynologyInboundMessage;
|
||||
sessionKey: string;
|
||||
}): TContext {
|
||||
const { account, msg, sessionKey } = params;
|
||||
return params.finalizeInboundContext({
|
||||
Body: msg.body,
|
||||
RawBody: msg.body,
|
||||
CommandBody: msg.body,
|
||||
From: `synology-chat:${msg.from}`,
|
||||
To: `synology-chat:${msg.from}`,
|
||||
SessionKey: sessionKey,
|
||||
AccountId: account.accountId,
|
||||
OriginatingChannel: CHANNEL_ID,
|
||||
OriginatingTo: `synology-chat:${msg.from}`,
|
||||
ChatType: msg.chatType,
|
||||
SenderName: msg.senderName,
|
||||
SenderId: msg.from,
|
||||
Provider: CHANNEL_ID,
|
||||
Surface: CHANNEL_ID,
|
||||
ConversationLabel: msg.senderName || msg.from,
|
||||
Timestamp: Date.now(),
|
||||
CommandAuthorized: msg.commandAuthorized,
|
||||
});
|
||||
}
|
||||
99
extensions/synology-chat/src/inbound-turn.ts
Normal file
99
extensions/synology-chat/src/inbound-turn.ts
Normal file
@@ -0,0 +1,99 @@
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/core";
|
||||
import { sendMessage } from "./client.js";
|
||||
import { buildSynologyChatInboundContext, type SynologyInboundMessage } from "./inbound-context.js";
|
||||
import { getSynologyRuntime } from "./runtime.js";
|
||||
import { buildSynologyChatInboundSessionKey } from "./session-key.js";
|
||||
import type { ResolvedSynologyChatAccount } from "./types.js";
|
||||
|
||||
const CHANNEL_ID = "synology-chat";
|
||||
|
||||
type SynologyChannelLog = {
|
||||
info?: (...args: unknown[]) => void;
|
||||
};
|
||||
|
||||
function resolveSynologyChatInboundRoute(params: {
|
||||
cfg: OpenClawConfig;
|
||||
account: ResolvedSynologyChatAccount;
|
||||
userId: string;
|
||||
}) {
|
||||
const rt = getSynologyRuntime();
|
||||
const route = rt.channel.routing.resolveAgentRoute({
|
||||
cfg: params.cfg,
|
||||
channel: CHANNEL_ID,
|
||||
accountId: params.account.accountId,
|
||||
peer: {
|
||||
kind: "direct",
|
||||
id: params.userId,
|
||||
},
|
||||
});
|
||||
return {
|
||||
rt,
|
||||
route,
|
||||
sessionKey: buildSynologyChatInboundSessionKey({
|
||||
agentId: route.agentId,
|
||||
accountId: params.account.accountId,
|
||||
userId: params.userId,
|
||||
identityLinks: params.cfg.session?.identityLinks,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
async function deliverSynologyChatReply(params: {
|
||||
account: ResolvedSynologyChatAccount;
|
||||
sendUserId: string;
|
||||
payload: { text?: string; body?: string };
|
||||
}): Promise<void> {
|
||||
const text = params.payload.text ?? params.payload.body;
|
||||
if (!text) {
|
||||
return;
|
||||
}
|
||||
await sendMessage(
|
||||
params.account.incomingUrl,
|
||||
text,
|
||||
params.sendUserId,
|
||||
params.account.allowInsecureSsl,
|
||||
);
|
||||
}
|
||||
|
||||
export async function dispatchSynologyChatInboundTurn(params: {
|
||||
account: ResolvedSynologyChatAccount;
|
||||
msg: SynologyInboundMessage;
|
||||
log?: SynologyChannelLog;
|
||||
}): Promise<null> {
|
||||
const rt = getSynologyRuntime();
|
||||
const currentCfg = await rt.config.loadConfig();
|
||||
|
||||
// The Chat API user_id (for sending) may differ from the webhook
|
||||
// user_id (used for sessions/pairing). Use chatUserId for API calls.
|
||||
const sendUserId = params.msg.chatUserId ?? params.msg.from;
|
||||
const resolved = resolveSynologyChatInboundRoute({
|
||||
cfg: currentCfg,
|
||||
account: params.account,
|
||||
userId: params.msg.from,
|
||||
});
|
||||
const msgCtx = buildSynologyChatInboundContext({
|
||||
finalizeInboundContext: resolved.rt.channel.reply.finalizeInboundContext,
|
||||
account: params.account,
|
||||
msg: params.msg,
|
||||
sessionKey: resolved.sessionKey,
|
||||
});
|
||||
|
||||
await resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: msgCtx,
|
||||
cfg: currentCfg,
|
||||
dispatcherOptions: {
|
||||
deliver: async (payload: { text?: string; body?: string }) => {
|
||||
await deliverSynologyChatReply({
|
||||
account: params.account,
|
||||
sendUserId,
|
||||
payload,
|
||||
});
|
||||
},
|
||||
onReplyStart: () => {
|
||||
params.log?.info?.(`Agent reply started for ${params.msg.from}`);
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return null;
|
||||
}
|
||||
@@ -1,10 +1,14 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
|
||||
export function makeReq(method: string, body: string): IncomingMessage {
|
||||
export function makeBaseReq(
|
||||
method: string,
|
||||
opts: { headers?: Record<string, string>; url?: string } = {},
|
||||
): IncomingMessage & { destroyed: boolean } {
|
||||
const req = new EventEmitter() as IncomingMessage & { destroyed: boolean };
|
||||
req.method = method;
|
||||
req.headers = {};
|
||||
req.headers = opts.headers ?? {};
|
||||
req.url = opts.url ?? "/webhook/synology";
|
||||
req.socket = { remoteAddress: "127.0.0.1" } as unknown as IncomingMessage["socket"];
|
||||
req.destroyed = false;
|
||||
req.destroy = ((_: Error | undefined) => {
|
||||
@@ -14,6 +18,15 @@ export function makeReq(method: string, body: string): IncomingMessage {
|
||||
req.destroyed = true;
|
||||
return req;
|
||||
}) as IncomingMessage["destroy"];
|
||||
return req;
|
||||
}
|
||||
|
||||
export function makeReq(
|
||||
method: string,
|
||||
body: string,
|
||||
opts: { headers?: Record<string, string>; url?: string } = {},
|
||||
): IncomingMessage {
|
||||
const req = makeBaseReq(method, opts);
|
||||
process.nextTick(() => {
|
||||
if (req.destroyed) {
|
||||
return;
|
||||
@@ -24,6 +37,13 @@ export function makeReq(method: string, body: string): IncomingMessage {
|
||||
return req;
|
||||
}
|
||||
|
||||
export function makeStalledReq(
|
||||
method: string,
|
||||
opts: { headers?: Record<string, string>; url?: string } = {},
|
||||
): IncomingMessage {
|
||||
return makeBaseReq(method, opts);
|
||||
}
|
||||
|
||||
export function makeRes(): ServerResponse & { _status: number; _body: string } {
|
||||
const res = {
|
||||
_status: 0,
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { makeFormBody, makeReq, makeRes, makeStalledReq } from "./test-http-utils.js";
|
||||
import type { ResolvedSynologyChatAccount } from "./types.js";
|
||||
import type { WebhookHandlerDeps } from "./webhook-handler.js";
|
||||
import {
|
||||
@@ -33,70 +32,6 @@ function makeAccount(
|
||||
};
|
||||
}
|
||||
|
||||
function makeReq(
|
||||
method: string,
|
||||
body: string,
|
||||
opts: { headers?: Record<string, string>; url?: string } = {},
|
||||
): IncomingMessage {
|
||||
const req = makeBaseReq(method, opts);
|
||||
|
||||
// Simulate body delivery
|
||||
process.nextTick(() => {
|
||||
if (req.destroyed) {
|
||||
return;
|
||||
}
|
||||
req.emit("data", Buffer.from(body));
|
||||
req.emit("end");
|
||||
});
|
||||
|
||||
return req;
|
||||
}
|
||||
function makeStalledReq(method: string): IncomingMessage {
|
||||
return makeBaseReq(method);
|
||||
}
|
||||
|
||||
function makeBaseReq(
|
||||
method: string,
|
||||
opts: { headers?: Record<string, string>; url?: string } = {},
|
||||
): IncomingMessage & { destroyed: boolean } {
|
||||
const req = new EventEmitter() as IncomingMessage & {
|
||||
destroyed: boolean;
|
||||
};
|
||||
req.method = method;
|
||||
req.headers = opts.headers ?? {};
|
||||
req.url = opts.url ?? "/webhook/synology";
|
||||
req.socket = { remoteAddress: "127.0.0.1" } as any;
|
||||
req.destroyed = false;
|
||||
req.destroy = ((_: Error | undefined) => {
|
||||
if (req.destroyed) {
|
||||
return req;
|
||||
}
|
||||
req.destroyed = true;
|
||||
return req;
|
||||
}) as IncomingMessage["destroy"];
|
||||
return req;
|
||||
}
|
||||
|
||||
function makeRes(): ServerResponse & { _status: number; _body: string } {
|
||||
const res = {
|
||||
_status: 0,
|
||||
_body: "",
|
||||
writeHead(statusCode: number, _headers?: Record<string, string>) {
|
||||
res._status = statusCode;
|
||||
},
|
||||
end(body?: string) {
|
||||
res._body = body ?? "";
|
||||
},
|
||||
} as any;
|
||||
return res;
|
||||
}
|
||||
|
||||
function makeFormBody(fields: Record<string, string>): string {
|
||||
return Object.entries(fields)
|
||||
.map(([k, v]) => `${encodeURIComponent(k)}=${encodeURIComponent(v)}`)
|
||||
.join("&");
|
||||
}
|
||||
|
||||
const validBody = makeFormBody({
|
||||
token: "valid-token",
|
||||
user_id: "123",
|
||||
|
||||
@@ -219,17 +219,7 @@ function respondNoContent(res: ServerResponse) {
|
||||
|
||||
export interface WebhookHandlerDeps {
|
||||
account: ResolvedSynologyChatAccount;
|
||||
deliver: (msg: {
|
||||
body: string;
|
||||
from: string;
|
||||
senderName: string;
|
||||
provider: string;
|
||||
chatType: string;
|
||||
accountId: string;
|
||||
commandAuthorized: boolean;
|
||||
/** Chat API user_id for sending replies (may differ from webhook user_id) */
|
||||
chatUserId?: string;
|
||||
}) => Promise<string | null>;
|
||||
deliver: (msg: import("./inbound-context.js").SynologyInboundMessage) => Promise<string | null>;
|
||||
log?: {
|
||||
info: (...args: unknown[]) => void;
|
||||
warn: (...args: unknown[]) => void;
|
||||
@@ -249,6 +239,212 @@ export interface WebhookHandlerDeps {
|
||||
* 6. Immediately ACKs request (204)
|
||||
* 7. Delivers to the agent asynchronously and sends final reply via incomingUrl
|
||||
*/
|
||||
type SynologyWebhookAuthorization =
|
||||
| { ok: false; statusCode: number; error: string }
|
||||
| { ok: true; commandAuthorized: boolean };
|
||||
|
||||
type AuthorizedSynologyWebhook = {
|
||||
payload: SynologyWebhookPayload;
|
||||
body: string;
|
||||
commandAuthorized: boolean;
|
||||
preview: string;
|
||||
};
|
||||
|
||||
async function parseWebhookPayloadRequest(params: {
|
||||
req: IncomingMessage;
|
||||
res: ServerResponse;
|
||||
log?: WebhookHandlerDeps["log"];
|
||||
}): Promise<{ ok: false } | { ok: true; payload: SynologyWebhookPayload }> {
|
||||
const bodyResult = await readBody(params.req);
|
||||
if (!bodyResult.ok) {
|
||||
params.log?.error("Failed to read request body", bodyResult.error);
|
||||
respondJson(params.res, bodyResult.statusCode, { error: bodyResult.error });
|
||||
return { ok: false };
|
||||
}
|
||||
|
||||
let payload: SynologyWebhookPayload | null = null;
|
||||
try {
|
||||
payload = parsePayload(params.req, bodyResult.body);
|
||||
} catch (err) {
|
||||
params.log?.warn("Failed to parse webhook payload", err);
|
||||
respondJson(params.res, 400, { error: "Invalid request body" });
|
||||
return { ok: false };
|
||||
}
|
||||
if (!payload) {
|
||||
respondJson(params.res, 400, { error: "Missing required fields (token, user_id, text)" });
|
||||
return { ok: false };
|
||||
}
|
||||
return { ok: true, payload };
|
||||
}
|
||||
|
||||
function authorizeSynologyWebhook(params: {
|
||||
req: IncomingMessage;
|
||||
account: ResolvedSynologyChatAccount;
|
||||
payload: SynologyWebhookPayload;
|
||||
rateLimiter: RateLimiter;
|
||||
log?: WebhookHandlerDeps["log"];
|
||||
}): SynologyWebhookAuthorization {
|
||||
if (!validateToken(params.payload.token, params.account.token)) {
|
||||
params.log?.warn(`Invalid token from ${params.req.socket?.remoteAddress}`);
|
||||
return { ok: false, statusCode: 401, error: "Invalid token" };
|
||||
}
|
||||
|
||||
const auth = authorizeUserForDm(
|
||||
params.payload.user_id,
|
||||
params.account.dmPolicy,
|
||||
params.account.allowedUserIds,
|
||||
);
|
||||
if (!auth.allowed) {
|
||||
if (auth.reason === "disabled") {
|
||||
return { ok: false, statusCode: 403, error: "DMs are disabled" };
|
||||
}
|
||||
if (auth.reason === "allowlist-empty") {
|
||||
params.log?.warn(
|
||||
"Synology Chat allowlist is empty while dmPolicy=allowlist; rejecting message",
|
||||
);
|
||||
return {
|
||||
ok: false,
|
||||
statusCode: 403,
|
||||
error: "Allowlist is empty. Configure allowedUserIds or use dmPolicy=open.",
|
||||
};
|
||||
}
|
||||
params.log?.warn(`Unauthorized user: ${params.payload.user_id}`);
|
||||
return { ok: false, statusCode: 403, error: "User not authorized" };
|
||||
}
|
||||
|
||||
if (!params.rateLimiter.check(params.payload.user_id)) {
|
||||
params.log?.warn(`Rate limit exceeded for user: ${params.payload.user_id}`);
|
||||
return { ok: false, statusCode: 429, error: "Rate limit exceeded" };
|
||||
}
|
||||
|
||||
return { ok: true, commandAuthorized: auth.allowed };
|
||||
}
|
||||
|
||||
function sanitizeSynologyWebhookText(payload: SynologyWebhookPayload): string {
|
||||
let cleanText = sanitizeInput(payload.text);
|
||||
if (payload.trigger_word && cleanText.startsWith(payload.trigger_word)) {
|
||||
cleanText = cleanText.slice(payload.trigger_word.length).trim();
|
||||
}
|
||||
return cleanText;
|
||||
}
|
||||
|
||||
async function parseAndAuthorizeSynologyWebhook(params: {
|
||||
req: IncomingMessage;
|
||||
res: ServerResponse;
|
||||
account: ResolvedSynologyChatAccount;
|
||||
rateLimiter: RateLimiter;
|
||||
log?: WebhookHandlerDeps["log"];
|
||||
}): Promise<{ ok: false } | { ok: true; message: AuthorizedSynologyWebhook }> {
|
||||
const parsed = await parseWebhookPayloadRequest(params);
|
||||
if (!parsed.ok) {
|
||||
return { ok: false };
|
||||
}
|
||||
|
||||
const authorized = authorizeSynologyWebhook({
|
||||
req: params.req,
|
||||
account: params.account,
|
||||
payload: parsed.payload,
|
||||
rateLimiter: params.rateLimiter,
|
||||
log: params.log,
|
||||
});
|
||||
if (!authorized.ok) {
|
||||
respondJson(params.res, authorized.statusCode, { error: authorized.error });
|
||||
return { ok: false };
|
||||
}
|
||||
|
||||
const cleanText = sanitizeSynologyWebhookText(parsed.payload);
|
||||
if (!cleanText) {
|
||||
respondNoContent(params.res);
|
||||
return { ok: false };
|
||||
}
|
||||
const preview = cleanText.length > 100 ? `${cleanText.slice(0, 100)}...` : cleanText;
|
||||
return {
|
||||
ok: true,
|
||||
message: {
|
||||
payload: parsed.payload,
|
||||
body: cleanText,
|
||||
commandAuthorized: authorized.commandAuthorized,
|
||||
preview,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function resolveSynologyReplyUserId(params: {
|
||||
account: ResolvedSynologyChatAccount;
|
||||
payload: SynologyWebhookPayload;
|
||||
log?: WebhookHandlerDeps["log"];
|
||||
}): Promise<string> {
|
||||
const chatUserId = await resolveChatUserId(
|
||||
params.account.incomingUrl,
|
||||
params.payload.username,
|
||||
params.account.allowInsecureSsl,
|
||||
params.log,
|
||||
);
|
||||
if (chatUserId !== undefined) {
|
||||
return String(chatUserId);
|
||||
}
|
||||
params.log?.warn(
|
||||
`Could not resolve Chat API user_id for "${params.payload.username}" — falling back to webhook user_id ${params.payload.user_id}. Reply delivery may fail.`,
|
||||
);
|
||||
return params.payload.user_id;
|
||||
}
|
||||
|
||||
async function processAuthorizedSynologyWebhook(params: {
|
||||
account: ResolvedSynologyChatAccount;
|
||||
deliver: WebhookHandlerDeps["deliver"];
|
||||
log?: WebhookHandlerDeps["log"];
|
||||
message: AuthorizedSynologyWebhook;
|
||||
}): Promise<void> {
|
||||
let replyUserId = params.message.payload.user_id;
|
||||
try {
|
||||
replyUserId = await resolveSynologyReplyUserId({
|
||||
account: params.account,
|
||||
payload: params.message.payload,
|
||||
log: params.log,
|
||||
});
|
||||
|
||||
const deliverPromise = params.deliver({
|
||||
body: params.message.body,
|
||||
from: params.message.payload.user_id,
|
||||
senderName: params.message.payload.username,
|
||||
provider: "synology-chat",
|
||||
chatType: "direct",
|
||||
accountId: params.account.accountId,
|
||||
commandAuthorized: params.message.commandAuthorized,
|
||||
chatUserId: replyUserId,
|
||||
});
|
||||
const timeoutPromise = new Promise<null>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("Agent response timeout (120s)")), 120_000),
|
||||
);
|
||||
const reply = await Promise.race([deliverPromise, timeoutPromise]);
|
||||
if (!reply) {
|
||||
return;
|
||||
}
|
||||
|
||||
await sendMessage(
|
||||
params.account.incomingUrl,
|
||||
reply,
|
||||
replyUserId,
|
||||
params.account.allowInsecureSsl,
|
||||
);
|
||||
const replyPreview = reply.length > 100 ? `${reply.slice(0, 100)}...` : reply;
|
||||
params.log?.info?.(
|
||||
`Reply sent to ${params.message.payload.username} (${replyUserId}): ${replyPreview}`,
|
||||
);
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? `${err.message}\n${err.stack}` : String(err);
|
||||
params.log?.error?.(
|
||||
`Failed to process message from ${params.message.payload.username}: ${errMsg}`,
|
||||
);
|
||||
await sendMessage(
|
||||
params.account.incomingUrl,
|
||||
"Sorry, an error occurred while processing your message.",
|
||||
replyUserId,
|
||||
params.account.allowInsecureSsl,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export function createWebhookHandler(deps: WebhookHandlerDeps) {
|
||||
const { account, deliver, log } = deps;
|
||||
const rateLimiter = getRateLimiter(account);
|
||||
@@ -259,136 +455,28 @@ export function createWebhookHandler(deps: WebhookHandlerDeps) {
|
||||
respondJson(res, 405, { error: "Method not allowed" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Parse body
|
||||
const bodyResult = await readBody(req);
|
||||
if (!bodyResult.ok) {
|
||||
log?.error("Failed to read request body", bodyResult.error);
|
||||
respondJson(res, bodyResult.statusCode, { error: bodyResult.error });
|
||||
const authorized = await parseAndAuthorizeSynologyWebhook({
|
||||
req,
|
||||
res,
|
||||
account,
|
||||
rateLimiter,
|
||||
log,
|
||||
});
|
||||
if (!authorized.ok) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Parse payload
|
||||
let payload: SynologyWebhookPayload | null = null;
|
||||
try {
|
||||
payload = parsePayload(req, bodyResult.body);
|
||||
} catch (err) {
|
||||
log?.warn("Failed to parse webhook payload", err);
|
||||
respondJson(res, 400, { error: "Invalid request body" });
|
||||
return;
|
||||
}
|
||||
if (!payload) {
|
||||
respondJson(res, 400, { error: "Missing required fields (token, user_id, text)" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Token validation
|
||||
if (!validateToken(payload.token, account.token)) {
|
||||
log?.warn(`Invalid token from ${req.socket?.remoteAddress}`);
|
||||
respondJson(res, 401, { error: "Invalid token" });
|
||||
return;
|
||||
}
|
||||
|
||||
// DM policy authorization
|
||||
const auth = authorizeUserForDm(payload.user_id, account.dmPolicy, account.allowedUserIds);
|
||||
if (!auth.allowed) {
|
||||
if (auth.reason === "disabled") {
|
||||
respondJson(res, 403, { error: "DMs are disabled" });
|
||||
return;
|
||||
}
|
||||
if (auth.reason === "allowlist-empty") {
|
||||
log?.warn("Synology Chat allowlist is empty while dmPolicy=allowlist; rejecting message");
|
||||
respondJson(res, 403, {
|
||||
error: "Allowlist is empty. Configure allowedUserIds or use dmPolicy=open.",
|
||||
});
|
||||
return;
|
||||
}
|
||||
log?.warn(`Unauthorized user: ${payload.user_id}`);
|
||||
respondJson(res, 403, { error: "User not authorized" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Rate limit
|
||||
if (!rateLimiter.check(payload.user_id)) {
|
||||
log?.warn(`Rate limit exceeded for user: ${payload.user_id}`);
|
||||
respondJson(res, 429, { error: "Rate limit exceeded" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Sanitize input
|
||||
let cleanText = sanitizeInput(payload.text);
|
||||
|
||||
// Strip trigger word
|
||||
if (payload.trigger_word && cleanText.startsWith(payload.trigger_word)) {
|
||||
cleanText = cleanText.slice(payload.trigger_word.length).trim();
|
||||
}
|
||||
|
||||
if (!cleanText) {
|
||||
respondNoContent(res);
|
||||
return;
|
||||
}
|
||||
|
||||
const preview = cleanText.length > 100 ? `${cleanText.slice(0, 100)}...` : cleanText;
|
||||
log?.info(`Message from ${payload.username} (${payload.user_id}): ${preview}`);
|
||||
log?.info(
|
||||
`Message from ${authorized.message.payload.username} (${authorized.message.payload.user_id}): ${authorized.message.preview}`,
|
||||
);
|
||||
|
||||
// ACK immediately so Synology Chat won't remain in "Processing..."
|
||||
respondNoContent(res);
|
||||
|
||||
// Default to webhook user_id; may be replaced with Chat API user_id below.
|
||||
let replyUserId = payload.user_id;
|
||||
|
||||
// Deliver to agent asynchronously (with 120s timeout to match nginx proxy_read_timeout)
|
||||
try {
|
||||
// Resolve the Chat-internal user_id for sending replies.
|
||||
// Synology Chat outgoing webhooks use a per-integration user_id that may
|
||||
// differ from the global Chat API user_id required by method=chatbot.
|
||||
// We resolve via the user_list API, matching by nickname/username.
|
||||
const chatUserId = await resolveChatUserId(
|
||||
account.incomingUrl,
|
||||
payload.username,
|
||||
account.allowInsecureSsl,
|
||||
log,
|
||||
);
|
||||
if (chatUserId !== undefined) {
|
||||
replyUserId = String(chatUserId);
|
||||
} else {
|
||||
log?.warn(
|
||||
`Could not resolve Chat API user_id for "${payload.username}" — falling back to webhook user_id ${payload.user_id}. Reply delivery may fail.`,
|
||||
);
|
||||
}
|
||||
|
||||
const deliverPromise = deliver({
|
||||
body: cleanText,
|
||||
from: payload.user_id,
|
||||
senderName: payload.username,
|
||||
provider: "synology-chat",
|
||||
chatType: "direct",
|
||||
accountId: account.accountId,
|
||||
commandAuthorized: auth.allowed,
|
||||
chatUserId: replyUserId,
|
||||
});
|
||||
|
||||
const timeoutPromise = new Promise<null>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("Agent response timeout (120s)")), 120_000),
|
||||
);
|
||||
|
||||
const reply = await Promise.race([deliverPromise, timeoutPromise]);
|
||||
|
||||
// Send reply back to Synology Chat using the resolved Chat user_id
|
||||
if (reply) {
|
||||
await sendMessage(account.incomingUrl, reply, replyUserId, account.allowInsecureSsl);
|
||||
const replyPreview = reply.length > 100 ? `${reply.slice(0, 100)}...` : reply;
|
||||
log?.info(`Reply sent to ${payload.username} (${replyUserId}): ${replyPreview}`);
|
||||
}
|
||||
} catch (err) {
|
||||
const errMsg = err instanceof Error ? `${err.message}\n${err.stack}` : String(err);
|
||||
log?.error(`Failed to process message from ${payload.username}: ${errMsg}`);
|
||||
await sendMessage(
|
||||
account.incomingUrl,
|
||||
"Sorry, an error occurred while processing your message.",
|
||||
replyUserId,
|
||||
account.allowInsecureSsl,
|
||||
);
|
||||
}
|
||||
await processAuthorizedSynologyWebhook({
|
||||
account,
|
||||
deliver,
|
||||
log,
|
||||
message: authorized.message,
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user