fix(nextcloud-talk): make replay retries explicit

This commit is contained in:
Vincent Koc
2026-04-13 15:34:01 +01:00
parent 143c1e81a2
commit 2c22a15719
2 changed files with 169 additions and 43 deletions

View File

@@ -1,12 +1,31 @@
import { describe, expect, it, vi } from "vitest";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createMockIncomingRequest } from "../../../test/helpers/mock-incoming-request.js";
import { WEBHOOK_RATE_LIMIT_DEFAULTS } from "../runtime-api.js";
import { readNextcloudTalkWebhookBody } from "./monitor.js";
import {
NextcloudTalkRetryableWebhookError,
processNextcloudTalkReplayGuardedMessage,
readNextcloudTalkWebhookBody,
} from "./monitor.js";
import { createSignedCreateMessageRequest } from "./monitor.test-fixtures.js";
import { startWebhookServer } from "./monitor.test-harness.js";
import { createNextcloudTalkReplayGuard } from "./replay-guard.js";
import { generateNextcloudTalkSignature } from "./signature.js";
import type { NextcloudTalkInboundMessage } from "./types.js";
const tempDirs: string[] = [];
afterEach(() => {
while (tempDirs.length > 0) {
const dir = tempDirs.pop();
if (dir) {
fs.rmSync(dir, { recursive: true, force: true });
}
}
});
describe("readNextcloudTalkWebhookBody", () => {
it("reads valid body within max bytes", async () => {
const req = createMockIncomingRequest(['{"type":"Create"}']);
@@ -71,6 +90,24 @@ describe("createNextcloudTalkWebhookServer backend allowlist", () => {
});
describe("createNextcloudTalkWebhookServer replay handling", () => {
function createReplayAwareProcessMessage(params: {
stateDir: string;
accountId?: string;
handleMessage: (message: NextcloudTalkInboundMessage) => Promise<void>;
}) {
const replayGuard = createNextcloudTalkReplayGuard({
stateDir: params.stateDir,
});
return async (message: NextcloudTalkInboundMessage) =>
await processNextcloudTalkReplayGuardedMessage({
replayGuard,
accountId: params.accountId ?? "acct",
message,
handleMessage: () => params.handleMessage(message),
});
}
it("acknowledges replayed requests and skips onMessage side effects", async () => {
const seen = new Set<string>();
const onMessage = vi.fn(async () => {});
@@ -107,14 +144,22 @@ describe("createNextcloudTalkWebhookServer replay handling", () => {
});
it("allows a retry after processMessage fails before replay commit", async () => {
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "nextcloud-talk-replay-"));
tempDirs.push(stateDir);
let attempts = 0;
const onError = vi.fn();
const processMessage = vi.fn(async () => {
const handleMessage = vi.fn(async () => {
attempts += 1;
if (attempts === 1) {
throw new Error("transient nextcloud failure");
throw new NextcloudTalkRetryableWebhookError("transient nextcloud failure");
}
});
const processMessage = vi.fn(
createReplayAwareProcessMessage({
stateDir,
handleMessage,
}),
);
const harness = await startWebhookServer({
path: "/nextcloud-replay-process",
processMessage,
@@ -129,6 +174,7 @@ describe("createNextcloudTalkWebhookServer replay handling", () => {
headers,
body,
});
await vi.waitFor(() => expect(onError).toHaveBeenCalledTimes(1));
const second = await fetch(harness.webhookUrl, {
method: "POST",
headers,
@@ -137,7 +183,50 @@ describe("createNextcloudTalkWebhookServer replay handling", () => {
expect(first.status).toBe(200);
expect(second.status).toBe(200);
expect(processMessage).toHaveBeenCalledTimes(2);
await vi.waitFor(() => expect(handleMessage).toHaveBeenCalledTimes(2));
expect(onError).toHaveBeenCalledTimes(1);
});
it("keeps replay committed after a non-retryable processMessage failure", async () => {
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "nextcloud-talk-replay-"));
tempDirs.push(stateDir);
const onError = vi.fn();
const visibleSideEffect = vi.fn();
const handleMessage = vi.fn(async () => {
visibleSideEffect();
throw new Error("post-send failure");
});
const processMessage = vi.fn(
createReplayAwareProcessMessage({
stateDir,
handleMessage,
}),
);
const harness = await startWebhookServer({
path: "/nextcloud-replay-post-send",
processMessage,
onMessage: vi.fn(),
onError,
});
const { body, headers } = createSignedCreateMessageRequest();
const first = await fetch(harness.webhookUrl, {
method: "POST",
headers,
body,
});
await vi.waitFor(() => expect(onError).toHaveBeenCalledTimes(1));
const second = await fetch(harness.webhookUrl, {
method: "POST",
headers,
body,
});
expect(first.status).toBe(200);
expect(second.status).toBe(200);
expect(handleMessage).toHaveBeenCalledTimes(1);
expect(visibleSideEffect).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalledTimes(1);
});
});

View File

@@ -16,7 +16,7 @@ import {
} from "../runtime-api.js";
import { resolveNextcloudTalkAccount } from "./accounts.js";
import { handleNextcloudTalkInbound } from "./inbound.js";
import { createNextcloudTalkReplayGuard } from "./replay-guard.js";
import { createNextcloudTalkReplayGuard, type NextcloudTalkReplayGuard } from "./replay-guard.js";
import { getNextcloudTalkRuntime } from "./runtime.js";
import { extractNextcloudTalkHeaders, verifyNextcloudTalkSignature } from "./signature.js";
import type {
@@ -64,6 +64,57 @@ const WEBHOOK_ERRORS = {
internalServerError: "Internal server error",
} as const;
export class NextcloudTalkRetryableWebhookError extends Error {
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = "NextcloudTalkRetryableWebhookError";
}
}
export async function processNextcloudTalkReplayGuardedMessage(params: {
replayGuard: NextcloudTalkReplayGuard;
accountId: string;
message: NextcloudTalkInboundMessage;
handleMessage: () => Promise<void>;
}): Promise<"processed" | "duplicate"> {
const claim = await params.replayGuard.claimMessage({
accountId: params.accountId,
roomToken: params.message.roomToken,
messageId: params.message.messageId,
});
if (claim !== "claimed") {
return "duplicate";
}
try {
await params.handleMessage();
await params.replayGuard.commitMessage({
accountId: params.accountId,
roomToken: params.message.roomToken,
messageId: params.message.messageId,
});
return "processed";
} catch (error) {
if (error instanceof NextcloudTalkRetryableWebhookError) {
params.replayGuard.releaseMessage({
accountId: params.accountId,
roomToken: params.message.roomToken,
messageId: params.message.messageId,
error,
});
} else {
// Generic failures are treated as non-retryable because the handler may already
// have produced a visible side effect, and replaying the webhook would duplicate it.
await params.replayGuard.commitMessage({
accountId: params.accountId,
roomToken: params.message.roomToken,
messageId: params.message.messageId,
});
}
throw error;
}
}
function formatError(err: unknown): string {
if (err instanceof Error) {
return err.message;
@@ -404,50 +455,36 @@ export async function monitorNextcloudTalkProvider(
return backendOrigin === expectedBackendOrigin;
},
processMessage: async (message) => {
const claim = await replayGuard.claimMessage({
const result = await processNextcloudTalkReplayGuardedMessage({
replayGuard,
accountId: account.accountId,
roomToken: message.roomToken,
messageId: message.messageId,
message,
handleMessage: async () => {
core.channel.activity.record({
channel: "nextcloud-talk",
accountId: account.accountId,
direction: "inbound",
at: message.timestamp,
});
if (opts.onMessage) {
await opts.onMessage(message);
} else {
await handleNextcloudTalkInbound({
message,
account,
config: cfg,
runtime,
statusSink: opts.statusSink,
});
}
},
});
if (claim !== "claimed") {
if (result === "duplicate") {
logger.warn(
`[nextcloud-talk:${account.accountId}] replayed webhook ignored room=${message.roomToken} messageId=${message.messageId}`,
);
return;
}
try {
core.channel.activity.record({
channel: "nextcloud-talk",
accountId: account.accountId,
direction: "inbound",
at: message.timestamp,
});
if (opts.onMessage) {
await opts.onMessage(message);
} else {
await handleNextcloudTalkInbound({
message,
account,
config: cfg,
runtime,
statusSink: opts.statusSink,
});
}
await replayGuard.commitMessage({
accountId: account.accountId,
roomToken: message.roomToken,
messageId: message.messageId,
});
} catch (error) {
replayGuard.releaseMessage({
accountId: account.accountId,
roomToken: message.roomToken,
messageId: message.messageId,
error,
});
throw error;
}
},
onMessage: async () => {},
onError: (error) => {