import os from "node:os"; import { resolveLoggerBackedRuntime } from "openclaw/plugin-sdk/extension-shared"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import { resolveNextcloudTalkAccount } from "./accounts.js"; import { handleNextcloudTalkInbound } from "./inbound.js"; import { createNextcloudTalkWebhookServer, processNextcloudTalkReplayGuardedMessage, } from "./monitor.js"; import { createNextcloudTalkReplayGuard } from "./replay-guard.js"; import { getNextcloudTalkRuntime } from "./runtime.js"; import type { CoreConfig, NextcloudTalkInboundMessage } from "./types.js"; const DEFAULT_WEBHOOK_PORT = 8788; const DEFAULT_WEBHOOK_HOST = "0.0.0.0"; const DEFAULT_WEBHOOK_PATH = "/nextcloud-talk-webhook"; function normalizeOrigin(value: string): string | null { try { return normalizeLowercaseStringOrEmpty(new URL(value).origin); } catch { return null; } } type NextcloudTalkMonitorOptions = { accountId?: string; config?: CoreConfig; runtime?: RuntimeEnv; abortSignal?: AbortSignal; onMessage?: (message: NextcloudTalkInboundMessage) => void | Promise; statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; }; export async function monitorNextcloudTalkProvider( opts: NextcloudTalkMonitorOptions, ): Promise<{ stop: () => void }> { const core = getNextcloudTalkRuntime(); const cfg = opts.config ?? (core.config.current() as CoreConfig); const account = resolveNextcloudTalkAccount({ cfg, accountId: opts.accountId, }); const runtime: RuntimeEnv = resolveLoggerBackedRuntime( opts.runtime, core.logging.getChildLogger(), ); if (!account.secret) { throw new Error(`Nextcloud Talk bot secret not configured for account "${account.accountId}"`); } const port = account.config.webhookPort ?? DEFAULT_WEBHOOK_PORT; const host = account.config.webhookHost ?? DEFAULT_WEBHOOK_HOST; const path = account.config.webhookPath ?? DEFAULT_WEBHOOK_PATH; const logger = core.logging.getChildLogger({ channel: "nextcloud-talk", accountId: account.accountId, }); const expectedBackendOrigin = normalizeOrigin(account.baseUrl); const replayGuard = createNextcloudTalkReplayGuard({ stateDir: core.state.resolveStateDir(process.env, os.homedir), onDiskError: (error) => { logger.warn( `[nextcloud-talk:${account.accountId}] replay guard disk error: ${String(error)}`, ); }, }); const { start, stop } = createNextcloudTalkWebhookServer({ port, host, path, secret: account.secret, isBackendAllowed: (backend) => { if (!expectedBackendOrigin) { return true; } const backendOrigin = normalizeOrigin(backend); return backendOrigin === expectedBackendOrigin; }, processMessage: async (message) => { const result = await processNextcloudTalkReplayGuardedMessage({ replayGuard, accountId: account.accountId, message, handleMessage: async () => { core.channel.activity.record({ channel: "nextcloud-talk", accountId: account.accountId, direction: "inbound", at: message.timestamp, }); if (opts.onMessage) { await opts.onMessage(message); } else { await handleNextcloudTalkInbound({ message, account, config: cfg, runtime, statusSink: opts.statusSink, }); } }, }); if (result === "duplicate") { logger.warn( `[nextcloud-talk:${account.accountId}] replayed webhook ignored room=${message.roomToken} messageId=${message.messageId}`, ); return; } }, onMessage: async () => {}, onError: (error) => { logger.error(`[nextcloud-talk:${account.accountId}] webhook error: ${error.message}`); }, abortSignal: opts.abortSignal, }); if (opts.abortSignal?.aborted) { return { stop }; } await start(); if (opts.abortSignal?.aborted) { stop(); return { stop }; } const publicUrl = account.config.webhookPublicUrl ?? `http://${host === "0.0.0.0" ? "localhost" : host}:${port}${path}`; logger.info(`[nextcloud-talk:${account.accountId}] webhook listening on ${publicUrl}`); return { stop }; }