fix(plugins): serialize interactive callback dedupe

This commit is contained in:
Vincent Koc
2026-04-13 18:04:21 +01:00
parent 6d85dda336
commit d1e3ed3743
3 changed files with 145 additions and 13 deletions

View File

@@ -11,6 +11,7 @@ export type RegisteredInteractiveHandler = PluginInteractiveHandlerRegistration
type InteractiveState = {
interactiveHandlers: Map<string, RegisteredInteractiveHandler>;
callbackDedupe: ReturnType<typeof createDedupeCache>;
inflightCallbackDedupe: Set<string>;
};
const PLUGIN_INTERACTIVE_STATE_KEY = Symbol.for("openclaw.pluginInteractiveState");
@@ -25,6 +26,7 @@ function getState() {
maxSize: 4096,
},
),
inflightCallbackDedupe: new Set<string>(),
}));
}
@@ -36,7 +38,42 @@ export function getPluginInteractiveCallbackDedupeState() {
return getState().callbackDedupe;
}
export function claimPluginInteractiveCallbackDedupe(
dedupeKey: string | undefined,
now = Date.now(),
): boolean {
if (!dedupeKey) {
return true;
}
const state = getState();
if (state.inflightCallbackDedupe.has(dedupeKey) || state.callbackDedupe.peek(dedupeKey, now)) {
return false;
}
state.inflightCallbackDedupe.add(dedupeKey);
return true;
}
export function commitPluginInteractiveCallbackDedupe(
dedupeKey: string | undefined,
now = Date.now(),
): void {
if (!dedupeKey) {
return;
}
const state = getState();
state.inflightCallbackDedupe.delete(dedupeKey);
state.callbackDedupe.check(dedupeKey, now);
}
export function releasePluginInteractiveCallbackDedupe(dedupeKey: string | undefined): void {
if (!dedupeKey) {
return;
}
getState().inflightCallbackDedupe.delete(dedupeKey);
}
export function clearPluginInteractiveHandlersState(): void {
getPluginInteractiveHandlersState().clear();
getPluginInteractiveCallbackDedupeState().clear();
getState().inflightCallbackDedupe.clear();
}

View File

@@ -780,4 +780,92 @@ describe("plugin interactive handlers", () => {
});
expect(handler).toHaveBeenCalledTimes(2);
});
it("dedupes concurrent interactive dispatches while a handler is still running", async () => {
let releaseHandler!: () => void;
const handlerGate = new Promise<void>((resolve) => {
releaseHandler = resolve;
});
const handler = vi.fn(async () => {
await handlerGate;
return { handled: true };
});
expect(
registerPluginInteractiveHandler("codex-plugin", {
channel: "telegram",
namespace: "codex",
handler,
}),
).toEqual({ ok: true });
const baseParams = createTelegramDispatchParams({
data: "codex:resume:thread-1",
callbackId: "cb-concurrent",
});
const firstDispatch = dispatchInteractive(baseParams);
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(1);
});
const duplicateDispatch = await dispatchInteractive(baseParams);
expect(duplicateDispatch).toEqual({
matched: true,
handled: true,
duplicate: true,
});
releaseHandler();
await expect(firstDispatch).resolves.toEqual({
matched: true,
handled: true,
duplicate: false,
});
expect(handler).toHaveBeenCalledTimes(1);
});
it("releases inflight interactive dedupe keys after a handler failure", async () => {
let rejectHandler!: (error: Error) => void;
const handlerGate = new Promise<never>((_, reject) => {
rejectHandler = reject;
});
const handler = vi
.fn(async () => ({ handled: true }))
.mockImplementationOnce(async () => await handlerGate)
.mockResolvedValueOnce({ handled: true });
expect(
registerPluginInteractiveHandler("codex-plugin", {
channel: "telegram",
namespace: "codex",
handler,
}),
).toEqual({ ok: true });
const baseParams = createTelegramDispatchParams({
data: "codex:resume:thread-1",
callbackId: "cb-retry-after-failure",
});
const firstDispatch = dispatchInteractive(baseParams);
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(1);
});
await expect(dispatchInteractive(baseParams)).resolves.toEqual({
matched: true,
handled: true,
duplicate: true,
});
rejectHandler(new Error("boom"));
await expect(firstDispatch).rejects.toThrow("boom");
await expect(dispatchInteractive(baseParams)).resolves.toEqual({
matched: true,
handled: true,
duplicate: false,
});
expect(handler).toHaveBeenCalledTimes(2);
});
});

View File

@@ -3,7 +3,9 @@ import {
type InteractiveRegistrationResult,
} from "./interactive-registry.js";
import {
getPluginInteractiveCallbackDedupeState,
claimPluginInteractiveCallbackDedupe,
commitPluginInteractiveCallbackDedupe,
releasePluginInteractiveCallbackDedupe,
type RegisteredInteractiveHandler,
} from "./interactive-state.js";
@@ -40,27 +42,32 @@ export async function dispatchPluginInteractiveHandler<
match: PluginInteractiveMatch<TRegistration>,
) => Promise<{ handled?: boolean } | void> | { handled?: boolean } | void;
}): Promise<InteractiveDispatchResult> {
const callbackDedupe = getPluginInteractiveCallbackDedupeState();
const match = resolvePluginInteractiveNamespaceMatch(params.channel, params.data);
if (!match) {
return { matched: false, handled: false, duplicate: false };
}
const dedupeKey = params.dedupeId?.trim();
if (dedupeKey && callbackDedupe.peek(dedupeKey)) {
if (dedupeKey && !claimPluginInteractiveCallbackDedupe(dedupeKey)) {
return { matched: true, handled: true, duplicate: true };
}
await params.onMatched?.();
try {
await params.onMatched?.();
const resolved = await params.invoke(match as PluginInteractiveMatch<TRegistration>);
if (dedupeKey) {
commitPluginInteractiveCallbackDedupe(dedupeKey);
}
const resolved = await params.invoke(match as PluginInteractiveMatch<TRegistration>);
if (dedupeKey) {
callbackDedupe.check(dedupeKey);
return {
matched: true,
handled: resolved?.handled ?? true,
duplicate: false,
};
} catch (error) {
if (dedupeKey) {
releasePluginInteractiveCallbackDedupe(dedupeKey);
}
throw error;
}
return {
matched: true,
handled: resolved?.handled ?? true,
duplicate: false,
};
}