diff --git a/extensions/nextcloud-talk/src/monitor.replay.test.ts b/extensions/nextcloud-talk/src/monitor.replay.test.ts index a8bea1a2dab..63325e2fcc1 100644 --- a/extensions/nextcloud-talk/src/monitor.replay.test.ts +++ b/extensions/nextcloud-talk/src/monitor.replay.test.ts @@ -1,12 +1,31 @@ -import { describe, expect, it, vi } from "vitest"; +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { createMockIncomingRequest } from "../../../test/helpers/mock-incoming-request.js"; import { WEBHOOK_RATE_LIMIT_DEFAULTS } from "../runtime-api.js"; -import { readNextcloudTalkWebhookBody } from "./monitor.js"; +import { + NextcloudTalkRetryableWebhookError, + processNextcloudTalkReplayGuardedMessage, + readNextcloudTalkWebhookBody, +} from "./monitor.js"; import { createSignedCreateMessageRequest } from "./monitor.test-fixtures.js"; import { startWebhookServer } from "./monitor.test-harness.js"; +import { createNextcloudTalkReplayGuard } from "./replay-guard.js"; import { generateNextcloudTalkSignature } from "./signature.js"; import type { NextcloudTalkInboundMessage } from "./types.js"; +const tempDirs: string[] = []; + +afterEach(() => { + while (tempDirs.length > 0) { + const dir = tempDirs.pop(); + if (dir) { + fs.rmSync(dir, { recursive: true, force: true }); + } + } +}); + describe("readNextcloudTalkWebhookBody", () => { it("reads valid body within max bytes", async () => { const req = createMockIncomingRequest(['{"type":"Create"}']); @@ -71,6 +90,24 @@ describe("createNextcloudTalkWebhookServer backend allowlist", () => { }); describe("createNextcloudTalkWebhookServer replay handling", () => { + function createReplayAwareProcessMessage(params: { + stateDir: string; + accountId?: string; + handleMessage: (message: NextcloudTalkInboundMessage) => Promise; + }) { + const replayGuard = createNextcloudTalkReplayGuard({ + stateDir: params.stateDir, + }); + + return async (message: NextcloudTalkInboundMessage) => + await processNextcloudTalkReplayGuardedMessage({ + replayGuard, + accountId: params.accountId ?? "acct", + message, + handleMessage: () => params.handleMessage(message), + }); + } + it("acknowledges replayed requests and skips onMessage side effects", async () => { const seen = new Set(); const onMessage = vi.fn(async () => {}); @@ -107,14 +144,22 @@ describe("createNextcloudTalkWebhookServer replay handling", () => { }); it("allows a retry after processMessage fails before replay commit", async () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "nextcloud-talk-replay-")); + tempDirs.push(stateDir); let attempts = 0; const onError = vi.fn(); - const processMessage = vi.fn(async () => { + const handleMessage = vi.fn(async () => { attempts += 1; if (attempts === 1) { - throw new Error("transient nextcloud failure"); + throw new NextcloudTalkRetryableWebhookError("transient nextcloud failure"); } }); + const processMessage = vi.fn( + createReplayAwareProcessMessage({ + stateDir, + handleMessage, + }), + ); const harness = await startWebhookServer({ path: "/nextcloud-replay-process", processMessage, @@ -129,6 +174,7 @@ describe("createNextcloudTalkWebhookServer replay handling", () => { headers, body, }); + await vi.waitFor(() => expect(onError).toHaveBeenCalledTimes(1)); const second = await fetch(harness.webhookUrl, { method: "POST", headers, @@ -137,7 +183,50 @@ describe("createNextcloudTalkWebhookServer replay handling", () => { expect(first.status).toBe(200); expect(second.status).toBe(200); - expect(processMessage).toHaveBeenCalledTimes(2); + await vi.waitFor(() => expect(handleMessage).toHaveBeenCalledTimes(2)); + expect(onError).toHaveBeenCalledTimes(1); + }); + + it("keeps replay committed after a non-retryable processMessage failure", async () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "nextcloud-talk-replay-")); + tempDirs.push(stateDir); + const onError = vi.fn(); + const visibleSideEffect = vi.fn(); + const handleMessage = vi.fn(async () => { + visibleSideEffect(); + throw new Error("post-send failure"); + }); + const processMessage = vi.fn( + createReplayAwareProcessMessage({ + stateDir, + handleMessage, + }), + ); + const harness = await startWebhookServer({ + path: "/nextcloud-replay-post-send", + processMessage, + onMessage: vi.fn(), + onError, + }); + + const { body, headers } = createSignedCreateMessageRequest(); + + const first = await fetch(harness.webhookUrl, { + method: "POST", + headers, + body, + }); + await vi.waitFor(() => expect(onError).toHaveBeenCalledTimes(1)); + const second = await fetch(harness.webhookUrl, { + method: "POST", + headers, + body, + }); + + expect(first.status).toBe(200); + expect(second.status).toBe(200); + expect(handleMessage).toHaveBeenCalledTimes(1); + expect(visibleSideEffect).toHaveBeenCalledTimes(1); expect(onError).toHaveBeenCalledTimes(1); }); }); diff --git a/extensions/nextcloud-talk/src/monitor.ts b/extensions/nextcloud-talk/src/monitor.ts index 1a45e87ce95..48202750668 100644 --- a/extensions/nextcloud-talk/src/monitor.ts +++ b/extensions/nextcloud-talk/src/monitor.ts @@ -16,7 +16,7 @@ import { } from "../runtime-api.js"; import { resolveNextcloudTalkAccount } from "./accounts.js"; import { handleNextcloudTalkInbound } from "./inbound.js"; -import { createNextcloudTalkReplayGuard } from "./replay-guard.js"; +import { createNextcloudTalkReplayGuard, type NextcloudTalkReplayGuard } from "./replay-guard.js"; import { getNextcloudTalkRuntime } from "./runtime.js"; import { extractNextcloudTalkHeaders, verifyNextcloudTalkSignature } from "./signature.js"; import type { @@ -64,6 +64,57 @@ const WEBHOOK_ERRORS = { internalServerError: "Internal server error", } as const; +export class NextcloudTalkRetryableWebhookError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "NextcloudTalkRetryableWebhookError"; + } +} + +export async function processNextcloudTalkReplayGuardedMessage(params: { + replayGuard: NextcloudTalkReplayGuard; + accountId: string; + message: NextcloudTalkInboundMessage; + handleMessage: () => Promise; +}): Promise<"processed" | "duplicate"> { + const claim = await params.replayGuard.claimMessage({ + accountId: params.accountId, + roomToken: params.message.roomToken, + messageId: params.message.messageId, + }); + if (claim !== "claimed") { + return "duplicate"; + } + + try { + await params.handleMessage(); + await params.replayGuard.commitMessage({ + accountId: params.accountId, + roomToken: params.message.roomToken, + messageId: params.message.messageId, + }); + return "processed"; + } catch (error) { + if (error instanceof NextcloudTalkRetryableWebhookError) { + params.replayGuard.releaseMessage({ + accountId: params.accountId, + roomToken: params.message.roomToken, + messageId: params.message.messageId, + error, + }); + } else { + // Generic failures are treated as non-retryable because the handler may already + // have produced a visible side effect, and replaying the webhook would duplicate it. + await params.replayGuard.commitMessage({ + accountId: params.accountId, + roomToken: params.message.roomToken, + messageId: params.message.messageId, + }); + } + throw error; + } +} + function formatError(err: unknown): string { if (err instanceof Error) { return err.message; @@ -404,50 +455,36 @@ export async function monitorNextcloudTalkProvider( return backendOrigin === expectedBackendOrigin; }, processMessage: async (message) => { - const claim = await replayGuard.claimMessage({ + const result = await processNextcloudTalkReplayGuardedMessage({ + replayGuard, accountId: account.accountId, - roomToken: message.roomToken, - messageId: message.messageId, + message, + handleMessage: async () => { + core.channel.activity.record({ + channel: "nextcloud-talk", + accountId: account.accountId, + direction: "inbound", + at: message.timestamp, + }); + if (opts.onMessage) { + await opts.onMessage(message); + } else { + await handleNextcloudTalkInbound({ + message, + account, + config: cfg, + runtime, + statusSink: opts.statusSink, + }); + } + }, }); - if (claim !== "claimed") { + if (result === "duplicate") { logger.warn( `[nextcloud-talk:${account.accountId}] replayed webhook ignored room=${message.roomToken} messageId=${message.messageId}`, ); return; } - - try { - core.channel.activity.record({ - channel: "nextcloud-talk", - accountId: account.accountId, - direction: "inbound", - at: message.timestamp, - }); - if (opts.onMessage) { - await opts.onMessage(message); - } else { - await handleNextcloudTalkInbound({ - message, - account, - config: cfg, - runtime, - statusSink: opts.statusSink, - }); - } - await replayGuard.commitMessage({ - accountId: account.accountId, - roomToken: message.roomToken, - messageId: message.messageId, - }); - } catch (error) { - replayGuard.releaseMessage({ - accountId: account.accountId, - roomToken: message.roomToken, - messageId: message.messageId, - error, - }); - throw error; - } }, onMessage: async () => {}, onError: (error) => {