fix(nextcloud-talk): release replay claims on handler failure

This commit is contained in:
Vincent Koc
2026-04-13 15:19:14 +01:00
parent 4ce3f3eafc
commit df9a38120b
5 changed files with 172 additions and 26 deletions

View File

@@ -207,6 +207,39 @@ describe("nextcloud talk core", () => {
expect(accountBFirst).toBe(true);
});
it("releases in-flight replay claims when processing fails", async () => {
const stateDir = await makeTempDir();
const guard = createNextcloudTalkReplayGuard({ stateDir });
const firstClaim = await guard.claimMessage({
accountId: "account-a",
roomToken: "room-1",
messageId: "msg-claim",
});
const secondClaim = await guard.claimMessage({
accountId: "account-a",
roomToken: "room-1",
messageId: "msg-claim",
});
expect(firstClaim).toBe("claimed");
expect(secondClaim).toBe("inflight");
guard.releaseMessage({
accountId: "account-a",
roomToken: "room-1",
messageId: "msg-claim",
error: new Error("transient"),
});
const retryClaim = await guard.claimMessage({
accountId: "account-a",
roomToken: "room-1",
messageId: "msg-claim",
});
expect(retryClaim).toBe("claimed");
});
it("resolves allowlist matches and group policy decisions", () => {
expect(
resolveNextcloudTalkAllowlistMatch({

View File

@@ -105,6 +105,41 @@ describe("createNextcloudTalkWebhookServer replay handling", () => {
expect(shouldProcessMessage).toHaveBeenCalledTimes(2);
expect(onMessage).toHaveBeenCalledTimes(1);
});
it("allows a retry after processMessage fails before replay commit", async () => {
let attempts = 0;
const onError = vi.fn();
const processMessage = vi.fn(async () => {
attempts += 1;
if (attempts === 1) {
throw new Error("transient nextcloud failure");
}
});
const harness = await startWebhookServer({
path: "/nextcloud-replay-process",
processMessage,
onMessage: vi.fn(),
onError,
});
const { body, headers } = createSignedCreateMessageRequest();
const first = await fetch(harness.webhookUrl, {
method: "POST",
headers,
body,
});
const second = await fetch(harness.webhookUrl, {
method: "POST",
headers,
body,
});
expect(first.status).toBe(200);
expect(second.status).toBe(200);
expect(processMessage).toHaveBeenCalledTimes(2);
expect(onError).toHaveBeenCalledTimes(1);
});
});
describe("createNextcloudTalkWebhookServer payload validation", () => {

View File

@@ -210,6 +210,7 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe
const readBody = opts.readBody ?? readNextcloudTalkWebhookBody;
const isBackendAllowed = opts.isBackendAllowed;
const shouldProcessMessage = opts.shouldProcessMessage;
const processMessage = opts.processMessage;
const webhookAuthRateLimiter = createAuthRateLimiter({
maxAttempts: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
@@ -275,6 +276,16 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe
}
const message = decoded.message;
if (processMessage) {
writeJsonResponse(res, 200);
try {
await processMessage(message);
} catch (err) {
onError?.(err instanceof Error ? err : new Error(formatError(err)));
}
return;
}
if (shouldProcessMessage) {
const shouldProcess = await shouldProcessMessage(message);
if (!shouldProcess) {
@@ -392,38 +403,53 @@ export async function monitorNextcloudTalkProvider(
const backendOrigin = normalizeOrigin(backend);
return backendOrigin === expectedBackendOrigin;
},
shouldProcessMessage: async (message) => {
const shouldProcess = await replayGuard.shouldProcessMessage({
processMessage: async (message) => {
const claim = await replayGuard.claimMessage({
accountId: account.accountId,
roomToken: message.roomToken,
messageId: message.messageId,
});
if (!shouldProcess) {
if (claim !== "claimed") {
logger.warn(
`[nextcloud-talk:${account.accountId}] replayed webhook ignored room=${message.roomToken} messageId=${message.messageId}`,
);
}
return shouldProcess;
},
onMessage: async (message) => {
core.channel.activity.record({
channel: "nextcloud-talk",
accountId: account.accountId,
direction: "inbound",
at: message.timestamp,
});
if (opts.onMessage) {
await opts.onMessage(message);
return;
}
await handleNextcloudTalkInbound({
message,
account,
config: cfg,
runtime,
statusSink: opts.statusSink,
});
try {
core.channel.activity.record({
channel: "nextcloud-talk",
accountId: account.accountId,
direction: "inbound",
at: message.timestamp,
});
if (opts.onMessage) {
await opts.onMessage(message);
} else {
await handleNextcloudTalkInbound({
message,
account,
config: cfg,
runtime,
statusSink: opts.statusSink,
});
}
await replayGuard.commitMessage({
accountId: account.accountId,
roomToken: message.roomToken,
messageId: message.messageId,
});
} catch (error) {
replayGuard.releaseMessage({
accountId: account.accountId,
roomToken: message.roomToken,
messageId: message.messageId,
error,
});
throw error;
}
},
onMessage: async () => {},
onError: (error) => {
logger.error(`[nextcloud-talk:${account.accountId}] webhook error: ${error.message}`);
},

View File

@@ -1,5 +1,5 @@
import path from "node:path";
import { createPersistentDedupe } from "../runtime-api.js";
import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
const DEFAULT_REPLAY_TTL_MS = 24 * 60 * 60 * 1000;
const DEFAULT_MEMORY_MAX_SIZE = 1_000;
@@ -31,6 +31,22 @@ export type NextcloudTalkReplayGuardOptions = {
};
export type NextcloudTalkReplayGuard = {
claimMessage: (params: {
accountId: string;
roomToken: string;
messageId: string;
}) => Promise<"claimed" | "duplicate" | "inflight" | "invalid">;
commitMessage: (params: {
accountId: string;
roomToken: string;
messageId: string;
}) => Promise<boolean>;
releaseMessage: (params: {
accountId: string;
roomToken: string;
messageId: string;
error?: unknown;
}) => void;
shouldProcessMessage: (params: {
accountId: string;
roomToken: string;
@@ -42,23 +58,58 @@ export function createNextcloudTalkReplayGuard(
options: NextcloudTalkReplayGuardOptions,
): NextcloudTalkReplayGuard {
const stateDir = options.stateDir.trim();
const persistentDedupe = createPersistentDedupe({
const dedupe = createClaimableDedupe({
ttlMs: options.ttlMs ?? DEFAULT_REPLAY_TTL_MS,
memoryMaxSize: options.memoryMaxSize ?? DEFAULT_MEMORY_MAX_SIZE,
fileMaxEntries: options.fileMaxEntries ?? DEFAULT_FILE_MAX_ENTRIES,
resolveFilePath: (namespace) =>
path.join(stateDir, "nextcloud-talk", "replay-dedupe", `${sanitizeSegment(namespace)}.json`),
onDiskError: options.onDiskError,
});
return {
claimMessage: async ({ accountId, roomToken, messageId }) => {
const replayKey = buildReplayKey({ roomToken, messageId });
if (!replayKey) {
return "invalid";
}
const result = await dedupe.claim(replayKey, {
namespace: accountId,
});
return result.kind;
},
commitMessage: async ({ accountId, roomToken, messageId }) => {
const replayKey = buildReplayKey({ roomToken, messageId });
if (!replayKey) {
return true;
}
return await dedupe.commit(replayKey, {
namespace: accountId,
});
},
releaseMessage: ({ accountId, roomToken, messageId, error }) => {
const replayKey = buildReplayKey({ roomToken, messageId });
if (!replayKey) {
return;
}
dedupe.release(replayKey, {
namespace: accountId,
error,
});
},
shouldProcessMessage: async ({ accountId, roomToken, messageId }) => {
const replayKey = buildReplayKey({ roomToken, messageId });
if (!replayKey) {
return true;
}
return await persistentDedupe.checkAndRecord(replayKey, {
const result = await dedupe.claim(replayKey, {
namespace: accountId,
});
if (result.kind !== "claimed") {
return false;
}
return await dedupe.commit(replayKey, {
namespace: accountId,
onDiskError: options.onDiskError,
});
},
};

View File

@@ -182,6 +182,7 @@ export type NextcloudTalkWebhookServerOptions = {
readBody?: (req: import("node:http").IncomingMessage, maxBodyBytes: number) => Promise<string>;
isBackendAllowed?: (backend: string) => boolean;
shouldProcessMessage?: (message: NextcloudTalkInboundMessage) => boolean | Promise<boolean>;
processMessage?: (message: NextcloudTalkInboundMessage) => void | Promise<void>;
onMessage: (message: NextcloudTalkInboundMessage) => void | Promise<void>;
onError?: (error: Error) => void;
abortSignal?: AbortSignal;