test: trim messaging test hotspots

This commit is contained in:
Peter Steinberger
2026-04-17 03:55:18 +01:00
parent 35fb3f7e1c
commit a2f2e5738e
14 changed files with 282 additions and 290 deletions

View File

@@ -0,0 +1 @@
export { createAuthRateLimiter } from "openclaw/plugin-sdk/nextcloud-talk";

View File

@@ -11,13 +11,9 @@ const hoisted = vi.hoisted(() => ({
monitorNextcloudTalkProvider: vi.fn(),
}));
vi.mock("./monitor.js", async () => {
const actual = await vi.importActual<typeof import("./monitor.js")>("./monitor.js");
return {
...actual,
monitorNextcloudTalkProvider: hoisted.monitorNextcloudTalkProvider,
};
});
vi.mock("./monitor-runtime.js", () => ({
monitorNextcloudTalkProvider: hoisted.monitorNextcloudTalkProvider,
}));
const { nextcloudTalkGatewayAdapter } = await import("./gateway.js");

View File

@@ -7,7 +7,7 @@ import {
type ChannelPlugin,
type OpenClawConfig,
} from "./channel-api.js";
import { monitorNextcloudTalkProvider } from "./monitor.js";
import { monitorNextcloudTalkProvider } from "./monitor-runtime.js";
import { getNextcloudTalkRuntime } from "./runtime.js";
import type { CoreConfig } from "./types.js";

View File

@@ -0,0 +1,138 @@
import os from "node:os";
import { resolveLoggerBackedRuntime } from "openclaw/plugin-sdk/extension-shared";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
import { resolveNextcloudTalkAccount } from "./accounts.js";
import { handleNextcloudTalkInbound } from "./inbound.js";
import {
createNextcloudTalkWebhookServer,
processNextcloudTalkReplayGuardedMessage,
} from "./monitor.js";
import { createNextcloudTalkReplayGuard } from "./replay-guard.js";
import { getNextcloudTalkRuntime } from "./runtime.js";
import type { CoreConfig, NextcloudTalkInboundMessage } from "./types.js";
const DEFAULT_WEBHOOK_PORT = 8788;
const DEFAULT_WEBHOOK_HOST = "0.0.0.0";
const DEFAULT_WEBHOOK_PATH = "/nextcloud-talk-webhook";
function normalizeOrigin(value: string): string | null {
try {
return normalizeLowercaseStringOrEmpty(new URL(value).origin);
} catch {
return null;
}
}
export type NextcloudTalkMonitorOptions = {
accountId?: string;
config?: CoreConfig;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
onMessage?: (message: NextcloudTalkInboundMessage) => void | Promise<void>;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
};
export async function monitorNextcloudTalkProvider(
opts: NextcloudTalkMonitorOptions,
): Promise<{ stop: () => void }> {
const core = getNextcloudTalkRuntime();
const cfg = opts.config ?? (core.config.loadConfig() as CoreConfig);
const account = resolveNextcloudTalkAccount({
cfg,
accountId: opts.accountId,
});
const runtime: RuntimeEnv = resolveLoggerBackedRuntime(
opts.runtime,
core.logging.getChildLogger(),
);
if (!account.secret) {
throw new Error(`Nextcloud Talk bot secret not configured for account "${account.accountId}"`);
}
const port = account.config.webhookPort ?? DEFAULT_WEBHOOK_PORT;
const host = account.config.webhookHost ?? DEFAULT_WEBHOOK_HOST;
const path = account.config.webhookPath ?? DEFAULT_WEBHOOK_PATH;
const logger = core.logging.getChildLogger({
channel: "nextcloud-talk",
accountId: account.accountId,
});
const expectedBackendOrigin = normalizeOrigin(account.baseUrl);
const replayGuard = createNextcloudTalkReplayGuard({
stateDir: core.state.resolveStateDir(process.env, os.homedir),
onDiskError: (error) => {
logger.warn(
`[nextcloud-talk:${account.accountId}] replay guard disk error: ${String(error)}`,
);
},
});
const { start, stop } = createNextcloudTalkWebhookServer({
port,
host,
path,
secret: account.secret,
isBackendAllowed: (backend) => {
if (!expectedBackendOrigin) {
return true;
}
const backendOrigin = normalizeOrigin(backend);
return backendOrigin === expectedBackendOrigin;
},
processMessage: async (message) => {
const result = await processNextcloudTalkReplayGuardedMessage({
replayGuard,
accountId: account.accountId,
message,
handleMessage: async () => {
core.channel.activity.record({
channel: "nextcloud-talk",
accountId: account.accountId,
direction: "inbound",
at: message.timestamp,
});
if (opts.onMessage) {
await opts.onMessage(message);
} else {
await handleNextcloudTalkInbound({
message,
account,
config: cfg,
runtime,
statusSink: opts.statusSink,
});
}
},
});
if (result === "duplicate") {
logger.warn(
`[nextcloud-talk:${account.accountId}] replayed webhook ignored room=${message.roomToken} messageId=${message.messageId}`,
);
return;
}
},
onMessage: async () => {},
onError: (error) => {
logger.error(`[nextcloud-talk:${account.accountId}] webhook error: ${error.message}`);
},
abortSignal: opts.abortSignal,
});
if (opts.abortSignal?.aborted) {
return { stop };
}
await start();
if (opts.abortSignal?.aborted) {
stop();
return { stop };
}
const publicUrl =
account.config.webhookPublicUrl ??
`http://${host === "0.0.0.0" ? "localhost" : host}:${port}${path}`;
logger.info(`[nextcloud-talk:${account.accountId}] webhook listening on ${publicUrl}`);
return { stop };
}

View File

@@ -3,7 +3,6 @@ import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createMockIncomingRequest } from "../../../test/helpers/mock-incoming-request.js";
import { WEBHOOK_RATE_LIMIT_DEFAULTS } from "../runtime-api.js";
import {
NextcloudTalkRetryableWebhookError,
processNextcloudTalkReplayGuardedMessage,
@@ -274,8 +273,10 @@ describe("createNextcloudTalkWebhookServer payload validation", () => {
describe("createNextcloudTalkWebhookServer auth rate limiting", () => {
it("rate limits repeated invalid signature attempts from the same source", async () => {
const maxRequests = 2;
const harness = await startWebhookServer({
path: "/nextcloud-auth-rate-limit",
authRateLimit: { maxRequests },
onMessage: vi.fn(),
});
const { body, headers } = createSignedCreateMessageRequest();
@@ -286,7 +287,7 @@ describe("createNextcloudTalkWebhookServer auth rate limiting", () => {
let firstResponse: Response | undefined;
let lastResponse: Response | undefined;
for (let attempt = 0; attempt <= WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests; attempt += 1) {
for (let attempt = 0; attempt <= maxRequests; attempt += 1) {
const response = await fetch(harness.webhookUrl, {
method: "POST",
headers: invalidHeaders,
@@ -306,14 +307,16 @@ describe("createNextcloudTalkWebhookServer auth rate limiting", () => {
});
it("does not rate limit valid signed webhook bursts from the same source", async () => {
const maxRequests = 2;
const harness = await startWebhookServer({
path: "/nextcloud-auth-rate-limit-valid",
authRateLimit: { maxRequests },
onMessage: vi.fn(),
});
const { body, headers } = createSignedCreateMessageRequest();
let lastResponse: Response | undefined;
for (let attempt = 0; attempt <= WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests; attempt += 1) {
for (let attempt = 0; attempt <= maxRequests; attempt += 1) {
lastResponse = await fetch(harness.webhookUrl, {
method: "POST",
headers,

View File

@@ -1,35 +1,22 @@
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
import os from "node:os";
import {
resolveLoggerBackedRuntime,
safeParseJsonWithSchema,
} from "openclaw/plugin-sdk/extension-shared";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
import { z } from "zod";
import { safeParseJsonWithSchema } from "openclaw/plugin-sdk/extension-shared";
import {
WEBHOOK_RATE_LIMIT_DEFAULTS,
createAuthRateLimiter,
type RuntimeEnv,
isRequestBodyLimitError,
readRequestBodyWithLimit,
requestBodyErrorToText,
} from "../runtime-api.js";
import { resolveNextcloudTalkAccount } from "./accounts.js";
import { handleNextcloudTalkInbound } from "./inbound.js";
import { createNextcloudTalkReplayGuard, type NextcloudTalkReplayGuard } from "./replay-guard.js";
import { getNextcloudTalkRuntime } from "./runtime.js";
} from "openclaw/plugin-sdk/webhook-ingress";
import { z } from "zod";
import { createAuthRateLimiter } from "./api.js";
import type { NextcloudTalkReplayGuard } from "./replay-guard.js";
import { extractNextcloudTalkHeaders, verifyNextcloudTalkSignature } from "./signature.js";
import type {
CoreConfig,
NextcloudTalkInboundMessage,
NextcloudTalkWebhookHeaders,
NextcloudTalkWebhookPayload,
NextcloudTalkWebhookServerOptions,
} from "./types.js";
const DEFAULT_WEBHOOK_PORT = 8788;
const DEFAULT_WEBHOOK_HOST = "0.0.0.0";
const DEFAULT_WEBHOOK_PATH = "/nextcloud-talk-webhook";
const DEFAULT_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
const PREAUTH_WEBHOOK_MAX_BODY_BYTES = 64 * 1024;
const PREAUTH_WEBHOOK_BODY_TIMEOUT_MS = 5_000;
@@ -122,14 +109,6 @@ function formatError(err: unknown): string {
return typeof err === "string" ? err : JSON.stringify(err);
}
function normalizeOrigin(value: string): string | null {
try {
return normalizeLowercaseStringOrEmpty(new URL(value).origin);
} catch {
return null;
}
}
function parseWebhookPayload(body: string): NextcloudTalkWebhookPayload | null {
return safeParseJsonWithSchema(NextcloudTalkWebhookPayloadSchema, body);
}
@@ -262,12 +241,20 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe
const isBackendAllowed = opts.isBackendAllowed;
const shouldProcessMessage = opts.shouldProcessMessage;
const processMessage = opts.processMessage;
const authRateLimitMaxRequests =
typeof opts.authRateLimit?.maxRequests === "number"
? opts.authRateLimit.maxRequests
: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests;
const authRateLimitWindowMs =
typeof opts.authRateLimit?.windowMs === "number"
? opts.authRateLimit.windowMs
: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs;
const webhookAuthRateLimiter = createAuthRateLimiter({
maxAttempts: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
lockoutMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
maxAttempts: authRateLimitMaxRequests,
windowMs: authRateLimitWindowMs,
lockoutMs: authRateLimitWindowMs,
exemptLoopback: false,
pruneIntervalMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
pruneIntervalMs: authRateLimitWindowMs,
});
const server = createServer(async (req: IncomingMessage, res: ServerResponse) => {
@@ -396,116 +383,3 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe
return { server, start, stop };
}
export type NextcloudTalkMonitorOptions = {
accountId?: string;
config?: CoreConfig;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
onMessage?: (message: NextcloudTalkInboundMessage) => void | Promise<void>;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
};
export async function monitorNextcloudTalkProvider(
opts: NextcloudTalkMonitorOptions,
): Promise<{ stop: () => void }> {
const core = getNextcloudTalkRuntime();
const cfg = opts.config ?? (core.config.loadConfig() as CoreConfig);
const account = resolveNextcloudTalkAccount({
cfg,
accountId: opts.accountId,
});
const runtime: RuntimeEnv = resolveLoggerBackedRuntime(
opts.runtime,
core.logging.getChildLogger(),
);
if (!account.secret) {
throw new Error(`Nextcloud Talk bot secret not configured for account "${account.accountId}"`);
}
const port = account.config.webhookPort ?? DEFAULT_WEBHOOK_PORT;
const host = account.config.webhookHost ?? DEFAULT_WEBHOOK_HOST;
const path = account.config.webhookPath ?? DEFAULT_WEBHOOK_PATH;
const logger = core.logging.getChildLogger({
channel: "nextcloud-talk",
accountId: account.accountId,
});
const expectedBackendOrigin = normalizeOrigin(account.baseUrl);
const replayGuard = createNextcloudTalkReplayGuard({
stateDir: core.state.resolveStateDir(process.env, os.homedir),
onDiskError: (error) => {
logger.warn(
`[nextcloud-talk:${account.accountId}] replay guard disk error: ${String(error)}`,
);
},
});
const { start, stop } = createNextcloudTalkWebhookServer({
port,
host,
path,
secret: account.secret,
isBackendAllowed: (backend) => {
if (!expectedBackendOrigin) {
return true;
}
const backendOrigin = normalizeOrigin(backend);
return backendOrigin === expectedBackendOrigin;
},
processMessage: async (message) => {
const result = await processNextcloudTalkReplayGuardedMessage({
replayGuard,
accountId: account.accountId,
message,
handleMessage: async () => {
core.channel.activity.record({
channel: "nextcloud-talk",
accountId: account.accountId,
direction: "inbound",
at: message.timestamp,
});
if (opts.onMessage) {
await opts.onMessage(message);
} else {
await handleNextcloudTalkInbound({
message,
account,
config: cfg,
runtime,
statusSink: opts.statusSink,
});
}
},
});
if (result === "duplicate") {
logger.warn(
`[nextcloud-talk:${account.accountId}] replayed webhook ignored room=${message.roomToken} messageId=${message.messageId}`,
);
return;
}
},
onMessage: async () => {},
onError: (error) => {
logger.error(`[nextcloud-talk:${account.accountId}] webhook error: ${error.message}`);
},
abortSignal: opts.abortSignal,
});
if (opts.abortSignal?.aborted) {
return { stop };
}
await start();
if (opts.abortSignal?.aborted) {
stop();
return { stop };
}
const publicUrl =
account.config.webhookPublicUrl ??
`http://${host === "0.0.0.0" ? "localhost" : host}:${port}${path}`;
logger.info(`[nextcloud-talk:${account.accountId}] webhook listening on ${publicUrl}`);
return { stop };
}

View File

@@ -179,6 +179,10 @@ export type NextcloudTalkWebhookServerOptions = {
path: string;
secret: string;
maxBodyBytes?: number;
authRateLimit?: {
maxRequests?: number;
windowMs?: number;
};
readBody?: (req: import("node:http").IncomingMessage, maxBodyBytes: number) => Promise<string>;
isBackendAllowed?: (backend: string) => boolean;
shouldProcessMessage?: (message: NextcloudTalkInboundMessage) => boolean | Promise<boolean>;