diff --git a/extensions/matrix/src/exec-approval-resolver.test.ts b/extensions/matrix/src/exec-approval-resolver.test.ts index e2e15f106a4..272365de301 100644 --- a/extensions/matrix/src/exec-approval-resolver.test.ts +++ b/extensions/matrix/src/exec-approval-resolver.test.ts @@ -2,31 +2,21 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const gatewayRuntimeHoisted = vi.hoisted(() => ({ requestSpy: vi.fn(), - startSpy: vi.fn(), - stopSpy: vi.fn(), - stopAndWaitSpy: vi.fn(async () => undefined), - createClientSpy: vi.fn(), + withClientSpy: vi.fn(), })); vi.mock("openclaw/plugin-sdk/gateway-runtime", () => ({ - createOperatorApprovalsGatewayClient: gatewayRuntimeHoisted.createClientSpy, + withOperatorApprovalsGatewayClient: gatewayRuntimeHoisted.withClientSpy, })); describe("resolveMatrixExecApproval", () => { beforeEach(() => { gatewayRuntimeHoisted.requestSpy.mockReset(); - gatewayRuntimeHoisted.startSpy.mockReset(); - gatewayRuntimeHoisted.stopSpy.mockReset(); - gatewayRuntimeHoisted.stopAndWaitSpy.mockReset().mockResolvedValue(undefined); - gatewayRuntimeHoisted.createClientSpy.mockReset().mockImplementation((opts) => ({ - start: () => { - gatewayRuntimeHoisted.startSpy(); - opts.onHelloOk?.(); - }, - request: gatewayRuntimeHoisted.requestSpy, - stop: gatewayRuntimeHoisted.stopSpy, - stopAndWait: gatewayRuntimeHoisted.stopAndWaitSpy, - })); + gatewayRuntimeHoisted.withClientSpy.mockReset().mockImplementation(async (_params, run) => { + await run({ + request: gatewayRuntimeHoisted.requestSpy, + } as never); + }); }); it("submits exec approval resolutions through the gateway approvals client", async () => { diff --git a/extensions/matrix/src/exec-approval-resolver.ts b/extensions/matrix/src/exec-approval-resolver.ts index cf065211fda..00712b1d10f 100644 --- a/extensions/matrix/src/exec-approval-resolver.ts +++ b/extensions/matrix/src/exec-approval-resolver.ts @@ -1,7 +1,7 @@ import type { ExecApprovalReplyDecision } from "openclaw/plugin-sdk/approval-runtime"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { isApprovalNotFoundError } from "openclaw/plugin-sdk/error-runtime"; -import { createOperatorApprovalsGatewayClient } from "openclaw/plugin-sdk/gateway-runtime"; +import { withOperatorApprovalsGatewayClient } from "openclaw/plugin-sdk/gateway-runtime"; export { isApprovalNotFoundError }; @@ -12,53 +12,17 @@ export async function resolveMatrixExecApproval(params: { senderId?: string | null; gatewayUrl?: string; }): Promise { - let readySettled = false; - let resolveReady!: () => void; - let rejectReady!: (err: unknown) => void; - const ready = new Promise((resolve, reject) => { - resolveReady = resolve; - rejectReady = reject; - }); - const markReady = () => { - if (readySettled) { - return; - } - readySettled = true; - resolveReady(); - }; - const failReady = (err: unknown) => { - if (readySettled) { - return; - } - readySettled = true; - rejectReady(err); - }; - - const gatewayClient = await createOperatorApprovalsGatewayClient({ - config: params.cfg, - gatewayUrl: params.gatewayUrl, - clientDisplayName: `Matrix approval (${params.senderId?.trim() || "unknown"})`, - onHelloOk: () => { - markReady(); + await withOperatorApprovalsGatewayClient( + { + config: params.cfg, + gatewayUrl: params.gatewayUrl, + clientDisplayName: `Matrix approval (${params.senderId?.trim() || "unknown"})`, }, - onConnectError: (err) => { - failReady(err); + async (gatewayClient) => { + await gatewayClient.request("exec.approval.resolve", { + id: params.approvalId, + decision: params.decision, + }); }, - onClose: (code, reason) => { - failReady(new Error(`gateway closed (${code}): ${reason}`)); - }, - }); - - try { - gatewayClient.start(); - await ready; - await gatewayClient.request("exec.approval.resolve", { - id: params.approvalId, - decision: params.decision, - }); - } finally { - await gatewayClient.stopAndWait().catch(() => { - gatewayClient.stop(); - }); - } + ); } diff --git a/extensions/telegram/src/exec-approval-resolver.test.ts b/extensions/telegram/src/exec-approval-resolver.test.ts index d0c6165fa30..4d6bca2bc5b 100644 --- a/extensions/telegram/src/exec-approval-resolver.test.ts +++ b/extensions/telegram/src/exec-approval-resolver.test.ts @@ -2,31 +2,21 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const gatewayRuntimeHoisted = vi.hoisted(() => ({ requestSpy: vi.fn(), - startSpy: vi.fn(), - stopSpy: vi.fn(), - stopAndWaitSpy: vi.fn(async () => undefined), - createClientSpy: vi.fn(), + withClientSpy: vi.fn(), })); vi.mock("openclaw/plugin-sdk/gateway-runtime", () => ({ - createOperatorApprovalsGatewayClient: gatewayRuntimeHoisted.createClientSpy, + withOperatorApprovalsGatewayClient: gatewayRuntimeHoisted.withClientSpy, })); describe("resolveTelegramExecApproval", () => { beforeEach(() => { gatewayRuntimeHoisted.requestSpy.mockReset(); - gatewayRuntimeHoisted.startSpy.mockReset(); - gatewayRuntimeHoisted.stopSpy.mockReset(); - gatewayRuntimeHoisted.stopAndWaitSpy.mockReset().mockResolvedValue(undefined); - gatewayRuntimeHoisted.createClientSpy.mockReset().mockImplementation((opts) => ({ - start: () => { - gatewayRuntimeHoisted.startSpy(); - opts.onHelloOk?.(); - }, - request: gatewayRuntimeHoisted.requestSpy, - stop: gatewayRuntimeHoisted.stopSpy, - stopAndWait: gatewayRuntimeHoisted.stopAndWaitSpy, - })); + gatewayRuntimeHoisted.withClientSpy.mockReset().mockImplementation(async (_params, run) => { + await run({ + request: gatewayRuntimeHoisted.requestSpy, + } as never); + }); }); it("routes plugin approval ids through plugin.approval.resolve", async () => { diff --git a/extensions/telegram/src/exec-approval-resolver.ts b/extensions/telegram/src/exec-approval-resolver.ts index dee19b6586a..3ef47b8e42a 100644 --- a/extensions/telegram/src/exec-approval-resolver.ts +++ b/extensions/telegram/src/exec-approval-resolver.ts @@ -1,6 +1,6 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { isApprovalNotFoundError } from "openclaw/plugin-sdk/error-runtime"; -import { createOperatorApprovalsGatewayClient } from "openclaw/plugin-sdk/gateway-runtime"; +import { withOperatorApprovalsGatewayClient } from "openclaw/plugin-sdk/gateway-runtime"; import type { ExecApprovalReplyDecision } from "openclaw/plugin-sdk/infra-runtime"; export type ResolveTelegramExecApprovalParams = { @@ -15,69 +15,33 @@ export type ResolveTelegramExecApprovalParams = { export async function resolveTelegramExecApproval( params: ResolveTelegramExecApprovalParams, ): Promise { - let readySettled = false; - let resolveReady!: () => void; - let rejectReady!: (err: unknown) => void; - const ready = new Promise((resolve, reject) => { - resolveReady = resolve; - rejectReady = reject; - }); - const markReady = () => { - if (readySettled) { - return; - } - readySettled = true; - resolveReady(); - }; - const failReady = (err: unknown) => { - if (readySettled) { - return; - } - readySettled = true; - rejectReady(err); - }; - - const gatewayClient = await createOperatorApprovalsGatewayClient({ - config: params.cfg, - gatewayUrl: params.gatewayUrl, - clientDisplayName: `Telegram approval (${params.senderId?.trim() || "unknown"})`, - onHelloOk: () => { - markReady(); + await withOperatorApprovalsGatewayClient( + { + config: params.cfg, + gatewayUrl: params.gatewayUrl, + clientDisplayName: `Telegram approval (${params.senderId?.trim() || "unknown"})`, }, - onConnectError: (err) => { - failReady(err); - }, - onClose: (code, reason) => { - // Once onHelloOk resolves `ready`, in-flight request failures must come from - // gatewayClient.request() itself; failReady only covers the pre-ready phase. - failReady(new Error(`gateway closed (${code}): ${reason}`)); - }, - }); - - try { - gatewayClient.start(); - await ready; - const requestApproval = async (method: "exec.approval.resolve" | "plugin.approval.resolve") => { - await gatewayClient.request(method, { - id: params.approvalId, - decision: params.decision, - }); - }; - if (params.approvalId.startsWith("plugin:")) { - await requestApproval("plugin.approval.resolve"); - } else { - try { - await requestApproval("exec.approval.resolve"); - } catch (err) { - if (!params.allowPluginFallback || !isApprovalNotFoundError(err)) { - throw err; - } + async (gatewayClient) => { + const requestApproval = async ( + method: "exec.approval.resolve" | "plugin.approval.resolve", + ) => { + await gatewayClient.request(method, { + id: params.approvalId, + decision: params.decision, + }); + }; + if (params.approvalId.startsWith("plugin:")) { await requestApproval("plugin.approval.resolve"); + } else { + try { + await requestApproval("exec.approval.resolve"); + } catch (err) { + if (!params.allowPluginFallback || !isApprovalNotFoundError(err)) { + throw err; + } + await requestApproval("plugin.approval.resolve"); + } } - } - } finally { - await gatewayClient.stopAndWait().catch(() => { - gatewayClient.stop(); - }); - } + }, + ); } diff --git a/src/gateway/operator-approvals-client.test.ts b/src/gateway/operator-approvals-client.test.ts new file mode 100644 index 00000000000..4e1a52a6b35 --- /dev/null +++ b/src/gateway/operator-approvals-client.test.ts @@ -0,0 +1,109 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const clientState = vi.hoisted(() => ({ + options: null as Record | null, + startMode: "hello" as "hello" | "close", + close: { code: 1008, reason: "pairing required" }, + requestSpy: vi.fn(), + stopSpy: vi.fn(), + stopAndWaitSpy: vi.fn(async () => undefined), +})); + +class MockGatewayClient { + private readonly opts: Record; + + constructor(opts: Record) { + this.opts = opts; + clientState.options = opts; + } + + start(): void { + void Promise.resolve() + .then(async () => { + if (clientState.startMode === "close") { + const onClose = this.opts.onClose; + if (typeof onClose === "function") { + onClose(clientState.close.code, clientState.close.reason); + } + return; + } + const onHelloOk = this.opts.onHelloOk; + if (typeof onHelloOk === "function") { + await onHelloOk(); + } + }) + .catch(() => {}); + } + + async request(method: string, params: unknown): Promise { + return await clientState.requestSpy(method, params); + } + + stop(): void { + clientState.stopSpy(); + } + + async stopAndWait(): Promise { + await clientState.stopAndWaitSpy(); + } +} + +vi.mock("./client-bootstrap.js", () => ({ + resolveGatewayClientBootstrap: vi.fn(async () => ({ + url: "ws://127.0.0.1:18789", + auth: { token: "secret", password: undefined }, + })), +})); + +vi.mock("./client.js", () => ({ + GatewayClient: MockGatewayClient, +})); + +const { withOperatorApprovalsGatewayClient } = await import("./operator-approvals-client.js"); + +describe("withOperatorApprovalsGatewayClient", () => { + beforeEach(() => { + clientState.options = null; + clientState.startMode = "hello"; + clientState.close = { code: 1008, reason: "pairing required" }; + clientState.requestSpy.mockReset().mockResolvedValue(undefined); + clientState.stopSpy.mockReset(); + clientState.stopAndWaitSpy.mockReset().mockResolvedValue(undefined); + }); + + it("waits for hello before running the callback and stops cleanly", async () => { + await withOperatorApprovalsGatewayClient( + { + config: {} as never, + clientDisplayName: "Matrix approval (@owner:example.org)", + }, + async (client) => { + await client.request("exec.approval.resolve", { + id: "req-123", + decision: "allow-once", + }); + }, + ); + + expect(clientState.options?.scopes).toEqual(["operator.approvals"]); + expect(clientState.requestSpy).toHaveBeenCalledWith("exec.approval.resolve", { + id: "req-123", + decision: "allow-once", + }); + expect(clientState.stopAndWaitSpy).toHaveBeenCalledTimes(1); + }); + + it("surfaces close failures before hello", async () => { + clientState.startMode = "close"; + + await expect( + withOperatorApprovalsGatewayClient( + { + config: {} as never, + clientDisplayName: "Matrix approval (@owner:example.org)", + }, + async () => undefined, + ), + ).rejects.toThrow("gateway closed (1008): pairing required"); + }); +}); diff --git a/src/gateway/operator-approvals-client.ts b/src/gateway/operator-approvals-client.ts index cb3b063bb43..b4ecbd4b8cb 100644 --- a/src/gateway/operator-approvals-client.ts +++ b/src/gateway/operator-approvals-client.ts @@ -32,3 +32,59 @@ export async function createOperatorApprovalsGatewayClient( onClose: params.onClose, }); } + +export async function withOperatorApprovalsGatewayClient( + params: { + config: OpenClawConfig; + gatewayUrl?: string; + clientDisplayName: string; + }, + run: (client: GatewayClient) => Promise, +): Promise { + let readySettled = false; + let resolveReady!: () => void; + let rejectReady!: (err: unknown) => void; + const ready = new Promise((resolve, reject) => { + resolveReady = resolve; + rejectReady = reject; + }); + const markReady = () => { + if (readySettled) { + return; + } + readySettled = true; + resolveReady(); + }; + const failReady = (err: unknown) => { + if (readySettled) { + return; + } + readySettled = true; + rejectReady(err); + }; + + const gatewayClient = await createOperatorApprovalsGatewayClient({ + config: params.config, + gatewayUrl: params.gatewayUrl, + clientDisplayName: params.clientDisplayName, + onHelloOk: () => { + markReady(); + }, + onConnectError: (err) => { + failReady(err); + }, + onClose: (code, reason) => { + failReady(new Error(`gateway closed (${code}): ${reason}`)); + }, + }); + + try { + gatewayClient.start(); + await ready; + return await run(gatewayClient); + } finally { + await gatewayClient.stopAndWait().catch(() => { + gatewayClient.stop(); + }); + } +} diff --git a/src/plugin-sdk/gateway-runtime.ts b/src/plugin-sdk/gateway-runtime.ts index f1ef78ef14c..12b0b3ecf24 100644 --- a/src/plugin-sdk/gateway-runtime.ts +++ b/src/plugin-sdk/gateway-runtime.ts @@ -2,5 +2,8 @@ export * from "../gateway/channel-status-patches.js"; export { GatewayClient } from "../gateway/client.js"; -export { createOperatorApprovalsGatewayClient } from "../gateway/operator-approvals-client.js"; +export { + createOperatorApprovalsGatewayClient, + withOperatorApprovalsGatewayClient, +} from "../gateway/operator-approvals-client.js"; export type { EventFrame } from "../gateway/protocol/index.js";