From d1e3ed3743989de633558deac1727b86a39c237b Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 18:04:21 +0100 Subject: [PATCH] fix(plugins): serialize interactive callback dedupe --- src/plugins/interactive-state.ts | 37 ++++++++++++++ src/plugins/interactive.test.ts | 88 ++++++++++++++++++++++++++++++++ src/plugins/interactive.ts | 33 +++++++----- 3 files changed, 145 insertions(+), 13 deletions(-) diff --git a/src/plugins/interactive-state.ts b/src/plugins/interactive-state.ts index 24ccddf95ab..7b6d339d548 100644 --- a/src/plugins/interactive-state.ts +++ b/src/plugins/interactive-state.ts @@ -11,6 +11,7 @@ export type RegisteredInteractiveHandler = PluginInteractiveHandlerRegistration type InteractiveState = { interactiveHandlers: Map; callbackDedupe: ReturnType; + inflightCallbackDedupe: Set; }; const PLUGIN_INTERACTIVE_STATE_KEY = Symbol.for("openclaw.pluginInteractiveState"); @@ -25,6 +26,7 @@ function getState() { maxSize: 4096, }, ), + inflightCallbackDedupe: new Set(), })); } @@ -36,7 +38,42 @@ export function getPluginInteractiveCallbackDedupeState() { return getState().callbackDedupe; } +export function claimPluginInteractiveCallbackDedupe( + dedupeKey: string | undefined, + now = Date.now(), +): boolean { + if (!dedupeKey) { + return true; + } + const state = getState(); + if (state.inflightCallbackDedupe.has(dedupeKey) || state.callbackDedupe.peek(dedupeKey, now)) { + return false; + } + state.inflightCallbackDedupe.add(dedupeKey); + return true; +} + +export function commitPluginInteractiveCallbackDedupe( + dedupeKey: string | undefined, + now = Date.now(), +): void { + if (!dedupeKey) { + return; + } + const state = getState(); + state.inflightCallbackDedupe.delete(dedupeKey); + state.callbackDedupe.check(dedupeKey, now); +} + +export function releasePluginInteractiveCallbackDedupe(dedupeKey: string | undefined): void { + if (!dedupeKey) { + return; + } + getState().inflightCallbackDedupe.delete(dedupeKey); +} + export function clearPluginInteractiveHandlersState(): void { getPluginInteractiveHandlersState().clear(); getPluginInteractiveCallbackDedupeState().clear(); + getState().inflightCallbackDedupe.clear(); } diff --git a/src/plugins/interactive.test.ts b/src/plugins/interactive.test.ts index 9670de806d6..a3672bc11d1 100644 --- a/src/plugins/interactive.test.ts +++ b/src/plugins/interactive.test.ts @@ -780,4 +780,92 @@ describe("plugin interactive handlers", () => { }); expect(handler).toHaveBeenCalledTimes(2); }); + + it("dedupes concurrent interactive dispatches while a handler is still running", async () => { + let releaseHandler!: () => void; + const handlerGate = new Promise((resolve) => { + releaseHandler = resolve; + }); + const handler = vi.fn(async () => { + await handlerGate; + return { handled: true }; + }); + expect( + registerPluginInteractiveHandler("codex-plugin", { + channel: "telegram", + namespace: "codex", + handler, + }), + ).toEqual({ ok: true }); + + const baseParams = createTelegramDispatchParams({ + data: "codex:resume:thread-1", + callbackId: "cb-concurrent", + }); + + const firstDispatch = dispatchInteractive(baseParams); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(1); + }); + const duplicateDispatch = await dispatchInteractive(baseParams); + + expect(duplicateDispatch).toEqual({ + matched: true, + handled: true, + duplicate: true, + }); + + releaseHandler(); + + await expect(firstDispatch).resolves.toEqual({ + matched: true, + handled: true, + duplicate: false, + }); + expect(handler).toHaveBeenCalledTimes(1); + }); + + it("releases inflight interactive dedupe keys after a handler failure", async () => { + let rejectHandler!: (error: Error) => void; + const handlerGate = new Promise((_, reject) => { + rejectHandler = reject; + }); + const handler = vi + .fn(async () => ({ handled: true })) + .mockImplementationOnce(async () => await handlerGate) + .mockResolvedValueOnce({ handled: true }); + expect( + registerPluginInteractiveHandler("codex-plugin", { + channel: "telegram", + namespace: "codex", + handler, + }), + ).toEqual({ ok: true }); + + const baseParams = createTelegramDispatchParams({ + data: "codex:resume:thread-1", + callbackId: "cb-retry-after-failure", + }); + + const firstDispatch = dispatchInteractive(baseParams); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(1); + }); + + await expect(dispatchInteractive(baseParams)).resolves.toEqual({ + matched: true, + handled: true, + duplicate: true, + }); + + rejectHandler(new Error("boom")); + await expect(firstDispatch).rejects.toThrow("boom"); + + await expect(dispatchInteractive(baseParams)).resolves.toEqual({ + matched: true, + handled: true, + duplicate: false, + }); + expect(handler).toHaveBeenCalledTimes(2); + }); }); diff --git a/src/plugins/interactive.ts b/src/plugins/interactive.ts index e40da3759d9..0f829dd7d82 100644 --- a/src/plugins/interactive.ts +++ b/src/plugins/interactive.ts @@ -3,7 +3,9 @@ import { type InteractiveRegistrationResult, } from "./interactive-registry.js"; import { - getPluginInteractiveCallbackDedupeState, + claimPluginInteractiveCallbackDedupe, + commitPluginInteractiveCallbackDedupe, + releasePluginInteractiveCallbackDedupe, type RegisteredInteractiveHandler, } from "./interactive-state.js"; @@ -40,27 +42,32 @@ export async function dispatchPluginInteractiveHandler< match: PluginInteractiveMatch, ) => Promise<{ handled?: boolean } | void> | { handled?: boolean } | void; }): Promise { - const callbackDedupe = getPluginInteractiveCallbackDedupeState(); const match = resolvePluginInteractiveNamespaceMatch(params.channel, params.data); if (!match) { return { matched: false, handled: false, duplicate: false }; } const dedupeKey = params.dedupeId?.trim(); - if (dedupeKey && callbackDedupe.peek(dedupeKey)) { + if (dedupeKey && !claimPluginInteractiveCallbackDedupe(dedupeKey)) { return { matched: true, handled: true, duplicate: true }; } - await params.onMatched?.(); + try { + await params.onMatched?.(); + const resolved = await params.invoke(match as PluginInteractiveMatch); + if (dedupeKey) { + commitPluginInteractiveCallbackDedupe(dedupeKey); + } - const resolved = await params.invoke(match as PluginInteractiveMatch); - if (dedupeKey) { - callbackDedupe.check(dedupeKey); + return { + matched: true, + handled: resolved?.handled ?? true, + duplicate: false, + }; + } catch (error) { + if (dedupeKey) { + releasePluginInteractiveCallbackDedupe(dedupeKey); + } + throw error; } - - return { - matched: true, - handled: resolved?.handled ?? true, - duplicate: false, - }; }