diff --git a/extensions/nextcloud-talk/src/core.test.ts b/extensions/nextcloud-talk/src/core.test.ts index 3b146e75898..ae931baee9a 100644 --- a/extensions/nextcloud-talk/src/core.test.ts +++ b/extensions/nextcloud-talk/src/core.test.ts @@ -207,6 +207,39 @@ describe("nextcloud talk core", () => { expect(accountBFirst).toBe(true); }); + it("releases in-flight replay claims when processing fails", async () => { + const stateDir = await makeTempDir(); + const guard = createNextcloudTalkReplayGuard({ stateDir }); + + const firstClaim = await guard.claimMessage({ + accountId: "account-a", + roomToken: "room-1", + messageId: "msg-claim", + }); + const secondClaim = await guard.claimMessage({ + accountId: "account-a", + roomToken: "room-1", + messageId: "msg-claim", + }); + + expect(firstClaim).toBe("claimed"); + expect(secondClaim).toBe("inflight"); + + guard.releaseMessage({ + accountId: "account-a", + roomToken: "room-1", + messageId: "msg-claim", + error: new Error("transient"), + }); + + const retryClaim = await guard.claimMessage({ + accountId: "account-a", + roomToken: "room-1", + messageId: "msg-claim", + }); + expect(retryClaim).toBe("claimed"); + }); + it("resolves allowlist matches and group policy decisions", () => { expect( resolveNextcloudTalkAllowlistMatch({ diff --git a/extensions/nextcloud-talk/src/monitor.replay.test.ts b/extensions/nextcloud-talk/src/monitor.replay.test.ts index 125a306a359..a8bea1a2dab 100644 --- a/extensions/nextcloud-talk/src/monitor.replay.test.ts +++ b/extensions/nextcloud-talk/src/monitor.replay.test.ts @@ -105,6 +105,41 @@ describe("createNextcloudTalkWebhookServer replay handling", () => { expect(shouldProcessMessage).toHaveBeenCalledTimes(2); expect(onMessage).toHaveBeenCalledTimes(1); }); + + it("allows a retry after processMessage fails before replay commit", async () => { + let attempts = 0; + const onError = vi.fn(); + const processMessage = vi.fn(async () => { + attempts += 1; + if (attempts === 1) { + throw new Error("transient nextcloud failure"); + } + }); + const harness = await startWebhookServer({ + path: "/nextcloud-replay-process", + processMessage, + onMessage: vi.fn(), + onError, + }); + + const { body, headers } = createSignedCreateMessageRequest(); + + const first = await fetch(harness.webhookUrl, { + method: "POST", + headers, + body, + }); + const second = await fetch(harness.webhookUrl, { + method: "POST", + headers, + body, + }); + + expect(first.status).toBe(200); + expect(second.status).toBe(200); + expect(processMessage).toHaveBeenCalledTimes(2); + expect(onError).toHaveBeenCalledTimes(1); + }); }); describe("createNextcloudTalkWebhookServer payload validation", () => { diff --git a/extensions/nextcloud-talk/src/monitor.ts b/extensions/nextcloud-talk/src/monitor.ts index 903f170f48f..1a45e87ce95 100644 --- a/extensions/nextcloud-talk/src/monitor.ts +++ b/extensions/nextcloud-talk/src/monitor.ts @@ -210,6 +210,7 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe const readBody = opts.readBody ?? readNextcloudTalkWebhookBody; const isBackendAllowed = opts.isBackendAllowed; const shouldProcessMessage = opts.shouldProcessMessage; + const processMessage = opts.processMessage; const webhookAuthRateLimiter = createAuthRateLimiter({ maxAttempts: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs, @@ -275,6 +276,16 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe } const message = decoded.message; + if (processMessage) { + writeJsonResponse(res, 200); + try { + await processMessage(message); + } catch (err) { + onError?.(err instanceof Error ? err : new Error(formatError(err))); + } + return; + } + if (shouldProcessMessage) { const shouldProcess = await shouldProcessMessage(message); if (!shouldProcess) { @@ -392,38 +403,53 @@ export async function monitorNextcloudTalkProvider( const backendOrigin = normalizeOrigin(backend); return backendOrigin === expectedBackendOrigin; }, - shouldProcessMessage: async (message) => { - const shouldProcess = await replayGuard.shouldProcessMessage({ + processMessage: async (message) => { + const claim = await replayGuard.claimMessage({ accountId: account.accountId, roomToken: message.roomToken, messageId: message.messageId, }); - if (!shouldProcess) { + if (claim !== "claimed") { logger.warn( `[nextcloud-talk:${account.accountId}] replayed webhook ignored room=${message.roomToken} messageId=${message.messageId}`, ); - } - return shouldProcess; - }, - onMessage: async (message) => { - core.channel.activity.record({ - channel: "nextcloud-talk", - accountId: account.accountId, - direction: "inbound", - at: message.timestamp, - }); - if (opts.onMessage) { - await opts.onMessage(message); return; } - await handleNextcloudTalkInbound({ - message, - account, - config: cfg, - runtime, - statusSink: opts.statusSink, - }); + + try { + core.channel.activity.record({ + channel: "nextcloud-talk", + accountId: account.accountId, + direction: "inbound", + at: message.timestamp, + }); + if (opts.onMessage) { + await opts.onMessage(message); + } else { + await handleNextcloudTalkInbound({ + message, + account, + config: cfg, + runtime, + statusSink: opts.statusSink, + }); + } + await replayGuard.commitMessage({ + accountId: account.accountId, + roomToken: message.roomToken, + messageId: message.messageId, + }); + } catch (error) { + replayGuard.releaseMessage({ + accountId: account.accountId, + roomToken: message.roomToken, + messageId: message.messageId, + error, + }); + throw error; + } }, + onMessage: async () => {}, onError: (error) => { logger.error(`[nextcloud-talk:${account.accountId}] webhook error: ${error.message}`); }, diff --git a/extensions/nextcloud-talk/src/replay-guard.ts b/extensions/nextcloud-talk/src/replay-guard.ts index ed4d1c7b79b..d0bf6e30094 100644 --- a/extensions/nextcloud-talk/src/replay-guard.ts +++ b/extensions/nextcloud-talk/src/replay-guard.ts @@ -1,5 +1,5 @@ import path from "node:path"; -import { createPersistentDedupe } from "../runtime-api.js"; +import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; const DEFAULT_REPLAY_TTL_MS = 24 * 60 * 60 * 1000; const DEFAULT_MEMORY_MAX_SIZE = 1_000; @@ -31,6 +31,22 @@ export type NextcloudTalkReplayGuardOptions = { }; export type NextcloudTalkReplayGuard = { + claimMessage: (params: { + accountId: string; + roomToken: string; + messageId: string; + }) => Promise<"claimed" | "duplicate" | "inflight" | "invalid">; + commitMessage: (params: { + accountId: string; + roomToken: string; + messageId: string; + }) => Promise; + releaseMessage: (params: { + accountId: string; + roomToken: string; + messageId: string; + error?: unknown; + }) => void; shouldProcessMessage: (params: { accountId: string; roomToken: string; @@ -42,23 +58,58 @@ export function createNextcloudTalkReplayGuard( options: NextcloudTalkReplayGuardOptions, ): NextcloudTalkReplayGuard { const stateDir = options.stateDir.trim(); - const persistentDedupe = createPersistentDedupe({ + const dedupe = createClaimableDedupe({ ttlMs: options.ttlMs ?? DEFAULT_REPLAY_TTL_MS, memoryMaxSize: options.memoryMaxSize ?? DEFAULT_MEMORY_MAX_SIZE, fileMaxEntries: options.fileMaxEntries ?? DEFAULT_FILE_MAX_ENTRIES, resolveFilePath: (namespace) => path.join(stateDir, "nextcloud-talk", "replay-dedupe", `${sanitizeSegment(namespace)}.json`), + onDiskError: options.onDiskError, }); return { + claimMessage: async ({ accountId, roomToken, messageId }) => { + const replayKey = buildReplayKey({ roomToken, messageId }); + if (!replayKey) { + return "invalid"; + } + const result = await dedupe.claim(replayKey, { + namespace: accountId, + }); + return result.kind; + }, + commitMessage: async ({ accountId, roomToken, messageId }) => { + const replayKey = buildReplayKey({ roomToken, messageId }); + if (!replayKey) { + return true; + } + return await dedupe.commit(replayKey, { + namespace: accountId, + }); + }, + releaseMessage: ({ accountId, roomToken, messageId, error }) => { + const replayKey = buildReplayKey({ roomToken, messageId }); + if (!replayKey) { + return; + } + dedupe.release(replayKey, { + namespace: accountId, + error, + }); + }, shouldProcessMessage: async ({ accountId, roomToken, messageId }) => { const replayKey = buildReplayKey({ roomToken, messageId }); if (!replayKey) { return true; } - return await persistentDedupe.checkAndRecord(replayKey, { + const result = await dedupe.claim(replayKey, { + namespace: accountId, + }); + if (result.kind !== "claimed") { + return false; + } + return await dedupe.commit(replayKey, { namespace: accountId, - onDiskError: options.onDiskError, }); }, }; diff --git a/extensions/nextcloud-talk/src/types.ts b/extensions/nextcloud-talk/src/types.ts index e271fc590df..77861148a65 100644 --- a/extensions/nextcloud-talk/src/types.ts +++ b/extensions/nextcloud-talk/src/types.ts @@ -182,6 +182,7 @@ export type NextcloudTalkWebhookServerOptions = { readBody?: (req: import("node:http").IncomingMessage, maxBodyBytes: number) => Promise; isBackendAllowed?: (backend: string) => boolean; shouldProcessMessage?: (message: NextcloudTalkInboundMessage) => boolean | Promise; + processMessage?: (message: NextcloudTalkInboundMessage) => void | Promise; onMessage: (message: NextcloudTalkInboundMessage) => void | Promise; onError?: (error: Error) => void; abortSignal?: AbortSignal;