diff --git a/CHANGELOG.md b/CHANGELOG.md index 44750c85990..0bce56bb3e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Docs: https://docs.openclaw.ai ### Changes +- QA/lab: add Convex-backed pooled Telegram credential leasing plus `openclaw qa credentials` admin commands and broker setup docs. (#65596) Thanks @joshavant. + ### Fixes - Gateway/startup: defer scheduled services until sidecars finish, gate chat history and model listing during sidecar resume, and let Control UI retry startup-gated history loads so Sandbox wake resumes channels first. (#65365) Thanks @lml2468. diff --git a/docs/help/testing.md b/docs/help/testing.md index 37a4798d8a1..417d7a7de18 100644 --- a/docs/help/testing.md +++ b/docs/help/testing.md @@ -69,10 +69,12 @@ These commands sit beside the main test suites when you need QA-lab realism: - Runs the Matrix live QA lane against a disposable Docker-backed Tuwunel homeserver. - Provisions three temporary Matrix users (`driver`, `sut`, `observer`) plus one private room, then starts a QA gateway child with the real Matrix plugin as the SUT transport. - Uses the pinned stable Tuwunel image `ghcr.io/matrix-construct/tuwunel:v1.5.1` by default. Override with `OPENCLAW_QA_MATRIX_TUWUNEL_IMAGE` when you need to test a different image. + - Matrix currently supports only `--credential-source env` because the lane provisions disposable users locally. - Writes a Matrix QA report, summary, and observed-events artifact under `.artifacts/qa-e2e/...`. - `pnpm openclaw qa telegram` - Runs the Telegram live QA lane against a real private group using the driver and SUT bot tokens from env. - Requires `OPENCLAW_QA_TELEGRAM_GROUP_ID`, `OPENCLAW_QA_TELEGRAM_DRIVER_BOT_TOKEN`, and `OPENCLAW_QA_TELEGRAM_SUT_BOT_TOKEN`. The group id must be the numeric Telegram chat id. + - Supports `--credential-source convex` for shared pooled credentials. Use env mode by default, or set `OPENCLAW_QA_CREDENTIAL_SOURCE=convex` to opt into pooled leases. - Requires two distinct bots in the same private group, with the SUT bot exposing a Telegram username. - For stable bot-to-bot observation, enable Bot-to-Bot Communication Mode in `@BotFather` for both bots and ensure the driver bot can observe group bot traffic. - Writes a Telegram QA report, summary, and observed-messages artifact under `.artifacts/qa-e2e/...`. @@ -87,6 +89,80 @@ transport coverage matrix. | Matrix | x | x | x | x | x | x | x | x | | | Telegram | x | | | | | | | | x | +### Shared Telegram credentials via Convex (v1) + +When `--credential-source convex` (or `OPENCLAW_QA_CREDENTIAL_SOURCE=convex`) is enabled for +`openclaw qa telegram`, QA lab acquires an exclusive lease from a Convex-backed pool, heartbeats +that lease while the lane is running, and releases the lease on shutdown. + +Reference Convex project scaffold: + +- `qa/convex-credential-broker/` + +Required env vars: + +- `OPENCLAW_QA_CONVEX_SITE_URL` (for example `https://your-deployment.convex.site`) +- One secret for the selected role: + - `OPENCLAW_QA_CONVEX_SECRET_MAINTAINER` for `maintainer` + - `OPENCLAW_QA_CONVEX_SECRET_CI` for `ci` +- Credential role selection: + - CLI: `--credential-role maintainer|ci` + - Env default: `OPENCLAW_QA_CREDENTIAL_ROLE` (defaults to `maintainer`) + +Optional env vars: + +- `OPENCLAW_QA_CREDENTIAL_LEASE_TTL_MS` (default `1200000`) +- `OPENCLAW_QA_CREDENTIAL_HEARTBEAT_INTERVAL_MS` (default `30000`) +- `OPENCLAW_QA_CREDENTIAL_ACQUIRE_TIMEOUT_MS` (default `90000`) +- `OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS` (default `15000`) +- `OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX` (default `/qa-credentials/v1`) +- `OPENCLAW_QA_CREDENTIAL_OWNER_ID` (optional trace id) +- `OPENCLAW_QA_ALLOW_INSECURE_HTTP=1` allows loopback `http://` Convex URLs for local-only development. + +`OPENCLAW_QA_CONVEX_SITE_URL` should use `https://` in normal operation. + +Maintainer admin commands (pool add/remove/list) require +`OPENCLAW_QA_CONVEX_SECRET_MAINTAINER` specifically. + +CLI helpers for maintainers: + +```bash +pnpm openclaw qa credentials add --kind telegram --payload-file qa/telegram-credential.json +pnpm openclaw qa credentials list --kind telegram +pnpm openclaw qa credentials remove --credential-id +``` + +Use `--json` for machine-readable output in scripts and CI utilities. + +Default endpoint contract (`OPENCLAW_QA_CONVEX_SITE_URL` + `/qa-credentials/v1`): + +- `POST /acquire` + - Request: `{ kind, ownerId, actorRole, leaseTtlMs, heartbeatIntervalMs }` + - Success: `{ status: "ok", credentialId, leaseToken, payload, leaseTtlMs?, heartbeatIntervalMs? }` + - Exhausted/retryable: `{ status: "error", code: "POOL_EXHAUSTED" | "NO_CREDENTIAL_AVAILABLE", ... }` +- `POST /heartbeat` + - Request: `{ kind, ownerId, actorRole, credentialId, leaseToken, leaseTtlMs }` + - Success: `{ status: "ok" }` (or empty `2xx`) +- `POST /release` + - Request: `{ kind, ownerId, actorRole, credentialId, leaseToken }` + - Success: `{ status: "ok" }` (or empty `2xx`) +- `POST /admin/add` (maintainer secret only) + - Request: `{ kind, actorId, payload, note?, status? }` + - Success: `{ status: "ok", credential }` +- `POST /admin/remove` (maintainer secret only) + - Request: `{ credentialId, actorId }` + - Success: `{ status: "ok", changed, credential }` + - Active lease guard: `{ status: "error", code: "LEASE_ACTIVE", ... }` +- `POST /admin/list` (maintainer secret only) + - Request: `{ kind?, status?, includePayload?, limit? }` + - Success: `{ status: "ok", credentials, count }` + +Payload shape for Telegram kind: + +- `{ groupId: string, driverToken: string, sutToken: string }` +- `groupId` must be a numeric Telegram chat id string. +- `admin/add` validates this shape for `kind: "telegram"` and rejects malformed payloads. + ### Adding a channel to QA Adding a channel to the markdown QA system requires exactly two things: diff --git a/extensions/qa-lab/src/cli.runtime.ts b/extensions/qa-lab/src/cli.runtime.ts index 3f75624ddea..f0001ffe7be 100644 --- a/extensions/qa-lab/src/cli.runtime.ts +++ b/extensions/qa-lab/src/cli.runtime.ts @@ -1,5 +1,6 @@ import fs from "node:fs/promises"; import path from "node:path"; +import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { buildQaAgenticParityComparison, renderQaAgenticParityMarkdownReport, @@ -15,6 +16,13 @@ import { startQaLabServer } from "./lab-server.js"; import { runQaManualLane } from "./manual-lane.runtime.js"; import { startQaMockOpenAiServer } from "./mock-openai-server.js"; import { runQaMultipass } from "./multipass.runtime.js"; +import { + addQaCredentialSet, + listQaCredentialSets, + QaCredentialAdminError, + removeQaCredentialSet, + type QaCredentialRecord, +} from "./qa-credentials-admin.runtime.js"; import { normalizeQaThinkingLevel, type QaThinkingLevel } from "./qa-gateway-config.js"; import { normalizeQaTransportId } from "./qa-transport-registry.js"; import { @@ -117,6 +125,46 @@ function parseQaCliBackendAuthMode(value: string | undefined): QaCliBackendAuthM throw new Error("--cli-auth-mode must be one of auto, api-key, subscription"); } +function parseQaCredentialListStatus(value: string | undefined) { + if (value === undefined) { + return undefined; + } + const normalized = value.trim().toLowerCase(); + if (normalized === "active" || normalized === "disabled" || normalized === "all") { + return normalized; + } + throw new Error('--status must be one of "active", "disabled", or "all".'); +} + +function normalizeQaCredentialAdminError(error: unknown) { + if (error instanceof QaCredentialAdminError) { + return { + code: error.code, + message: error.message, + }; + } + return { + code: "UNEXPECTED_ERROR", + message: formatErrorMessage(error), + }; +} + +function writeQaCredentialCommandErrorJson(action: string, error: unknown) { + const normalized = normalizeQaCredentialAdminError(error); + process.stdout.write( + `${JSON.stringify( + { + status: "error", + action, + code: normalized.code, + message: normalized.message, + }, + null, + 2, + )}\n`, + ); +} + function parseQaModelSpecs(label: string, entries: readonly string[] | undefined) { const models: string[] = []; const optionsByModel: Record = {}; @@ -198,6 +246,55 @@ async function runInterruptibleServer(label: string, server: InterruptibleServer await new Promise(() => undefined); } +async function readQaCredentialPayloadFile(filePath: string) { + const text = await fs.readFile(filePath, "utf8"); + let payload: unknown; + try { + payload = JSON.parse(text) as unknown; + } catch (error) { + throw new Error(`Payload file must contain valid JSON: ${formatErrorMessage(error)}`, { + cause: error, + }); + } + if (!payload || typeof payload !== "object" || Array.isArray(payload)) { + throw new Error("Payload file JSON must be an object."); + } + return payload as Record; +} + +function formatQaCredentialLeaseState(credential: QaCredentialRecord) { + if (!credential.lease) { + return "no"; + } + return `yes(${credential.lease.actorRole}:${credential.lease.ownerId})`; +} + +function printQaCredentialListTable(credentials: QaCredentialRecord[]) { + if (credentials.length === 0) { + process.stdout.write("No credentials matched.\n"); + return; + } + const rows = credentials.map((credential) => ({ + credentialId: credential.credentialId, + kind: credential.kind, + status: credential.status, + leased: formatQaCredentialLeaseState(credential), + note: credential.note ?? "", + })); + const idWidth = Math.max("credentialId".length, ...rows.map((row) => row.credentialId.length)); + const kindWidth = Math.max("kind".length, ...rows.map((row) => row.kind.length)); + const statusWidth = Math.max("status".length, ...rows.map((row) => row.status.length)); + const leaseWidth = Math.max("leased".length, ...rows.map((row) => row.leased.length)); + process.stdout.write( + `${"credentialId".padEnd(idWidth)} ${"kind".padEnd(kindWidth)} ${"status".padEnd(statusWidth)} ${"leased".padEnd(leaseWidth)} note\n`, + ); + for (const row of rows) { + process.stdout.write( + `${row.credentialId.padEnd(idWidth)} ${row.kind.padEnd(kindWidth)} ${row.status.padEnd(statusWidth)} ${row.leased.padEnd(leaseWidth)} ${row.note}\n`, + ); + } +} + export async function runQaLabSelfCheckCommand(opts: { repoRoot?: string; output?: string }) { const repoRoot = path.resolve(opts.repoRoot ?? process.cwd()); const server = await startQaLabServer({ @@ -411,6 +508,148 @@ export async function runQaManualLaneCommand(opts: { process.stdout.write("\n"); } +export async function runQaCredentialsAddCommand(opts: { + actorId?: string; + endpointPrefix?: string; + json?: boolean; + kind: string; + note?: string; + payloadFile: string; + repoRoot?: string; + siteUrl?: string; +}) { + const repoRoot = path.resolve(opts.repoRoot ?? process.cwd()); + try { + const payloadPath = path.resolve(repoRoot, opts.payloadFile); + const payload = await readQaCredentialPayloadFile(payloadPath); + const result = await addQaCredentialSet({ + kind: opts.kind, + payload, + note: opts.note, + actorId: opts.actorId, + siteUrl: opts.siteUrl, + endpointPrefix: opts.endpointPrefix, + }); + if (opts.json) { + process.stdout.write( + `${JSON.stringify({ status: "ok", action: "add", credential: result.credential }, null, 2)}\n`, + ); + return; + } + process.stdout.write(`QA credential added: ${result.credential.credentialId}\n`); + process.stdout.write(`Kind: ${result.credential.kind}\n`); + process.stdout.write(`Status: ${result.credential.status}\n`); + if (result.credential.note) { + process.stdout.write(`Note: ${result.credential.note}\n`); + } + } catch (error) { + if (opts.json) { + writeQaCredentialCommandErrorJson("add", error); + process.exitCode = 1; + return; + } + throw error; + } +} + +export async function runQaCredentialsRemoveCommand(opts: { + actorId?: string; + credentialId: string; + endpointPrefix?: string; + json?: boolean; + siteUrl?: string; +}) { + try { + const result = await removeQaCredentialSet({ + credentialId: opts.credentialId, + actorId: opts.actorId, + siteUrl: opts.siteUrl, + endpointPrefix: opts.endpointPrefix, + }); + if (opts.json) { + process.stdout.write( + `${JSON.stringify( + { + status: "ok", + action: "remove", + changed: result.changed, + credential: result.credential, + }, + null, + 2, + )}\n`, + ); + return; + } + process.stdout.write( + result.changed + ? `QA credential removed (disabled): ${result.credential.credentialId}\n` + : `QA credential already disabled: ${result.credential.credentialId}\n`, + ); + } catch (error) { + if (opts.json) { + writeQaCredentialCommandErrorJson("remove", error); + process.exitCode = 1; + return; + } + throw error; + } +} + +export async function runQaCredentialsListCommand(opts: { + actorId?: string; + endpointPrefix?: string; + json?: boolean; + kind?: string; + limit?: number; + showSecrets?: boolean; + siteUrl?: string; + status?: string; +}) { + try { + const result = await listQaCredentialSets({ + actorId: opts.actorId, + siteUrl: opts.siteUrl, + endpointPrefix: opts.endpointPrefix, + kind: opts.kind?.trim(), + status: parseQaCredentialListStatus(opts.status), + includePayload: opts.showSecrets, + limit: parseQaPositiveIntegerOption("--limit", opts.limit), + }); + if (opts.json) { + process.stdout.write( + `${JSON.stringify( + { + status: "ok", + action: "list", + count: result.credentials.length, + credentials: result.credentials, + }, + null, + 2, + )}\n`, + ); + return; + } + printQaCredentialListTable(result.credentials); + if (opts.showSecrets && result.credentials.length > 0) { + process.stdout.write("\nPayloads:\n"); + for (const credential of result.credentials) { + process.stdout.write( + `${credential.credentialId}: ${JSON.stringify(credential.payload ?? null)}\n`, + ); + } + } + } catch (error) { + if (opts.json) { + writeQaCredentialCommandErrorJson("list", error); + process.exitCode = 1; + return; + } + throw error; + } +} + export async function runQaLabUiCommand(opts: { repoRoot?: string; host?: string; diff --git a/extensions/qa-lab/src/cli.test.ts b/extensions/qa-lab/src/cli.test.ts index a051493870b..5fc231ce621 100644 --- a/extensions/qa-lab/src/cli.test.ts +++ b/extensions/qa-lab/src/cli.test.ts @@ -1,7 +1,16 @@ import { Command } from "commander"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -const { runQaMatrixCommand, runQaTelegramCommand } = vi.hoisted(() => ({ +const { + runQaCredentialsAddCommand, + runQaCredentialsListCommand, + runQaCredentialsRemoveCommand, + runQaMatrixCommand, + runQaTelegramCommand, +} = vi.hoisted(() => ({ + runQaCredentialsAddCommand: vi.fn(), + runQaCredentialsListCommand: vi.fn(), + runQaCredentialsRemoveCommand: vi.fn(), runQaMatrixCommand: vi.fn(), runQaTelegramCommand: vi.fn(), })); @@ -14,6 +23,12 @@ vi.mock("./live-transports/telegram/cli.runtime.js", () => ({ runQaTelegramCommand, })); +vi.mock("./cli.runtime.js", () => ({ + runQaCredentialsAddCommand, + runQaCredentialsListCommand, + runQaCredentialsRemoveCommand, +})); + import { registerQaLabCli } from "./cli.js"; describe("qa cli registration", () => { @@ -22,6 +37,9 @@ describe("qa cli registration", () => { beforeEach(() => { program = new Command(); registerQaLabCli(program); + runQaCredentialsAddCommand.mockReset(); + runQaCredentialsListCommand.mockReset(); + runQaCredentialsRemoveCommand.mockReset(); runQaMatrixCommand.mockReset(); runQaTelegramCommand.mockReset(); }); @@ -34,7 +52,7 @@ describe("qa cli registration", () => { const qa = program.commands.find((command) => command.name() === "qa"); expect(qa).toBeDefined(); expect(qa?.commands.map((command) => command.name())).toEqual( - expect.arrayContaining(["matrix", "telegram"]), + expect.arrayContaining(["matrix", "telegram", "credentials"]), ); }); @@ -72,6 +90,8 @@ describe("qa cli registration", () => { fastMode: true, scenarioIds: ["matrix-thread-follow-up", "matrix-thread-isolation"], sutAccountId: "sut-live", + credentialSource: undefined, + credentialRole: undefined, }); }); @@ -87,6 +107,92 @@ describe("qa cli registration", () => { fastMode: false, scenarioIds: [], sutAccountId: "sut", + credentialSource: undefined, + credentialRole: undefined, + }); + }); + + it("routes credential add flags into the qa runtime command", async () => { + await program.parseAsync([ + "node", + "openclaw", + "qa", + "credentials", + "add", + "--kind", + "telegram", + "--payload-file", + "qa/payload.json", + "--repo-root", + "/tmp/openclaw-repo", + "--note", + "shared lane", + "--site-url", + "https://first-schnauzer-821.convex.site", + "--endpoint-prefix", + "/qa-credentials/v1", + "--actor-id", + "maintainer-local", + "--json", + ]); + + expect(runQaCredentialsAddCommand).toHaveBeenCalledWith({ + kind: "telegram", + payloadFile: "qa/payload.json", + repoRoot: "/tmp/openclaw-repo", + note: "shared lane", + siteUrl: "https://first-schnauzer-821.convex.site", + endpointPrefix: "/qa-credentials/v1", + actorId: "maintainer-local", + json: true, + }); + }); + + it("routes credential remove flags into the qa runtime command", async () => { + await program.parseAsync([ + "node", + "openclaw", + "qa", + "credentials", + "remove", + "--credential-id", + "j57b8k419ba7bcsfw99rg05c9184p8br", + "--site-url", + "https://first-schnauzer-821.convex.site", + "--actor-id", + "maintainer-local", + "--json", + ]); + + expect(runQaCredentialsRemoveCommand).toHaveBeenCalledWith({ + credentialId: "j57b8k419ba7bcsfw99rg05c9184p8br", + siteUrl: "https://first-schnauzer-821.convex.site", + actorId: "maintainer-local", + endpointPrefix: undefined, + json: true, + }); + }); + + it("routes credential list defaults into the qa runtime command", async () => { + await program.parseAsync([ + "node", + "openclaw", + "qa", + "credentials", + "list", + "--kind", + "telegram", + ]); + + expect(runQaCredentialsListCommand).toHaveBeenCalledWith({ + kind: "telegram", + status: "all", + limit: undefined, + showSecrets: false, + siteUrl: undefined, + endpointPrefix: undefined, + actorId: undefined, + json: false, }); }); }); diff --git a/extensions/qa-lab/src/cli.ts b/extensions/qa-lab/src/cli.ts index 32d87b3b1c6..3eb0b6f8a0f 100644 --- a/extensions/qa-lab/src/cli.ts +++ b/extensions/qa-lab/src/cli.ts @@ -83,6 +83,45 @@ async function runQaManualLane(opts: { await runtime.runQaManualLaneCommand(opts); } +async function runQaCredentialsAdd(opts: { + actorId?: string; + endpointPrefix?: string; + json?: boolean; + kind: string; + note?: string; + payloadFile: string; + repoRoot?: string; + siteUrl?: string; +}) { + const runtime = await loadQaLabCliRuntime(); + await runtime.runQaCredentialsAddCommand(opts); +} + +async function runQaCredentialsRemove(opts: { + actorId?: string; + credentialId: string; + endpointPrefix?: string; + json?: boolean; + siteUrl?: string; +}) { + const runtime = await loadQaLabCliRuntime(); + await runtime.runQaCredentialsRemoveCommand(opts); +} + +async function runQaCredentialsList(opts: { + actorId?: string; + endpointPrefix?: string; + json?: boolean; + kind?: string; + limit?: number; + showSecrets?: boolean; + siteUrl?: string; + status?: string; +}) { + const runtime = await loadQaLabCliRuntime(); + await runtime.runQaCredentialsListCommand(opts); +} + async function runQaUi(opts: { repoRoot?: string; host?: string; @@ -347,6 +386,82 @@ export function registerQaLabCli(program: Command) { }, ); + const credentials = qa + .command("credentials") + .description("Manage pooled Convex live credentials used by QA lanes"); + + credentials + .command("add") + .description("Add one credential payload to the shared pool") + .requiredOption("--kind ", "Credential kind (for Telegram v1, use telegram)") + .requiredOption("--payload-file ", "JSON object file containing the credential payload") + .option("--repo-root ", "Repository root for resolving relative payload-file paths") + .option("--note ", "Optional note stored with this credential row") + .option("--site-url ", "Override OPENCLAW_QA_CONVEX_SITE_URL") + .option("--endpoint-prefix ", "Override OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX") + .option("--actor-id ", "Optional admin actor id to include in broker audit events") + .option("--json", "Emit machine-readable JSON output", false) + .action( + async (opts: { + kind: string; + payloadFile: string; + repoRoot?: string; + note?: string; + siteUrl?: string; + endpointPrefix?: string; + actorId?: string; + json?: boolean; + }) => { + await runQaCredentialsAdd(opts); + }, + ); + + credentials + .command("remove") + .description("Remove one credential from active use by disabling it") + .requiredOption("--credential-id ", "Credential row id from the Convex pool") + .option("--site-url ", "Override OPENCLAW_QA_CONVEX_SITE_URL") + .option("--endpoint-prefix ", "Override OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX") + .option("--actor-id ", "Optional admin actor id to include in broker audit events") + .option("--json", "Emit machine-readable JSON output", false) + .action( + async (opts: { + credentialId: string; + siteUrl?: string; + endpointPrefix?: string; + actorId?: string; + json?: boolean; + }) => { + await runQaCredentialsRemove(opts); + }, + ); + + credentials + .command("list") + .description("List credential rows in the shared Convex pool") + .option("--kind ", "Filter by credential kind") + .option("--status ", 'Filter by row status: "active", "disabled", or "all"', "all") + .option("--limit ", "Max rows to return", (value: string) => Number(value)) + .option("--show-secrets", "Include credential payload JSON in output", false) + .option("--site-url ", "Override OPENCLAW_QA_CONVEX_SITE_URL") + .option("--endpoint-prefix ", "Override OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX") + .option("--actor-id ", "Optional admin actor id to include in broker audit events") + .option("--json", "Emit machine-readable JSON output", false) + .action( + async (opts: { + kind?: string; + status?: string; + limit?: number; + showSecrets?: boolean; + siteUrl?: string; + endpointPrefix?: string; + actorId?: string; + json?: boolean; + }) => { + await runQaCredentialsList(opts); + }, + ); + qa.command("ui") .description("Start the private QA debugger UI and local QA bus") .option("--repo-root ", "Repository root to target when running from a neutral cwd") diff --git a/extensions/qa-lab/src/live-transports/matrix/cli.runtime.test.ts b/extensions/qa-lab/src/live-transports/matrix/cli.runtime.test.ts new file mode 100644 index 00000000000..a73080273e1 --- /dev/null +++ b/extensions/qa-lab/src/live-transports/matrix/cli.runtime.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, it, vi } from "vitest"; + +const runMatrixQaLive = vi.hoisted(() => vi.fn()); + +vi.mock("./matrix-live.runtime.js", () => ({ + runMatrixQaLive, +})); + +import { runQaMatrixCommand } from "./cli.runtime.js"; + +describe("matrix qa cli runtime", () => { + it("rejects non-env credential sources for the disposable Matrix lane", async () => { + await expect( + runQaMatrixCommand({ + credentialSource: "convex", + }), + ).rejects.toThrow("Matrix QA currently supports only --credential-source env"); + }); + + it("passes through default env credential source options", async () => { + runMatrixQaLive.mockResolvedValue({ + reportPath: "/tmp/matrix-report.md", + summaryPath: "/tmp/matrix-summary.json", + observedEventsPath: "/tmp/matrix-events.json", + }); + + await runQaMatrixCommand({ + repoRoot: "/tmp/openclaw", + outputDir: ".artifacts/qa-e2e/matrix", + providerMode: "mock-openai", + credentialSource: "env", + }); + + expect(runMatrixQaLive).toHaveBeenCalledWith( + expect.objectContaining({ + repoRoot: "/tmp/openclaw", + outputDir: "/tmp/openclaw/.artifacts/qa-e2e/matrix", + providerMode: "mock-openai", + credentialSource: "env", + }), + ); + }); +}); diff --git a/extensions/qa-lab/src/live-transports/matrix/cli.runtime.ts b/extensions/qa-lab/src/live-transports/matrix/cli.runtime.ts index 2b1e0c0275a..f38843aa7dc 100644 --- a/extensions/qa-lab/src/live-transports/matrix/cli.runtime.ts +++ b/extensions/qa-lab/src/live-transports/matrix/cli.runtime.ts @@ -6,7 +6,15 @@ import { import { runMatrixQaLive } from "./matrix-live.runtime.js"; export async function runQaMatrixCommand(opts: LiveTransportQaCommandOptions) { - const result = await runMatrixQaLive(resolveLiveTransportQaRunOptions(opts)); + const runOptions = resolveLiveTransportQaRunOptions(opts); + const credentialSource = runOptions.credentialSource?.toLowerCase(); + if (credentialSource && credentialSource !== "env") { + throw new Error( + "Matrix QA currently supports only --credential-source env (disposable local harness).", + ); + } + + const result = await runMatrixQaLive(runOptions); printLiveTransportQaArtifacts("Matrix QA", { report: result.reportPath, summary: result.summaryPath, diff --git a/extensions/qa-lab/src/live-transports/shared/credential-lease.runtime.test.ts b/extensions/qa-lab/src/live-transports/shared/credential-lease.runtime.test.ts new file mode 100644 index 00000000000..9853256e03e --- /dev/null +++ b/extensions/qa-lab/src/live-transports/shared/credential-lease.runtime.test.ts @@ -0,0 +1,272 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + acquireQaCredentialLease, + startQaCredentialLeaseHeartbeat, +} from "./credential-lease.runtime.js"; + +function jsonResponse(payload: unknown, status = 200) { + return new Response(JSON.stringify(payload), { + status, + headers: { "content-type": "application/json" }, + }); +} + +describe("credential lease runtime", () => { + afterEach(() => { + vi.restoreAllMocks(); + vi.useRealTimers(); + }); + + it("uses env credentials by default", async () => { + const lease = await acquireQaCredentialLease({ + kind: "telegram", + resolveEnvPayload: () => ({ groupId: "-100123", driverToken: "driver", sutToken: "sut" }), + parsePayload: () => { + throw new Error("should not parse convex payload in env mode"); + }, + env: {}, + }); + + expect(lease.source).toBe("env"); + expect(lease.payload).toEqual({ + groupId: "-100123", + driverToken: "driver", + sutToken: "sut", + }); + }); + + it("acquires, heartbeats, and releases convex credentials", async () => { + const fetchImpl = vi + .fn() + .mockResolvedValueOnce( + jsonResponse({ + status: "ok", + credentialId: "cred-1", + leaseToken: "lease-1", + payload: { groupId: "-100123", driverToken: "driver", sutToken: "sut" }, + leaseTtlMs: 1_200_000, + heartbeatIntervalMs: 30_000, + }), + ) + .mockResolvedValueOnce(jsonResponse({ status: "ok" })) + .mockResolvedValueOnce(jsonResponse({ status: "ok" })); + + const lease = await acquireQaCredentialLease({ + kind: "telegram", + source: "convex", + role: "maintainer", + env: { + OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site", + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret", + }, + fetchImpl, + resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }), + parsePayload: (payload) => + payload as { groupId: string; driverToken: string; sutToken: string }, + }); + + expect(lease.source).toBe("convex"); + expect(lease.credentialId).toBe("cred-1"); + expect(lease.payload.groupId).toBe("-100123"); + + await lease.heartbeat(); + await lease.release(); + + expect(fetchImpl).toHaveBeenCalledTimes(3); + const firstCall = fetchImpl.mock.calls[0]; + expect(firstCall?.[0]).toContain("/qa-credentials/v1/acquire"); + const firstInit = firstCall?.[1]; + const headers = firstInit?.headers as Record; + expect(headers.authorization).toBe("Bearer maintainer-secret"); + }); + + it("retries convex acquire while the pool is exhausted", async () => { + const fetchImpl = vi + .fn() + .mockResolvedValueOnce( + jsonResponse({ + status: "error", + code: "POOL_EXHAUSTED", + message: "wait", + }), + ) + .mockResolvedValueOnce( + jsonResponse({ + status: "error", + code: "POOL_EXHAUSTED", + message: "wait", + }), + ) + .mockResolvedValueOnce( + jsonResponse({ + status: "ok", + credentialId: "cred-2", + leaseToken: "lease-2", + payload: { groupId: "-100456", driverToken: "driver-2", sutToken: "sut-2" }, + }), + ); + + const sleeps: number[] = []; + let nowMs = 0; + + const lease = await acquireQaCredentialLease({ + kind: "telegram", + source: "convex", + env: { + OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site", + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret", + OPENCLAW_QA_CREDENTIAL_ACQUIRE_TIMEOUT_MS: "90000", + }, + fetchImpl, + randomImpl: () => 0, + timeImpl: () => nowMs, + sleepImpl: async (ms) => { + sleeps.push(ms); + nowMs += ms; + }, + resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }), + parsePayload: (payload) => + payload as { groupId: string; driverToken: string; sutToken: string }, + }); + + expect(lease.credentialId).toBe("cred-2"); + expect(fetchImpl).toHaveBeenCalledTimes(3); + expect(sleeps.length).toBe(2); + expect(sleeps[0]).toBeGreaterThanOrEqual(100); + expect(sleeps[1]).toBeGreaterThan(sleeps[0] ?? 0); + }); + + it("rejects non-https convex site URLs unless local insecure opt-in is enabled", async () => { + await expect( + acquireQaCredentialLease({ + kind: "telegram", + source: "convex", + env: { + OPENCLAW_QA_CONVEX_SITE_URL: "http://qa-cred.example.convex.site", + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret", + }, + resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }), + parsePayload: (payload) => + payload as { groupId: string; driverToken: string; sutToken: string }, + }), + ).rejects.toThrow("must use https://"); + }); + + it("allows loopback http URLs when OPENCLAW_QA_ALLOW_INSECURE_HTTP is enabled", async () => { + const fetchImpl = vi.fn().mockResolvedValueOnce( + jsonResponse({ + status: "ok", + credentialId: "cred-local", + leaseToken: "lease-local", + payload: { groupId: "-100123", driverToken: "driver", sutToken: "sut" }, + }), + ); + + await acquireQaCredentialLease({ + kind: "telegram", + source: "convex", + role: "maintainer", + env: { + OPENCLAW_QA_CONVEX_SITE_URL: "http://127.0.0.1:3210", + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret", + OPENCLAW_QA_ALLOW_INSECURE_HTTP: "1", + }, + fetchImpl, + resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }), + parsePayload: (payload) => + payload as { groupId: string; driverToken: string; sutToken: string }, + }); + + const firstCall = fetchImpl.mock.calls[0]; + expect(firstCall?.[0]).toBe("http://127.0.0.1:3210/qa-credentials/v1/acquire"); + }); + + it("rejects unsafe endpoint prefix overrides", async () => { + await expect( + acquireQaCredentialLease({ + kind: "telegram", + source: "convex", + env: { + OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site", + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret", + OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX: "//evil.example", + }, + resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }), + parsePayload: (payload) => + payload as { groupId: string; driverToken: string; sutToken: string }, + }), + ).rejects.toThrow("OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX must be an absolute path"); + }); + + it("releases acquired lease when payload parsing fails", async () => { + const fetchImpl = vi + .fn() + .mockResolvedValueOnce( + jsonResponse({ + status: "ok", + credentialId: "cred-parse-fail", + leaseToken: "lease-parse-fail", + payload: { broken: true }, + }), + ) + .mockResolvedValueOnce(jsonResponse({ status: "ok" })); + + await expect( + acquireQaCredentialLease({ + kind: "telegram", + source: "convex", + role: "maintainer", + env: { + OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site", + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret", + }, + fetchImpl, + resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }), + parsePayload: () => { + throw new Error("bad payload shape"); + }, + }), + ).rejects.toThrow("bad payload shape"); + + expect(fetchImpl).toHaveBeenCalledTimes(2); + expect(fetchImpl.mock.calls[1]?.[0]).toBe( + "https://qa-cred.example.convex.site/qa-credentials/v1/release", + ); + }); + + it("fails convex mode when auth secret is missing", async () => { + await expect( + acquireQaCredentialLease({ + kind: "telegram", + source: "convex", + role: "maintainer", + env: { + OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site", + }, + resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }), + parsePayload: (payload) => + payload as { groupId: string; driverToken: string; sutToken: string }, + }), + ).rejects.toThrow("OPENCLAW_QA_CONVEX_SECRET_MAINTAINER"); + }); + + it("captures heartbeat failures for fail-fast checks", async () => { + vi.useFakeTimers(); + const heartbeat = startQaCredentialLeaseHeartbeat( + { + source: "convex", + kind: "telegram", + heartbeatIntervalMs: 50, + heartbeat: async () => { + throw new Error("heartbeat-down"); + }, + }, + { intervalMs: 50 }, + ); + + await vi.advanceTimersByTimeAsync(55); + expect(heartbeat.getFailure()).toBeInstanceOf(Error); + expect(() => heartbeat.throwIfFailed()).toThrow("heartbeat-down"); + await heartbeat.stop(); + }); +}); diff --git a/extensions/qa-lab/src/live-transports/shared/credential-lease.runtime.ts b/extensions/qa-lab/src/live-transports/shared/credential-lease.runtime.ts new file mode 100644 index 00000000000..c2e38a779f8 --- /dev/null +++ b/extensions/qa-lab/src/live-transports/shared/credential-lease.runtime.ts @@ -0,0 +1,576 @@ +import { randomUUID } from "node:crypto"; +import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { z } from "zod"; + +const DEFAULT_ACQUIRE_TIMEOUT_MS = 90_000; +const DEFAULT_ENDPOINT_PREFIX = "/qa-credentials/v1"; +const DEFAULT_HEARTBEAT_INTERVAL_MS = 30_000; +const DEFAULT_HTTP_TIMEOUT_MS = 15_000; +const DEFAULT_LEASE_TTL_MS = 20 * 60 * 1_000; +const ALLOW_INSECURE_HTTP_ENV_KEY = "OPENCLAW_QA_ALLOW_INSECURE_HTTP"; +const RETRY_BACKOFF_MS = [500, 1_000, 2_000, 4_000, 5_000] as const; +const RETRYABLE_ACQUIRE_CODES = new Set(["POOL_EXHAUSTED", "NO_CREDENTIAL_AVAILABLE"]); + +const convexAcquireSuccessSchema = z.object({ + status: z.literal("ok"), + credentialId: z.string().min(1), + leaseToken: z.string().min(1), + payload: z.unknown(), + leaseTtlMs: z.number().int().positive().optional(), + heartbeatIntervalMs: z.number().int().positive().optional(), +}); + +const convexErrorSchema = z.object({ + status: z.literal("error"), + code: z.string().min(1), + message: z.string().optional(), + retryAfterMs: z.number().int().positive().optional(), +}); + +const convexOkSchema = z.object({ + status: z.literal("ok"), +}); + +type ConvexCredentialBrokerConfig = { + acquireTimeoutMs: number; + acquireUrl: string; + authToken: string; + heartbeatIntervalMs: number; + heartbeatUrl: string; + httpTimeoutMs: number; + leaseTtlMs: number; + ownerId: string; + releaseUrl: string; + role: QaCredentialRole; +}; + +export type QaCredentialLeaseHeartbeat = { + getFailure(): Error | null; + stop(): Promise; + throwIfFailed(): void; +}; + +export type QaCredentialRole = "ci" | "maintainer"; + +export type QaCredentialLeaseSource = "convex" | "env"; + +export type QaCredentialLease = { + credentialId?: string; + heartbeat(): Promise; + heartbeatIntervalMs: number; + kind: string; + leaseToken?: string; + leaseTtlMs: number; + ownerId?: string; + payload: TPayload; + release(): Promise; + role?: QaCredentialRole; + source: QaCredentialLeaseSource; +}; + +export type AcquireQaCredentialLeaseOptions = { + env?: NodeJS.ProcessEnv; + fetchImpl?: typeof fetch; + kind: string; + ownerId?: string; + parsePayload: (payload: unknown) => TPayload; + randomImpl?: () => number; + resolveEnvPayload: () => TPayload; + role?: string; + sleepImpl?: (ms: number) => Promise; + source?: string; + timeImpl?: () => number; +}; + +class QaCredentialBrokerError extends Error { + code: string; + retryAfterMs?: number; + + constructor(params: { code: string; message: string; retryAfterMs?: number }) { + super(params.message); + this.name = "QaCredentialBrokerError"; + this.code = params.code; + this.retryAfterMs = params.retryAfterMs; + } +} + +function parsePositiveIntegerEnv(env: NodeJS.ProcessEnv, key: string, fallback: number): number { + const raw = env[key]?.trim(); + if (!raw) { + return fallback; + } + const value = Number(raw); + if (!Number.isFinite(value) || !Number.isInteger(value) || value < 1) { + throw new Error(`${key} must be a positive integer.`); + } + return value; +} + +function normalizeQaCredentialSource(value: string | undefined): QaCredentialLeaseSource { + const normalized = value?.trim().toLowerCase() || "env"; + if (normalized === "env" || normalized === "convex") { + return normalized; + } + throw new Error(`Credential source must be one of env or convex, got "${value}".`); +} + +function normalizeQaCredentialRole(value: string | undefined): QaCredentialRole { + const normalized = value?.trim().toLowerCase() || "maintainer"; + if (normalized === "maintainer" || normalized === "ci") { + return normalized; + } + throw new Error(`Credential role must be one of maintainer or ci, got "${value}".`); +} + +function isTruthyOptIn(value: string | undefined) { + const normalized = value?.trim().toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes"; +} + +function isLoopbackHostname(hostname: string) { + return hostname === "localhost" || hostname === "::1" || hostname.startsWith("127."); +} + +function normalizeConvexSiteUrl(raw: string, env: NodeJS.ProcessEnv): string { + let url: URL; + try { + url = new URL(raw); + } catch { + throw new Error(`OPENCLAW_QA_CONVEX_SITE_URL must be a valid URL, got "${raw || ""}".`); + } + if (url.protocol === "https:") { + const text = url.toString(); + return text.endsWith("/") ? text.slice(0, -1) : text; + } + if (url.protocol !== "http:") { + throw new Error("OPENCLAW_QA_CONVEX_SITE_URL must use https://."); + } + const allowInsecureHttp = isTruthyOptIn(env[ALLOW_INSECURE_HTTP_ENV_KEY]); + if (!allowInsecureHttp || !isLoopbackHostname(url.hostname)) { + throw new Error( + `OPENCLAW_QA_CONVEX_SITE_URL must use https://. http:// is only allowed for loopback hosts when ${ALLOW_INSECURE_HTTP_ENV_KEY}=1.`, + ); + } + const text = url.toString(); + return text.endsWith("/") ? text.slice(0, -1) : text; +} + +function normalizeEndpointPrefix(value: string | undefined): string { + const trimmed = value?.trim(); + if (!trimmed) { + return DEFAULT_ENDPOINT_PREFIX; + } + const prefixed = trimmed.startsWith("/") ? trimmed : `/${trimmed}`; + const normalized = prefixed.endsWith("/") ? prefixed.slice(0, -1) : prefixed; + if (!normalized.startsWith("/") || normalized.startsWith("//")) { + throw new Error( + "OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX must be an absolute path like /qa-credentials/v1.", + ); + } + if (normalized.includes("\\") || normalized.split("/").some((segment) => segment === "..")) { + throw new Error( + "OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX must not contain backslashes or .. path segments.", + ); + } + return normalized; +} + +function resolveConvexAuthToken(env: NodeJS.ProcessEnv, role: QaCredentialRole): string { + const roleToken = + role === "ci" + ? env.OPENCLAW_QA_CONVEX_SECRET_CI?.trim() + : env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim(); + const token = roleToken; + if (token) { + return token; + } + if (role === "ci") { + throw new Error("Missing OPENCLAW_QA_CONVEX_SECRET_CI for CI credential access."); + } + throw new Error("Missing OPENCLAW_QA_CONVEX_SECRET_MAINTAINER for maintainer credential access."); +} + +function joinConvexEndpoint(baseUrl: string, prefix: string, suffix: string): string { + const normalizedSuffix = suffix.startsWith("/") ? suffix : `/${suffix}`; + const url = new URL(baseUrl); + url.pathname = `${prefix}${normalizedSuffix}`.replace(/\/{2,}/gu, "/"); + url.search = ""; + url.hash = ""; + return url.toString(); +} + +function resolveConvexCredentialBrokerConfig(params: { + env: NodeJS.ProcessEnv; + ownerId?: string; + role: QaCredentialRole; +}): ConvexCredentialBrokerConfig { + const siteUrl = params.env.OPENCLAW_QA_CONVEX_SITE_URL?.trim(); + if (!siteUrl) { + throw new Error("Missing OPENCLAW_QA_CONVEX_SITE_URL for --credential-source convex."); + } + const baseUrl = normalizeConvexSiteUrl(siteUrl, params.env); + const endpointPrefix = normalizeEndpointPrefix(params.env.OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX); + const ownerId = + params.ownerId?.trim() || + params.env.OPENCLAW_QA_CREDENTIAL_OWNER_ID?.trim() || + `qa-lab-${params.role}-${process.pid}-${randomUUID().slice(0, 8)}`; + return { + role: params.role, + ownerId, + authToken: resolveConvexAuthToken(params.env, params.role), + leaseTtlMs: parsePositiveIntegerEnv( + params.env, + "OPENCLAW_QA_CREDENTIAL_LEASE_TTL_MS", + DEFAULT_LEASE_TTL_MS, + ), + heartbeatIntervalMs: parsePositiveIntegerEnv( + params.env, + "OPENCLAW_QA_CREDENTIAL_HEARTBEAT_INTERVAL_MS", + DEFAULT_HEARTBEAT_INTERVAL_MS, + ), + acquireTimeoutMs: parsePositiveIntegerEnv( + params.env, + "OPENCLAW_QA_CREDENTIAL_ACQUIRE_TIMEOUT_MS", + DEFAULT_ACQUIRE_TIMEOUT_MS, + ), + httpTimeoutMs: parsePositiveIntegerEnv( + params.env, + "OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS", + DEFAULT_HTTP_TIMEOUT_MS, + ), + acquireUrl: joinConvexEndpoint(baseUrl, endpointPrefix, "acquire"), + heartbeatUrl: joinConvexEndpoint(baseUrl, endpointPrefix, "heartbeat"), + releaseUrl: joinConvexEndpoint(baseUrl, endpointPrefix, "release"), + }; +} + +function toBrokerError(params: { + payload: unknown; + fallback: string; +}): QaCredentialBrokerError | null { + const parsed = convexErrorSchema.safeParse(params.payload); + if (!parsed.success) { + return null; + } + return new QaCredentialBrokerError({ + code: parsed.data.code, + message: parsed.data.message?.trim() || params.fallback, + retryAfterMs: parsed.data.retryAfterMs, + }); +} + +async function postConvexBroker(params: { + authToken: string; + body: Record; + fetchImpl: typeof fetch; + timeoutMs: number; + url: string; +}): Promise { + const response = await params.fetchImpl(params.url, { + method: "POST", + headers: { + authorization: `Bearer ${params.authToken}`, + "content-type": "application/json", + }, + body: JSON.stringify(params.body), + signal: AbortSignal.timeout(params.timeoutMs), + }); + + const text = await response.text(); + const payload: unknown = (() => { + if (!text.trim()) { + return undefined; + } + try { + return JSON.parse(text) as unknown; + } catch { + return text; + } + })(); + + const brokerError = toBrokerError({ + payload, + fallback: `Convex credential broker request failed (${response.status}).`, + }); + if (brokerError) { + throw brokerError; + } + if (!response.ok) { + throw new Error( + `Convex credential broker request to ${params.url} failed with HTTP ${response.status}.`, + ); + } + return payload; +} + +function computeAcquireBackoffMs(params: { + attempt: number; + randomImpl: () => number; + retryAfterMs?: number; +}): number { + if (params.retryAfterMs && params.retryAfterMs > 0) { + return params.retryAfterMs; + } + const base = RETRY_BACKOFF_MS[Math.min(RETRY_BACKOFF_MS.length - 1, params.attempt - 1)]; + const jitter = 0.75 + params.randomImpl() * 0.5; + return Math.max(100, Math.round(base * jitter)); +} + +function assertConvexOk(payload: unknown, actionLabel: string) { + if (payload === undefined) { + return; + } + if (convexOkSchema.safeParse(payload).success) { + return; + } + const brokerError = toBrokerError({ + payload, + fallback: `Convex credential ${actionLabel} failed.`, + }); + if (brokerError) { + throw brokerError; + } + throw new Error(`Convex credential ${actionLabel} failed with an invalid response payload.`); +} + +export async function acquireQaCredentialLease( + opts: AcquireQaCredentialLeaseOptions, +): Promise> { + const env = opts.env ?? process.env; + const source = normalizeQaCredentialSource(opts.source ?? env.OPENCLAW_QA_CREDENTIAL_SOURCE); + if (source === "env") { + return { + source: "env", + kind: opts.kind, + payload: opts.resolveEnvPayload(), + heartbeatIntervalMs: 0, + leaseTtlMs: 0, + async heartbeat() {}, + async release() {}, + }; + } + + const role = normalizeQaCredentialRole(opts.role ?? env.OPENCLAW_QA_CREDENTIAL_ROLE); + const config = resolveConvexCredentialBrokerConfig({ + env, + role, + ownerId: opts.ownerId, + }); + const fetchImpl = opts.fetchImpl ?? fetch; + const sleepImpl = + opts.sleepImpl ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms))); + const timeImpl = opts.timeImpl ?? (() => Date.now()); + const randomImpl = opts.randomImpl ?? (() => Math.random()); + const startedAt = timeImpl(); + let attempt = 0; + + while (true) { + attempt += 1; + try { + const payload = await postConvexBroker({ + fetchImpl, + timeoutMs: config.httpTimeoutMs, + authToken: config.authToken, + url: config.acquireUrl, + body: { + kind: opts.kind, + ownerId: config.ownerId, + actorRole: config.role, + leaseTtlMs: config.leaseTtlMs, + heartbeatIntervalMs: config.heartbeatIntervalMs, + }, + }); + const acquired = convexAcquireSuccessSchema.parse(payload); + const releaseLease = async () => { + const releasePayload = await postConvexBroker({ + fetchImpl, + timeoutMs: config.httpTimeoutMs, + authToken: config.authToken, + url: config.releaseUrl, + body: { + kind: opts.kind, + ownerId: config.ownerId, + credentialId: acquired.credentialId, + leaseToken: acquired.leaseToken, + actorRole: config.role, + }, + }); + assertConvexOk(releasePayload, "release"); + }; + let parsedPayload: TPayload; + try { + parsedPayload = opts.parsePayload(acquired.payload); + } catch (error) { + try { + await releaseLease(); + } catch (releaseError) { + throw new Error( + `Convex credential payload validation failed for kind "${opts.kind}" and cleanup release failed: ${formatErrorMessage(error)}; release failed: ${formatErrorMessage(releaseError)}`, + { cause: releaseError }, + ); + } + throw new Error( + `Convex credential payload validation failed for kind "${opts.kind}": ${formatErrorMessage(error)}`, + { cause: error }, + ); + } + const leaseTtlMs = acquired.leaseTtlMs ?? config.leaseTtlMs; + const heartbeatIntervalMs = acquired.heartbeatIntervalMs ?? config.heartbeatIntervalMs; + return { + source: "convex", + kind: opts.kind, + role, + ownerId: config.ownerId, + credentialId: acquired.credentialId, + leaseToken: acquired.leaseToken, + leaseTtlMs, + heartbeatIntervalMs, + payload: parsedPayload, + async heartbeat() { + const heartbeatPayload = await postConvexBroker({ + fetchImpl, + timeoutMs: config.httpTimeoutMs, + authToken: config.authToken, + url: config.heartbeatUrl, + body: { + kind: opts.kind, + ownerId: config.ownerId, + credentialId: acquired.credentialId, + leaseToken: acquired.leaseToken, + actorRole: config.role, + leaseTtlMs, + }, + }); + assertConvexOk(heartbeatPayload, "heartbeat"); + }, + async release() { + await releaseLease(); + }, + }; + } catch (error) { + if (error instanceof QaCredentialBrokerError && RETRYABLE_ACQUIRE_CODES.has(error.code)) { + const elapsed = timeImpl() - startedAt; + if (elapsed >= config.acquireTimeoutMs) { + throw new Error( + `Convex credential pool exhausted for kind "${opts.kind}" after ${config.acquireTimeoutMs}ms.`, + { cause: error }, + ); + } + const delayMs = Math.min( + computeAcquireBackoffMs({ + attempt, + retryAfterMs: error.retryAfterMs, + randomImpl, + }), + Math.max(0, config.acquireTimeoutMs - elapsed), + ); + if (delayMs > 0) { + await sleepImpl(delayMs); + } + continue; + } + if (error instanceof z.ZodError) { + throw new Error( + `Convex credential acquire response did not match the expected payload for kind "${opts.kind}": ${error.message}`, + { cause: error }, + ); + } + throw new Error( + `Convex credential acquire failed for kind "${opts.kind}": ${formatErrorMessage(error)}`, + { cause: error }, + ); + } + } +} + +export function startQaCredentialLeaseHeartbeat( + lease: Pick, "heartbeat" | "heartbeatIntervalMs" | "kind" | "source">, + opts?: { + intervalMs?: number; + setTimeoutImpl?: typeof setTimeout; + clearTimeoutImpl?: typeof clearTimeout; + }, +): QaCredentialLeaseHeartbeat { + if (lease.source !== "convex") { + return { + getFailure: () => null, + async stop() {}, + throwIfFailed() {}, + }; + } + const intervalMs = opts?.intervalMs ?? lease.heartbeatIntervalMs; + if (!Number.isFinite(intervalMs) || intervalMs < 1) { + return { + getFailure: () => null, + async stop() {}, + throwIfFailed() {}, + }; + } + + const setTimeoutImpl = opts?.setTimeoutImpl ?? setTimeout; + const clearTimeoutImpl = opts?.clearTimeoutImpl ?? clearTimeout; + let failure: Error | null = null; + let stopped = false; + let timer: ReturnType | null = null; + let inFlight: Promise | null = null; + + const schedule = () => { + if (stopped || failure) { + return; + } + timer = setTimeoutImpl(() => { + timer = null; + if (stopped || failure) { + return; + } + inFlight = (async () => { + try { + await lease.heartbeat(); + } catch (error) { + failure = new Error( + `Credential lease heartbeat failed for kind "${lease.kind}": ${formatErrorMessage(error)}`, + ); + return; + } finally { + inFlight = null; + } + schedule(); + })(); + }, intervalMs); + }; + + schedule(); + + return { + getFailure() { + return failure; + }, + throwIfFailed() { + if (failure) { + throw failure; + } + }, + async stop() { + stopped = true; + if (timer) { + clearTimeoutImpl(timer); + timer = null; + } + if (inFlight) { + await inFlight.catch(() => {}); + } + }, + }; +} + +export const __testing = { + DEFAULT_ACQUIRE_TIMEOUT_MS, + DEFAULT_ENDPOINT_PREFIX, + DEFAULT_HEARTBEAT_INTERVAL_MS, + DEFAULT_LEASE_TTL_MS, + computeAcquireBackoffMs, + normalizeQaCredentialRole, + normalizeQaCredentialSource, + parsePositiveIntegerEnv, + resolveConvexCredentialBrokerConfig, +}; diff --git a/extensions/qa-lab/src/live-transports/shared/live-transport-cli.runtime.ts b/extensions/qa-lab/src/live-transports/shared/live-transport-cli.runtime.ts index cbe805d899f..6306760bd2e 100644 --- a/extensions/qa-lab/src/live-transports/shared/live-transport-cli.runtime.ts +++ b/extensions/qa-lab/src/live-transports/shared/live-transport-cli.runtime.ts @@ -25,6 +25,8 @@ export function resolveLiveTransportQaRunOptions( fastMode: opts.fastMode, scenarioIds: opts.scenarioIds, sutAccountId: opts.sutAccountId, + credentialSource: opts.credentialSource?.trim(), + credentialRole: opts.credentialRole?.trim(), }; } diff --git a/extensions/qa-lab/src/live-transports/shared/live-transport-cli.ts b/extensions/qa-lab/src/live-transports/shared/live-transport-cli.ts index bc6efebc1d5..fe389ebc500 100644 --- a/extensions/qa-lab/src/live-transports/shared/live-transport-cli.ts +++ b/extensions/qa-lab/src/live-transports/shared/live-transport-cli.ts @@ -11,6 +11,8 @@ export type LiveTransportQaCommandOptions = { fastMode?: boolean; scenarioIds?: string[]; sutAccountId?: string; + credentialSource?: string; + credentialRole?: string; }; type LiveTransportQaCommanderOptions = { @@ -22,6 +24,8 @@ type LiveTransportQaCommanderOptions = { scenario?: string[]; fast?: boolean; sutAccount?: string; + credentialSource?: string; + credentialRole?: string; }; export type LiveTransportQaCliRegistration = { @@ -49,6 +53,8 @@ export function mapLiveTransportQaCommanderOptions( fastMode: opts.fast, scenarioIds: opts.scenario, sutAccountId: opts.sutAccount, + credentialSource: opts.credentialSource, + credentialRole: opts.credentialRole, }; } @@ -76,6 +82,14 @@ export function registerLiveTransportQaCli(params: { .option("--scenario ", params.scenarioHelp, collectString, []) .option("--fast", "Enable provider fast mode where supported", false) .option("--sut-account ", params.sutAccountHelp, "sut") + .option( + "--credential-source ", + "Credential source for live lanes: env or convex (default: env)", + ) + .option( + "--credential-role ", + "Credential role for convex auth: maintainer or ci (default: maintainer)", + ) .action(async (opts: LiveTransportQaCommanderOptions) => { await params.run(mapLiveTransportQaCommanderOptions(opts)); }); diff --git a/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.test.ts b/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.test.ts index 3dd6554f1aa..a9e26797952 100644 --- a/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.test.ts +++ b/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.test.ts @@ -66,6 +66,30 @@ describe("telegram live qa runtime", () => { ).toThrow("OPENCLAW_QA_TELEGRAM_GROUP_ID must be a numeric Telegram chat id."); }); + it("parses Telegram pooled credential payloads", () => { + expect( + __testing.parseTelegramQaCredentialPayload({ + groupId: "-100123", + driverToken: "driver", + sutToken: "sut", + }), + ).toEqual({ + groupId: "-100123", + driverToken: "driver", + sutToken: "sut", + }); + }); + + it("rejects Telegram pooled credential payloads with non-numeric group ids", () => { + expect(() => + __testing.parseTelegramQaCredentialPayload({ + groupId: "qa-group", + driverToken: "driver", + sutToken: "sut", + }), + ).toThrow("Telegram credential payload groupId must be a numeric Telegram chat id."); + }); + it("injects a temporary Telegram account into the QA gateway config", () => { const baseCfg: OpenClawConfig = { plugins: { diff --git a/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts b/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts index 94d23be6643..dea99994b21 100644 --- a/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts +++ b/extensions/qa-lab/src/live-transports/telegram/telegram-live.runtime.ts @@ -4,12 +4,18 @@ import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime"; +import { z } from "zod"; import { startQaGatewayChild } from "../../gateway-child.js"; import { defaultQaModelForMode, normalizeQaProviderMode, type QaProviderModeInput, } from "../../run-config.js"; +import { + acquireQaCredentialLease, + startQaCredentialLeaseHeartbeat, + type QaCredentialRole, +} from "../shared/credential-lease.runtime.js"; import { startQaLiveLaneGateway } from "../shared/live-gateway.runtime.js"; import { appendLiveLaneIssue, buildLiveLaneArtifactsError } from "../shared/live-lane-helpers.js"; import { @@ -89,6 +95,13 @@ export type TelegramQaRunResult = { }; type TelegramQaSummary = { + credentials: { + credentialId?: string; + kind: string; + ownerId?: string; + role?: QaCredentialRole; + source: "convex" | "env"; + }; groupId: string; startedAt: string; finishedAt: string; @@ -266,6 +279,12 @@ const TELEGRAM_QA_ENV_KEYS = [ "OPENCLAW_QA_TELEGRAM_SUT_BOT_TOKEN", ] as const; +const telegramQaCredentialPayloadSchema = z.object({ + groupId: z.string().trim().min(1), + driverToken: z.string().trim().min(1), + sutToken: z.string().trim().min(1), +}); + function resolveEnvValue(env: NodeJS.ProcessEnv, key: (typeof TELEGRAM_QA_ENV_KEYS)[number]) { const value = env[key]?.trim(); if (!value) { @@ -288,6 +307,18 @@ export function resolveTelegramQaRuntimeEnv( }; } +function parseTelegramQaCredentialPayload(payload: unknown): TelegramQaRuntimeEnv { + const parsed = telegramQaCredentialPayloadSchema.parse(payload); + if (!/^-?\d+$/u.test(parsed.groupId)) { + throw new Error("Telegram credential payload groupId must be a numeric Telegram chat id."); + } + return { + groupId: parsed.groupId, + driverToken: parsed.driverToken, + sutToken: parsed.sutToken, + }; +} + function flattenInlineButtons(replyMarkup?: TelegramReplyMarkup) { return (replyMarkup?.inline_keyboard ?? []) .flat() @@ -528,6 +559,7 @@ async function waitForTelegramChannelRunning( function renderTelegramQaMarkdown(params: { cleanupIssues: string[]; + credentialSource: "convex" | "env"; groupId: string; startedAt: string; finishedAt: string; @@ -536,6 +568,7 @@ function renderTelegramQaMarkdown(params: { const lines = [ "# Telegram QA Report", "", + `- Credential source: \`${params.credentialSource}\``, `- Group: \`${params.groupId}\``, `- Started: ${params.startedAt}`, `- Finished: ${params.finishedAt}`, @@ -803,6 +836,8 @@ export async function runTelegramQaLive(params: { fastMode?: boolean; scenarioIds?: string[]; sutAccountId?: string; + credentialSource?: string; + credentialRole?: string; }): Promise { const repoRoot = path.resolve(params.repoRoot ?? process.cwd()); const outputDir = @@ -810,7 +845,19 @@ export async function runTelegramQaLive(params: { path.join(repoRoot, ".artifacts", "qa-e2e", `telegram-${Date.now().toString(36)}`); await fs.mkdir(outputDir, { recursive: true }); - const runtimeEnv = resolveTelegramQaRuntimeEnv(); + const credentialLease = await acquireQaCredentialLease({ + kind: "telegram", + source: params.credentialSource, + role: params.credentialRole, + resolveEnvPayload: () => resolveTelegramQaRuntimeEnv(), + parsePayload: parseTelegramQaCredentialPayload, + }); + const leaseHeartbeat = startQaCredentialLeaseHeartbeat(credentialLease); + const assertLeaseHealthy = () => { + leaseHeartbeat.throwIfFailed(); + }; + + const runtimeEnv = credentialLease.payload; const providerMode = normalizeQaProviderMode(params.providerMode ?? "live-frontier"); const primaryModel = params.primaryModel?.trim() || defaultQaModelForMode(providerMode); const alternateModel = params.alternateModel?.trim() || defaultQaModelForMode(providerMode, true); @@ -819,145 +866,163 @@ export async function runTelegramQaLive(params: { const observedMessages: TelegramObservedMessage[] = []; const includeObservedMessageContent = process.env.OPENCLAW_QA_TELEGRAM_CAPTURE_CONTENT === "1"; const startedAt = new Date().toISOString(); - - const driverIdentity = await getBotIdentity(runtimeEnv.driverToken); - const sutIdentity = await getBotIdentity(runtimeEnv.sutToken); - const sutUsername = sutIdentity.username?.trim(); - const uniqueIds = new Set([driverIdentity.id, sutIdentity.id]); - if (uniqueIds.size !== 2) { - throw new Error("Telegram QA requires two distinct bots for driver and SUT."); - } - if (!sutUsername) { - throw new Error("Telegram QA requires the SUT bot to have a Telegram username."); - } - - await Promise.all([ - flushTelegramUpdates(runtimeEnv.driverToken), - flushTelegramUpdates(runtimeEnv.sutToken), - ]); - - const gatewayHarness = await startQaLiveLaneGateway({ - repoRoot, - transport: { - requiredPluginIds: [], - createGatewayConfig: () => ({}), - }, - transportBaseUrl: "http://127.0.0.1:0", - providerMode, - primaryModel, - alternateModel, - fastMode: params.fastMode, - controlUiEnabled: false, - mutateConfig: (cfg) => - buildTelegramQaConfig(cfg, { - groupId: runtimeEnv.groupId, - sutToken: runtimeEnv.sutToken, - driverBotId: driverIdentity.id, - sutAccountId, - }), - }); - const scenarioResults: TelegramQaScenarioResult[] = []; const cleanupIssues: string[] = []; let canaryFailure: string | null = null; try { - await waitForTelegramChannelRunning(gatewayHarness.gateway, sutAccountId); - try { - await runCanary({ - driverToken: runtimeEnv.driverToken, - groupId: runtimeEnv.groupId, - sutUsername, - sutBotId: sutIdentity.id, - observedMessages, - }); - } catch (error) { - canaryFailure = canaryFailureMessage({ - error, - groupId: runtimeEnv.groupId, - driverBotId: driverIdentity.id, - driverUsername: driverIdentity.username, - sutBotId: sutIdentity.id, - sutUsername, - }); - scenarioResults.push({ - id: "telegram-canary", - title: "Telegram canary", - status: "fail", - details: canaryFailure, - }); + const driverIdentity = await getBotIdentity(runtimeEnv.driverToken); + const sutIdentity = await getBotIdentity(runtimeEnv.sutToken); + const sutUsername = sutIdentity.username?.trim(); + const uniqueIds = new Set([driverIdentity.id, sutIdentity.id]); + if (uniqueIds.size !== 2) { + throw new Error("Telegram QA requires two distinct bots for driver and SUT."); } - if (!canaryFailure) { - let driverOffset = await flushTelegramUpdates(runtimeEnv.driverToken); - for (const scenario of scenarios) { - const scenarioRun = scenario.buildRun(sutUsername); - try { - const sent = await sendGroupMessage( - runtimeEnv.driverToken, - runtimeEnv.groupId, - scenarioRun.input, - ); - const matched = await waitForObservedMessage({ - token: runtimeEnv.driverToken, - initialOffset: driverOffset, - timeoutMs: scenario.timeoutMs, - observedMessages, - predicate: (message) => - matchesTelegramScenarioReply({ - groupId: runtimeEnv.groupId, - matchText: scenarioRun.matchText, - message, - sentMessageId: sent.message_id, - sutBotId: sutIdentity.id, - }), - }); - driverOffset = matched.nextOffset; - if (!scenarioRun.expectReply) { - throw new Error(`unexpected reply message ${matched.message.messageId} matched`); - } - assertTelegramScenarioReply({ - expectedTextIncludes: scenarioRun.expectedTextIncludes, - message: matched.message, - }); - scenarioResults.push({ - id: scenario.id, - title: scenario.title, - status: "pass", - details: `reply message ${matched.message.messageId} matched`, - }); - } catch (error) { - if (!scenarioRun.expectReply) { - const details = formatErrorMessage(error); - if ( - details === `timed out after ${scenario.timeoutMs}ms waiting for Telegram message` - ) { - scenarioResults.push({ - id: scenario.id, - title: scenario.title, - status: "pass", - details: "no reply", - }); - continue; + if (!sutUsername) { + throw new Error("Telegram QA requires the SUT bot to have a Telegram username."); + } + + await Promise.all([ + flushTelegramUpdates(runtimeEnv.driverToken), + flushTelegramUpdates(runtimeEnv.sutToken), + ]); + + const gatewayHarness = await startQaLiveLaneGateway({ + repoRoot, + transport: { + requiredPluginIds: [], + createGatewayConfig: () => ({}), + }, + transportBaseUrl: "http://127.0.0.1:0", + providerMode, + primaryModel, + alternateModel, + fastMode: params.fastMode, + controlUiEnabled: false, + mutateConfig: (cfg) => + buildTelegramQaConfig(cfg, { + groupId: runtimeEnv.groupId, + sutToken: runtimeEnv.sutToken, + driverBotId: driverIdentity.id, + sutAccountId, + }), + }); + try { + await waitForTelegramChannelRunning(gatewayHarness.gateway, sutAccountId); + assertLeaseHealthy(); + try { + await runCanary({ + driverToken: runtimeEnv.driverToken, + groupId: runtimeEnv.groupId, + sutUsername, + sutBotId: sutIdentity.id, + observedMessages, + }); + } catch (error) { + canaryFailure = canaryFailureMessage({ + error, + groupId: runtimeEnv.groupId, + driverBotId: driverIdentity.id, + driverUsername: driverIdentity.username, + sutBotId: sutIdentity.id, + sutUsername, + }); + scenarioResults.push({ + id: "telegram-canary", + title: "Telegram canary", + status: "fail", + details: canaryFailure, + }); + } + assertLeaseHealthy(); + if (!canaryFailure) { + let driverOffset = await flushTelegramUpdates(runtimeEnv.driverToken); + for (const scenario of scenarios) { + assertLeaseHealthy(); + const scenarioRun = scenario.buildRun(sutUsername); + try { + const sent = await sendGroupMessage( + runtimeEnv.driverToken, + runtimeEnv.groupId, + scenarioRun.input, + ); + const matched = await waitForObservedMessage({ + token: runtimeEnv.driverToken, + initialOffset: driverOffset, + timeoutMs: scenario.timeoutMs, + observedMessages, + predicate: (message) => + matchesTelegramScenarioReply({ + groupId: runtimeEnv.groupId, + matchText: scenarioRun.matchText, + message, + sentMessageId: sent.message_id, + sutBotId: sutIdentity.id, + }), + }); + driverOffset = matched.nextOffset; + if (!scenarioRun.expectReply) { + throw new Error(`unexpected reply message ${matched.message.messageId} matched`); } + assertTelegramScenarioReply({ + expectedTextIncludes: scenarioRun.expectedTextIncludes, + message: matched.message, + }); + scenarioResults.push({ + id: scenario.id, + title: scenario.title, + status: "pass", + details: `reply message ${matched.message.messageId} matched`, + }); + } catch (error) { + if (!scenarioRun.expectReply) { + const details = formatErrorMessage(error); + if ( + details === `timed out after ${scenario.timeoutMs}ms waiting for Telegram message` + ) { + scenarioResults.push({ + id: scenario.id, + title: scenario.title, + status: "pass", + details: "no reply", + }); + continue; + } + } + scenarioResults.push({ + id: scenario.id, + title: scenario.title, + status: "fail", + details: formatErrorMessage(error), + }); } - scenarioResults.push({ - id: scenario.id, - title: scenario.title, - status: "fail", - details: formatErrorMessage(error), - }); + assertLeaseHealthy(); } } + } finally { + try { + await gatewayHarness.stop(); + } catch (error) { + appendLiveLaneIssue(cleanupIssues, "live gateway cleanup", error); + } } } finally { + await leaseHeartbeat.stop(); try { - await gatewayHarness.stop(); + await credentialLease.release(); } catch (error) { - appendLiveLaneIssue(cleanupIssues, "live gateway cleanup", error); + appendLiveLaneIssue(cleanupIssues, "credential lease release", error); } } const finishedAt = new Date().toISOString(); const summary: TelegramQaSummary = { + credentials: { + source: credentialLease.source, + kind: credentialLease.kind, + role: credentialLease.role, + ownerId: credentialLease.ownerId, + credentialId: credentialLease.credentialId, + }, groupId: runtimeEnv.groupId, startedAt, finishedAt, @@ -976,6 +1041,7 @@ export async function runTelegramQaLive(params: { reportPath, `${renderTelegramQaMarkdown({ cleanupIssues, + credentialSource: credentialLease.source, groupId: runtimeEnv.groupId, startedAt, finishedAt, @@ -1043,5 +1109,6 @@ export const __testing = { findScenario, matchesTelegramScenarioReply, normalizeTelegramObservedMessage, + parseTelegramQaCredentialPayload, resolveTelegramQaRuntimeEnv, }; diff --git a/extensions/qa-lab/src/qa-credentials-admin.runtime.test.ts b/extensions/qa-lab/src/qa-credentials-admin.runtime.test.ts new file mode 100644 index 00000000000..7b6f1369c9f --- /dev/null +++ b/extensions/qa-lab/src/qa-credentials-admin.runtime.test.ts @@ -0,0 +1,207 @@ +import { describe, expect, it, vi } from "vitest"; +import { + addQaCredentialSet, + listQaCredentialSets, + QaCredentialAdminError, + removeQaCredentialSet, +} from "./qa-credentials-admin.runtime.js"; + +function jsonResponse(payload: unknown, status = 200) { + return new Response(JSON.stringify(payload), { + status, + headers: { + "content-type": "application/json", + }, + }); +} + +describe("qa credential admin runtime", () => { + it("adds a credential set through the admin endpoint", async () => { + const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) => + jsonResponse({ + status: "ok", + credential: { + credentialId: "cred-1", + kind: "telegram", + status: "active", + createdAtMs: 100, + updatedAtMs: 100, + lastLeasedAtMs: 0, + note: "qa", + }, + }), + ); + + const result = await addQaCredentialSet({ + kind: "telegram", + payload: { + groupId: "-100123", + driverToken: "driver", + sutToken: "sut", + }, + note: "qa", + actorId: "maintainer-local", + siteUrl: "https://first-schnauzer-821.convex.site", + env: { + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret", + }, + fetchImpl, + }); + + expect(result.credential.credentialId).toBe("cred-1"); + const [url, init] = fetchImpl.mock.calls[0] ?? []; + expect(url).toBe("https://first-schnauzer-821.convex.site/qa-credentials/v1/admin/add"); + const headers = init?.headers as Record; + expect(headers.authorization).toBe("Bearer maint-secret"); + const bodyText = init?.body; + expect(typeof bodyText).toBe("string"); + const body = JSON.parse(bodyText as string) as Record; + expect(body.kind).toBe("telegram"); + expect(body.actorId).toBe("maintainer-local"); + expect(body.payload).toEqual({ + groupId: "-100123", + driverToken: "driver", + sutToken: "sut", + }); + }); + + it("rejects admin commands when maintainer secret is missing", async () => { + await expect( + listQaCredentialSets({ + siteUrl: "https://first-schnauzer-821.convex.site", + env: {}, + fetchImpl: vi.fn(), + }), + ).rejects.toMatchObject({ + name: "QaCredentialAdminError", + code: "MISSING_MAINTAINER_SECRET", + } satisfies Partial); + }); + + it("rejects non-https admin site URLs unless local insecure opt-in is enabled", async () => { + await expect( + listQaCredentialSets({ + siteUrl: "http://qa-cred.example.convex.site", + env: { + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret", + }, + fetchImpl: vi.fn(), + }), + ).rejects.toMatchObject({ + name: "QaCredentialAdminError", + code: "INVALID_SITE_URL", + } satisfies Partial); + }); + + it("allows loopback http admin site URLs when OPENCLAW_QA_ALLOW_INSECURE_HTTP is enabled", async () => { + const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) => + jsonResponse({ + status: "ok", + count: 0, + credentials: [], + }), + ); + + await listQaCredentialSets({ + siteUrl: "http://127.0.0.1:3210", + env: { + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret", + OPENCLAW_QA_ALLOW_INSECURE_HTTP: "1", + }, + fetchImpl, + }); + + expect(fetchImpl.mock.calls[0]?.[0]).toBe("http://127.0.0.1:3210/qa-credentials/v1/admin/list"); + }); + + it("rejects unsafe endpoint-prefix overrides", async () => { + await expect( + listQaCredentialSets({ + siteUrl: "https://first-schnauzer-821.convex.site", + endpointPrefix: "//evil.example", + env: { + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret", + }, + fetchImpl: vi.fn(), + }), + ).rejects.toMatchObject({ + name: "QaCredentialAdminError", + code: "INVALID_ARGUMENT", + } satisfies Partial); + }); + + it("surfaces broker error codes for remove", async () => { + const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) => + jsonResponse( + { + status: "error", + code: "LEASE_ACTIVE", + message: "Credential is currently leased and cannot be disabled.", + }, + 200, + ), + ); + + await expect( + removeQaCredentialSet({ + credentialId: "cred-1", + siteUrl: "https://first-schnauzer-821.convex.site", + env: { + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret", + }, + fetchImpl, + }), + ).rejects.toMatchObject({ + name: "QaCredentialAdminError", + code: "LEASE_ACTIVE", + } satisfies Partial); + }); + + it("lists credentials and forwards includePayload/status filters", async () => { + const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) => + jsonResponse({ + status: "ok", + count: 1, + credentials: [ + { + credentialId: "cred-2", + kind: "telegram", + status: "active", + createdAtMs: 100, + updatedAtMs: 100, + lastLeasedAtMs: 50, + payload: { + groupId: "-100123", + driverToken: "driver", + sutToken: "sut", + }, + }, + ], + }), + ); + + const result = await listQaCredentialSets({ + kind: "telegram", + status: "active", + includePayload: true, + limit: 5, + siteUrl: "https://first-schnauzer-821.convex.site", + env: { + OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret", + }, + fetchImpl, + }); + + expect(result.credentials).toHaveLength(1); + const [, init] = fetchImpl.mock.calls[0] ?? []; + const bodyText = init?.body; + expect(typeof bodyText).toBe("string"); + const body = JSON.parse(bodyText as string) as Record; + expect(body).toEqual({ + kind: "telegram", + status: "active", + includePayload: true, + limit: 5, + }); + }); +}); diff --git a/extensions/qa-lab/src/qa-credentials-admin.runtime.ts b/extensions/qa-lab/src/qa-credentials-admin.runtime.ts new file mode 100644 index 00000000000..372e068c1e0 --- /dev/null +++ b/extensions/qa-lab/src/qa-credentials-admin.runtime.ts @@ -0,0 +1,407 @@ +import { randomUUID } from "node:crypto"; +import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { z } from "zod"; + +const DEFAULT_ENDPOINT_PREFIX = "/qa-credentials/v1"; +const DEFAULT_HTTP_TIMEOUT_MS = 15_000; +const ALLOW_INSECURE_HTTP_ENV_KEY = "OPENCLAW_QA_ALLOW_INSECURE_HTTP"; + +const actorRoleSchema = z.union([z.literal("ci"), z.literal("maintainer")]); +const credentialStatusSchema = z.union([z.literal("active"), z.literal("disabled")]); +const listStatusSchema = z.union([z.literal("active"), z.literal("disabled"), z.literal("all")]); + +const brokerErrorSchema = z.object({ + status: z.literal("error"), + code: z.string().min(1), + message: z.string().min(1), +}); + +const credentialLeaseSchema = z.object({ + ownerId: z.string().min(1), + actorRole: actorRoleSchema, + acquiredAtMs: z.number().int(), + heartbeatAtMs: z.number().int(), + expiresAtMs: z.number().int(), +}); + +const credentialRecordSchema = z.object({ + credentialId: z.string().min(1), + kind: z.string().min(1), + status: credentialStatusSchema, + createdAtMs: z.number().int(), + updatedAtMs: z.number().int(), + lastLeasedAtMs: z.number().int(), + note: z.string().optional(), + lease: credentialLeaseSchema.optional(), + payload: z.unknown().optional(), +}); + +const addCredentialResponseSchema = z.object({ + status: z.literal("ok"), + credential: credentialRecordSchema, +}); + +const removeCredentialResponseSchema = z.object({ + status: z.literal("ok"), + changed: z.boolean(), + credential: credentialRecordSchema, +}); + +const listCredentialsResponseSchema = z.object({ + status: z.literal("ok"), + credentials: z.array(credentialRecordSchema), + count: z.number().int().nonnegative().optional(), +}); + +export type QaCredentialAdminListStatus = z.infer; +export type QaCredentialRecord = z.infer; +export type QaCredentialListResponse = z.infer; + +export class QaCredentialAdminError extends Error { + code: string; + httpStatus?: number; + + constructor(params: { code: string; message: string; httpStatus?: number }) { + super(params.message); + this.name = "QaCredentialAdminError"; + this.code = params.code; + this.httpStatus = params.httpStatus; + } +} + +type AdminConfig = { + actorId: string; + authToken: string; + addUrl: string; + endpointPrefix: string; + httpTimeoutMs: number; + listUrl: string; + removeUrl: string; + siteUrl: string; +}; + +type AdminBaseOptions = { + actorId?: string; + endpointPrefix?: string; + env?: NodeJS.ProcessEnv; + fetchImpl?: typeof fetch; + siteUrl?: string; +}; + +type AddQaCredentialSetOptions = AdminBaseOptions & { + kind: string; + note?: string; + payload: Record; + status?: z.infer; +}; + +type RemoveQaCredentialSetOptions = AdminBaseOptions & { + credentialId: string; +}; + +type ListQaCredentialSetsOptions = AdminBaseOptions & { + includePayload?: boolean; + kind?: string; + limit?: number; + status?: string; +}; + +function parsePositiveIntegerEnv(env: NodeJS.ProcessEnv, key: string, fallback: number): number { + const raw = env[key]?.trim(); + if (!raw) { + return fallback; + } + const value = Number(raw); + if (!Number.isFinite(value) || !Number.isInteger(value) || value < 1) { + throw new QaCredentialAdminError({ + code: "INVALID_ENV", + message: `${key} must be a positive integer.`, + }); + } + return value; +} + +function isTruthyOptIn(value: string | undefined) { + const normalized = value?.trim().toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes"; +} + +function isLoopbackHostname(hostname: string) { + return hostname === "localhost" || hostname === "::1" || hostname.startsWith("127."); +} + +function normalizeConvexSiteUrl(raw: string, env: NodeJS.ProcessEnv): string { + let parsed: URL; + try { + parsed = new URL(raw); + } catch { + throw new QaCredentialAdminError({ + code: "INVALID_SITE_URL", + message: `OPENCLAW_QA_CONVEX_SITE_URL must be a valid URL, got "${raw || ""}".`, + }); + } + if (parsed.protocol === "https:") { + const text = parsed.toString(); + return text.endsWith("/") ? text.slice(0, -1) : text; + } + if (parsed.protocol !== "http:") { + throw new QaCredentialAdminError({ + code: "INVALID_SITE_URL", + message: "OPENCLAW_QA_CONVEX_SITE_URL must use https://.", + }); + } + const allowInsecureHttp = isTruthyOptIn(env[ALLOW_INSECURE_HTTP_ENV_KEY]); + if (!allowInsecureHttp || !isLoopbackHostname(parsed.hostname)) { + throw new QaCredentialAdminError({ + code: "INVALID_SITE_URL", + message: `OPENCLAW_QA_CONVEX_SITE_URL must use https://. http:// is only allowed for loopback hosts when ${ALLOW_INSECURE_HTTP_ENV_KEY}=1.`, + }); + } + const text = parsed.toString(); + return text.endsWith("/") ? text.slice(0, -1) : text; +} + +function normalizeEndpointPrefix(value: string | undefined): string { + const trimmed = value?.trim(); + if (!trimmed) { + return DEFAULT_ENDPOINT_PREFIX; + } + const prefixed = trimmed.startsWith("/") ? trimmed : `/${trimmed}`; + const normalized = prefixed.endsWith("/") ? prefixed.slice(0, -1) : prefixed; + if (!normalized.startsWith("/") || normalized.startsWith("//")) { + throw new QaCredentialAdminError({ + code: "INVALID_ARGUMENT", + message: '--endpoint-prefix must be an absolute path like "/qa-credentials/v1" (not //host).', + }); + } + if (normalized.includes("\\") || normalized.split("/").some((segment) => segment === "..")) { + throw new QaCredentialAdminError({ + code: "INVALID_ARGUMENT", + message: '--endpoint-prefix must not contain backslashes or ".." path segments.', + }); + } + return normalized; +} + +function joinEndpoint(baseUrl: string, prefix: string, suffix: string): string { + const normalizedSuffix = suffix.startsWith("/") ? suffix : `/${suffix}`; + const url = new URL(baseUrl); + url.pathname = `${prefix}${normalizedSuffix}`.replace(/\/{2,}/gu, "/"); + url.search = ""; + url.hash = ""; + return url.toString(); +} + +function resolveAdminAuthToken(env: NodeJS.ProcessEnv): string { + const token = env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim(); + if (token) { + return token; + } + throw new QaCredentialAdminError({ + code: "MISSING_MAINTAINER_SECRET", + message: "Missing OPENCLAW_QA_CONVEX_SECRET_MAINTAINER for qa credential admin commands.", + }); +} + +function resolveAdminConfig(options: AdminBaseOptions): AdminConfig { + const env = options.env ?? process.env; + const siteUrl = options.siteUrl?.trim() || env.OPENCLAW_QA_CONVEX_SITE_URL?.trim(); + if (!siteUrl) { + throw new QaCredentialAdminError({ + code: "MISSING_SITE_URL", + message: "Missing OPENCLAW_QA_CONVEX_SITE_URL for qa credential admin commands.", + }); + } + const normalizedSiteUrl = normalizeConvexSiteUrl(siteUrl, env); + const endpointPrefix = normalizeEndpointPrefix( + options.endpointPrefix?.trim() || env.OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX, + ); + const actorId = + options.actorId?.trim() || + env.OPENCLAW_QA_CREDENTIAL_OWNER_ID?.trim() || + `qa-lab-admin-${process.pid}-${randomUUID().slice(0, 8)}`; + + return { + actorId, + authToken: resolveAdminAuthToken(env), + siteUrl: normalizedSiteUrl, + endpointPrefix, + httpTimeoutMs: parsePositiveIntegerEnv( + env, + "OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS", + DEFAULT_HTTP_TIMEOUT_MS, + ), + addUrl: joinEndpoint(normalizedSiteUrl, endpointPrefix, "admin/add"), + removeUrl: joinEndpoint(normalizedSiteUrl, endpointPrefix, "admin/remove"), + listUrl: joinEndpoint(normalizedSiteUrl, endpointPrefix, "admin/list"), + }; +} + +function parseJsonResponsePayload(text: string) { + if (!text.trim()) { + return undefined; + } + try { + return JSON.parse(text) as unknown; + } catch { + return text; + } +} + +function toBrokerError(payload: unknown, httpStatus: number) { + const parsed = brokerErrorSchema.safeParse(payload); + if (!parsed.success) { + return null; + } + return new QaCredentialAdminError({ + code: parsed.data.code, + message: parsed.data.message, + httpStatus, + }); +} + +async function postJson(params: { + authToken: string; + body: Record; + fetchImpl: typeof fetch; + httpTimeoutMs: number; + responseSchema: z.ZodType; + url: string; +}) { + let response: Response; + try { + response = await params.fetchImpl(params.url, { + method: "POST", + headers: { + authorization: `Bearer ${params.authToken}`, + "content-type": "application/json", + }, + body: JSON.stringify(params.body), + signal: AbortSignal.timeout(params.httpTimeoutMs), + }); + } catch (error) { + throw new QaCredentialAdminError({ + code: "BROKER_REQUEST_FAILED", + message: `Convex credential admin request failed: ${formatErrorMessage(error)}`, + }); + } + + const text = await response.text(); + const payload = parseJsonResponsePayload(text); + + const brokerError = toBrokerError(payload, response.status); + if (brokerError) { + throw brokerError; + } + if (!response.ok) { + throw new QaCredentialAdminError({ + code: "BROKER_HTTP_ERROR", + message: `Convex credential admin request failed with HTTP ${response.status}.`, + httpStatus: response.status, + }); + } + + const parsed = params.responseSchema.safeParse(payload); + if (!parsed.success) { + throw new QaCredentialAdminError({ + code: "INVALID_RESPONSE", + message: `Convex credential admin response did not match expected shape: ${parsed.error.message}`, + httpStatus: response.status, + }); + } + + return parsed.data; +} + +function normalizeStatus(value: string | undefined): QaCredentialAdminListStatus | undefined { + if (!value) { + return undefined; + } + const normalized = value.trim().toLowerCase(); + const parsed = listStatusSchema.safeParse(normalized); + if (!parsed.success) { + throw new QaCredentialAdminError({ + code: "INVALID_ARGUMENT", + message: '--status must be one of "active", "disabled", or "all".', + }); + } + return parsed.data; +} + +function normalizeLimit(value: number | undefined) { + if (value === undefined) { + return undefined; + } + if (!Number.isFinite(value) || !Number.isInteger(value) || value < 1) { + throw new QaCredentialAdminError({ + code: "INVALID_ARGUMENT", + message: "--limit must be a positive integer.", + }); + } + return value; +} + +export async function addQaCredentialSet(options: AddQaCredentialSetOptions) { + const config = resolveAdminConfig(options); + const fetchImpl = options.fetchImpl ?? fetch; + return await postJson({ + fetchImpl, + authToken: config.authToken, + httpTimeoutMs: config.httpTimeoutMs, + url: config.addUrl, + responseSchema: addCredentialResponseSchema, + body: { + kind: options.kind, + payload: options.payload, + ...(options.note ? { note: options.note } : {}), + ...(options.status ? { status: options.status } : {}), + actorId: config.actorId, + }, + }); +} + +export async function removeQaCredentialSet(options: RemoveQaCredentialSetOptions) { + const config = resolveAdminConfig(options); + const fetchImpl = options.fetchImpl ?? fetch; + return await postJson({ + fetchImpl, + authToken: config.authToken, + httpTimeoutMs: config.httpTimeoutMs, + url: config.removeUrl, + responseSchema: removeCredentialResponseSchema, + body: { + credentialId: options.credentialId, + actorId: config.actorId, + }, + }); +} + +export async function listQaCredentialSets(options: ListQaCredentialSetsOptions) { + const config = resolveAdminConfig(options); + const fetchImpl = options.fetchImpl ?? fetch; + const status = normalizeStatus(options.status); + const limit = normalizeLimit(options.limit); + return await postJson({ + fetchImpl, + authToken: config.authToken, + httpTimeoutMs: config.httpTimeoutMs, + url: config.listUrl, + responseSchema: listCredentialsResponseSchema, + body: { + ...(options.kind ? { kind: options.kind } : {}), + ...(status ? { status } : {}), + ...(options.includePayload === true ? { includePayload: true } : {}), + ...(limit !== undefined ? { limit } : {}), + }, + }); +} + +export const __testing = { + DEFAULT_ENDPOINT_PREFIX, + DEFAULT_HTTP_TIMEOUT_MS, + normalizeConvexSiteUrl, + normalizeEndpointPrefix, + normalizeStatus, + parsePositiveIntegerEnv, + resolveAdminConfig, +}; diff --git a/qa/README.md b/qa/README.md index 756ba580770..e8040c21ea0 100644 --- a/qa/README.md +++ b/qa/README.md @@ -6,6 +6,7 @@ Files: - `scenarios.md` - canonical QA scenario pack, kickoff mission, and operator identity. - `frontier-harness-plan.md` - big-model bakeoff and tuning loop for harness work. +- `convex-credential-broker/` - standalone Convex v1 lease broker for pooled live credentials. Key workflow: diff --git a/qa/convex-credential-broker/.gitignore b/qa/convex-credential-broker/.gitignore new file mode 100644 index 00000000000..119561f124b --- /dev/null +++ b/qa/convex-credential-broker/.gitignore @@ -0,0 +1,4 @@ +.convex +convex/_generated + +.env.local diff --git a/qa/convex-credential-broker/README.md b/qa/convex-credential-broker/README.md new file mode 100644 index 00000000000..8a3c1852ff3 --- /dev/null +++ b/qa/convex-credential-broker/README.md @@ -0,0 +1,165 @@ +# QA Convex Credential Broker (v1) + +Standalone Convex project for shared `qa-lab` live credentials with lease locking. + +This broker exposes: + +- `POST /qa-credentials/v1/acquire` +- `POST /qa-credentials/v1/heartbeat` +- `POST /qa-credentials/v1/release` +- `POST /qa-credentials/v1/admin/add` +- `POST /qa-credentials/v1/admin/remove` +- `POST /qa-credentials/v1/admin/list` + +The implementation matches the contract documented in +`docs/help/testing.md` for `--credential-source convex`. + +## Policy baked in + +- Pool partitioning: by `kind` only +- Selection: least-recently-leased (round-robin behavior) +- Secrets: separate maintainer/CI secrets +- Outage behavior: callers fail fast +- Lease event retention: 2 days (hourly cleanup cron) +- Admin event retention: 30 days (hourly cleanup cron) +- App-level encryption: not included in v1 + +## Quick start + +1. Create a Convex deployment and authenticate your CLI. +2. From this folder: + +```bash +cd qa/convex-credential-broker +npm install +npx convex dev +``` + +3. Deploy: + +```bash +npx convex deploy +``` + +4. In Convex deployment environment variables, set: + +- `OPENCLAW_QA_CONVEX_SECRET_MAINTAINER` +- `OPENCLAW_QA_CONVEX_SECRET_CI` + +Client URL policy: + +- `OPENCLAW_QA_CONVEX_SITE_URL` must use `https://` in normal use. +- Local development may use loopback `http://` only when `OPENCLAW_QA_ALLOW_INSECURE_HTTP=1`. + +## Manage credentials from qa-lab CLI + +Maintainers can manage rows without using the Convex dashboard: + +```bash +pnpm openclaw qa credentials add \ + --kind telegram \ + --payload-file qa/telegram-credential.json + +pnpm openclaw qa credentials list --kind telegram + +pnpm openclaw qa credentials remove --credential-id +``` + +Admin endpoints require `OPENCLAW_QA_CONVEX_SECRET_MAINTAINER`. + +## Local request examples + +Replace `` with your Convex site URL and `` with a configured secret. + +Acquire: + +```bash +curl -sS -X POST "/qa-credentials/v1/acquire" \ + -H "authorization: Bearer " \ + -H "content-type: application/json" \ + -d '{ + "kind":"telegram", + "ownerId":"local-dev", + "actorRole":"maintainer", + "leaseTtlMs":1200000, + "heartbeatIntervalMs":30000 + }' +``` + +Heartbeat: + +```bash +curl -sS -X POST "/qa-credentials/v1/heartbeat" \ + -H "authorization: Bearer " \ + -H "content-type: application/json" \ + -d '{ + "kind":"telegram", + "ownerId":"local-dev", + "actorRole":"maintainer", + "credentialId":"", + "leaseToken":"", + "leaseTtlMs":1200000 + }' +``` + +Release: + +```bash +curl -sS -X POST "/qa-credentials/v1/release" \ + -H "authorization: Bearer " \ + -H "content-type: application/json" \ + -d '{ + "kind":"telegram", + "ownerId":"local-dev", + "actorRole":"maintainer", + "credentialId":"", + "leaseToken":"" + }' +``` + +Admin add (maintainer token only): + +```bash +curl -sS -X POST "/qa-credentials/v1/admin/add" \ + -H "authorization: Bearer " \ + -H "content-type: application/json" \ + -d '{ + "kind":"telegram", + "actorId":"local-maintainer", + "payload":{ + "groupId":"-100123", + "driverToken":"driver-token", + "sutToken":"sut-token" + } + }' +``` + +For `kind: "telegram"`, broker `admin/add` validates that payload includes: + +- `groupId` as a numeric chat id string +- non-empty `driverToken` +- non-empty `sutToken` + +Admin list (default redacted): + +```bash +curl -sS -X POST "/qa-credentials/v1/admin/list" \ + -H "authorization: Bearer " \ + -H "content-type: application/json" \ + -d '{ + "kind":"telegram", + "status":"all" + }' +``` + +Admin remove (soft disable, fails when lease is active): + +```bash +curl -sS -X POST "/qa-credentials/v1/admin/remove" \ + -H "authorization: Bearer " \ + -H "content-type: application/json" \ + -d '{ + "credentialId":"", + "actorId":"local-maintainer" + }' +``` diff --git a/qa/convex-credential-broker/convex.json b/qa/convex-credential-broker/convex.json new file mode 100644 index 00000000000..1f7bdd40de5 --- /dev/null +++ b/qa/convex-credential-broker/convex.json @@ -0,0 +1,3 @@ +{ + "functions": "convex/" +} diff --git a/qa/convex-credential-broker/convex/credentials.ts b/qa/convex-credential-broker/convex/credentials.ts new file mode 100644 index 00000000000..b940e49e70f --- /dev/null +++ b/qa/convex-credential-broker/convex/credentials.ts @@ -0,0 +1,642 @@ +import { v } from "convex/values"; +import { internal } from "./_generated/api"; +import type { Id } from "./_generated/dataModel"; +import { internalMutation, internalQuery } from "./_generated/server"; + +const LEASE_EVENT_RETENTION_MS = 2 * 24 * 60 * 60 * 1_000; +const ADMIN_EVENT_RETENTION_MS = 30 * 24 * 60 * 60 * 1_000; +const EVENT_RETENTION_BATCH_SIZE = 256; +const MAX_HEARTBEAT_INTERVAL_MS = 5 * 60 * 1_000; +const MAX_LEASE_TTL_MS = 2 * 60 * 60 * 1_000; +const MIN_HEARTBEAT_INTERVAL_MS = 5_000; +const MIN_LEASE_TTL_MS = 30_000; +const MAX_LIST_LIMIT = 500; +const MIN_LIST_LIMIT = 1; + +const DEFAULT_HEARTBEAT_INTERVAL_MS = 30_000; +const DEFAULT_LEASE_TTL_MS = 20 * 60 * 1_000; +const DEFAULT_LIST_LIMIT = 100; +const POOL_EXHAUSTED_RETRY_AFTER_MS = 2_000; + +const actorRole = v.union(v.literal("ci"), v.literal("maintainer")); +const credentialStatus = v.union(v.literal("active"), v.literal("disabled")); +const listStatus = v.union(v.literal("active"), v.literal("disabled"), v.literal("all")); + +type ActorRole = "ci" | "maintainer"; +type CredentialStatus = "active" | "disabled"; +type ListStatus = CredentialStatus | "all"; +type LeaseEventType = "acquire" | "acquire_failed" | "release"; +type AdminEventType = "add" | "disable" | "disable_failed"; + +type BrokerErrorResult = { + status: "error"; + code: string; + message: string; + retryAfterMs?: number; +}; + +type BrokerOkResult = { + status: "ok"; +}; + +type CredentialLease = { + ownerId: string; + actorRole: ActorRole; + leaseToken: string; + acquiredAtMs: number; + heartbeatAtMs: number; + expiresAtMs: number; +}; + +type CredentialSetRecord = { + _id: Id<"credential_sets">; + kind: string; + status: CredentialStatus; + payload: unknown; + createdAtMs: number; + updatedAtMs: number; + lastLeasedAtMs: number; + note?: string; + lease?: CredentialLease; +}; + +type EventInsertCtx = { + db: { + insert: ( + table: "lease_events" | "admin_events", + value: Record, + ) => Promise; + }; +}; + +function normalizeIntervalMs(params: { + value: number | undefined; + fallback: number; + min: number; + max: number; +}) { + const value = params.value ?? params.fallback; + const rounded = Math.floor(value); + if (!Number.isFinite(rounded) || rounded < params.min || rounded > params.max) { + return null; + } + return rounded; +} + +function normalizeListLimit(value: number | undefined) { + const limit = value ?? DEFAULT_LIST_LIMIT; + const rounded = Math.floor(limit); + if (!Number.isFinite(rounded) || rounded < MIN_LIST_LIMIT || rounded > MAX_LIST_LIMIT) { + return null; + } + return rounded; +} + +function brokerError(code: string, message: string, retryAfterMs?: number): BrokerErrorResult { + return retryAfterMs && retryAfterMs > 0 + ? { + status: "error", + code, + message, + retryAfterMs, + } + : { + status: "error", + code, + message, + }; +} + +function leaseIsActive(lease: CredentialLease | undefined, nowMs: number) { + return Boolean(lease && lease.expiresAtMs > nowMs); +} + +function toCredentialSummary(row: CredentialSetRecord, includePayload: boolean) { + return { + credentialId: row._id, + kind: row.kind, + status: row.status, + createdAtMs: row.createdAtMs, + updatedAtMs: row.updatedAtMs, + lastLeasedAtMs: row.lastLeasedAtMs, + ...(row.note ? { note: row.note } : {}), + ...(row.lease + ? { + lease: { + ownerId: row.lease.ownerId, + actorRole: row.lease.actorRole, + acquiredAtMs: row.lease.acquiredAtMs, + heartbeatAtMs: row.lease.heartbeatAtMs, + expiresAtMs: row.lease.expiresAtMs, + }, + } + : {}), + ...(includePayload ? { payload: row.payload } : {}), + }; +} + +async function insertLeaseEvent(params: { + ctx: EventInsertCtx; + kind: string; + eventType: LeaseEventType; + actorRole: ActorRole; + ownerId: string; + occurredAtMs: number; + credentialId?: Id<"credential_sets">; + code?: string; + message?: string; +}) { + await params.ctx.db.insert("lease_events", { + kind: params.kind, + eventType: params.eventType, + actorRole: params.actorRole, + ownerId: params.ownerId, + occurredAtMs: params.occurredAtMs, + ...(params.credentialId ? { credentialId: params.credentialId } : {}), + ...(params.code ? { code: params.code } : {}), + ...(params.message ? { message: params.message } : {}), + }); +} + +async function insertAdminEvent(params: { + ctx: EventInsertCtx; + eventType: AdminEventType; + actorRole: ActorRole; + actorId: string; + occurredAtMs: number; + credentialId?: Id<"credential_sets">; + kind?: string; + code?: string; + message?: string; +}) { + await params.ctx.db.insert("admin_events", { + eventType: params.eventType, + actorRole: params.actorRole, + actorId: params.actorId, + occurredAtMs: params.occurredAtMs, + ...(params.credentialId ? { credentialId: params.credentialId } : {}), + ...(params.kind ? { kind: params.kind } : {}), + ...(params.code ? { code: params.code } : {}), + ...(params.message ? { message: params.message } : {}), + }); +} + +function sortByLeastRecentlyLeasedThenId( + rows: Array<{ + _id: Id<"credential_sets">; + lastLeasedAtMs: number; + }>, +) { + rows.sort((left, right) => { + if (left.lastLeasedAtMs !== right.lastLeasedAtMs) { + return left.lastLeasedAtMs - right.lastLeasedAtMs; + } + const leftId = String(left._id); + const rightId = String(right._id); + return leftId.localeCompare(rightId); + }); +} + +function sortCredentialRowsForList(rows: CredentialSetRecord[]) { + const statusRank: Record = { active: 0, disabled: 1 }; + rows.sort((left, right) => { + const kindCompare = left.kind.localeCompare(right.kind); + if (kindCompare !== 0) { + return kindCompare; + } + if (left.status !== right.status) { + return statusRank[left.status] - statusRank[right.status]; + } + if (left.updatedAtMs !== right.updatedAtMs) { + return right.updatedAtMs - left.updatedAtMs; + } + return String(left._id).localeCompare(String(right._id)); + }); +} + +function normalizeActorId(value: string | undefined) { + const normalized = value?.trim(); + return normalized && normalized.length > 0 ? normalized : "unknown"; +} + +export const acquireLease = internalMutation({ + args: { + kind: v.string(), + ownerId: v.string(), + actorRole, + leaseTtlMs: v.optional(v.number()), + heartbeatIntervalMs: v.optional(v.number()), + }, + handler: async (ctx, args) => { + const nowMs = Date.now(); + const leaseTtlMs = normalizeIntervalMs({ + value: args.leaseTtlMs, + fallback: DEFAULT_LEASE_TTL_MS, + min: MIN_LEASE_TTL_MS, + max: MAX_LEASE_TTL_MS, + }); + if (!leaseTtlMs) { + return brokerError( + "INVALID_LEASE_TTL", + `leaseTtlMs must be between ${MIN_LEASE_TTL_MS} and ${MAX_LEASE_TTL_MS}.`, + ); + } + const heartbeatIntervalMs = normalizeIntervalMs({ + value: args.heartbeatIntervalMs, + fallback: DEFAULT_HEARTBEAT_INTERVAL_MS, + min: MIN_HEARTBEAT_INTERVAL_MS, + max: MAX_HEARTBEAT_INTERVAL_MS, + }); + if (!heartbeatIntervalMs) { + return brokerError( + "INVALID_HEARTBEAT_INTERVAL", + `heartbeatIntervalMs must be between ${MIN_HEARTBEAT_INTERVAL_MS} and ${MAX_HEARTBEAT_INTERVAL_MS}.`, + ); + } + + const activeRows = (await ctx.db + .query("credential_sets") + .withIndex("by_kind_status", (q) => q.eq("kind", args.kind).eq("status", "active")) + .collect()) as CredentialSetRecord[]; + + const availableRows = activeRows.filter((row) => !leaseIsActive(row.lease, nowMs)); + + if (availableRows.length === 0) { + await insertLeaseEvent({ + ctx, + kind: args.kind, + eventType: "acquire_failed", + actorRole: args.actorRole, + ownerId: args.ownerId, + occurredAtMs: nowMs, + code: "POOL_EXHAUSTED", + message: "No active credential in this kind is currently available.", + }); + return brokerError( + "POOL_EXHAUSTED", + `No available credential for kind "${args.kind}".`, + POOL_EXHAUSTED_RETRY_AFTER_MS, + ); + } + + sortByLeastRecentlyLeasedThenId(availableRows); + const selected = availableRows[0]; + const leaseToken = crypto.randomUUID(); + + await ctx.db.patch(selected._id, { + lease: { + ownerId: args.ownerId, + actorRole: args.actorRole, + leaseToken, + acquiredAtMs: nowMs, + heartbeatAtMs: nowMs, + expiresAtMs: nowMs + leaseTtlMs, + }, + lastLeasedAtMs: nowMs, + updatedAtMs: nowMs, + }); + + await insertLeaseEvent({ + ctx, + kind: args.kind, + eventType: "acquire", + actorRole: args.actorRole, + ownerId: args.ownerId, + occurredAtMs: nowMs, + credentialId: selected._id, + }); + + return { + status: "ok", + credentialId: selected._id, + leaseToken, + payload: selected.payload, + leaseTtlMs, + heartbeatIntervalMs, + }; + }, +}); + +export const heartbeatLease = internalMutation({ + args: { + kind: v.string(), + ownerId: v.string(), + actorRole, + credentialId: v.id("credential_sets"), + leaseToken: v.string(), + leaseTtlMs: v.optional(v.number()), + }, + handler: async (ctx, args): Promise => { + const nowMs = Date.now(); + const leaseTtlMs = normalizeIntervalMs({ + value: args.leaseTtlMs, + fallback: DEFAULT_LEASE_TTL_MS, + min: MIN_LEASE_TTL_MS, + max: MAX_LEASE_TTL_MS, + }); + if (!leaseTtlMs) { + return brokerError( + "INVALID_LEASE_TTL", + `leaseTtlMs must be between ${MIN_LEASE_TTL_MS} and ${MAX_LEASE_TTL_MS}.`, + ); + } + + const row = (await ctx.db.get(args.credentialId)) as CredentialSetRecord | null; + if (!row) { + return brokerError("CREDENTIAL_NOT_FOUND", "Credential record does not exist."); + } + if (row.kind !== args.kind) { + return brokerError("KIND_MISMATCH", "Credential kind did not match this lease heartbeat."); + } + if (row.status !== "active") { + return brokerError( + "CREDENTIAL_DISABLED", + "Credential is disabled and cannot be heartbeated.", + ); + } + if (!row.lease) { + return brokerError("LEASE_NOT_FOUND", "Credential is not currently leased."); + } + if (row.lease.ownerId !== args.ownerId || row.lease.leaseToken !== args.leaseToken) { + return brokerError("LEASE_NOT_OWNER", "Credential lease owner/token mismatch."); + } + if (row.lease.expiresAtMs < nowMs) { + return brokerError("LEASE_EXPIRED", "Credential lease has already expired."); + } + + await ctx.db.patch(args.credentialId, { + lease: { + ...row.lease, + heartbeatAtMs: nowMs, + expiresAtMs: nowMs + leaseTtlMs, + }, + updatedAtMs: nowMs, + }); + + return { status: "ok" }; + }, +}); + +export const releaseLease = internalMutation({ + args: { + kind: v.string(), + ownerId: v.string(), + actorRole, + credentialId: v.id("credential_sets"), + leaseToken: v.string(), + }, + handler: async (ctx, args): Promise => { + const nowMs = Date.now(); + const row = (await ctx.db.get(args.credentialId)) as CredentialSetRecord | null; + if (!row) { + return brokerError("CREDENTIAL_NOT_FOUND", "Credential record does not exist."); + } + if (row.kind !== args.kind) { + return brokerError("KIND_MISMATCH", "Credential kind did not match this lease release."); + } + if (!row.lease) { + return { status: "ok" }; + } + if (row.lease.ownerId !== args.ownerId || row.lease.leaseToken !== args.leaseToken) { + return brokerError("LEASE_NOT_OWNER", "Credential lease owner/token mismatch."); + } + + await ctx.db.patch(args.credentialId, { + lease: undefined, + updatedAtMs: nowMs, + }); + await insertLeaseEvent({ + ctx, + kind: args.kind, + eventType: "release", + actorRole: args.actorRole, + ownerId: args.ownerId, + occurredAtMs: nowMs, + credentialId: args.credentialId, + }); + return { status: "ok" }; + }, +}); + +export const addCredentialSet = internalMutation({ + args: { + kind: v.string(), + payload: v.any(), + note: v.optional(v.string()), + actorId: v.optional(v.string()), + status: v.optional(credentialStatus), + }, + handler: async (ctx, args) => { + const nowMs = Date.now(); + const actorId = normalizeActorId(args.actorId); + const status = args.status ?? "active"; + const note = args.note?.trim(); + const credentialId = await ctx.db.insert("credential_sets", { + kind: args.kind, + status, + payload: args.payload, + createdAtMs: nowMs, + updatedAtMs: nowMs, + lastLeasedAtMs: 0, + ...(note ? { note } : {}), + }); + + await insertAdminEvent({ + ctx, + eventType: "add", + actorRole: "maintainer", + actorId, + occurredAtMs: nowMs, + credentialId, + kind: args.kind, + }); + + const created: CredentialSetRecord = { + _id: credentialId, + kind: args.kind, + status, + payload: args.payload, + createdAtMs: nowMs, + updatedAtMs: nowMs, + lastLeasedAtMs: 0, + ...(note ? { note } : {}), + }; + return { + status: "ok", + credential: toCredentialSummary(created, false), + }; + }, +}); + +export const disableCredentialSet = internalMutation({ + args: { + credentialId: v.id("credential_sets"), + actorId: v.optional(v.string()), + }, + handler: async (ctx, args) => { + const nowMs = Date.now(); + const actorId = normalizeActorId(args.actorId); + const row = (await ctx.db.get(args.credentialId)) as CredentialSetRecord | null; + if (!row) { + await insertAdminEvent({ + ctx, + eventType: "disable_failed", + actorRole: "maintainer", + actorId, + occurredAtMs: nowMs, + credentialId: args.credentialId, + code: "CREDENTIAL_NOT_FOUND", + message: "Credential record does not exist.", + }); + return brokerError("CREDENTIAL_NOT_FOUND", "Credential record does not exist."); + } + if (leaseIsActive(row.lease, nowMs)) { + await insertAdminEvent({ + ctx, + eventType: "disable_failed", + actorRole: "maintainer", + actorId, + occurredAtMs: nowMs, + credentialId: row._id, + kind: row.kind, + code: "LEASE_ACTIVE", + message: "Credential is currently leased and cannot be disabled yet.", + }); + return brokerError("LEASE_ACTIVE", "Credential is currently leased and cannot be disabled."); + } + if (row.status === "disabled") { + return { + status: "ok", + changed: false, + credential: toCredentialSummary(row, false), + }; + } + + await ctx.db.patch(args.credentialId, { + status: "disabled", + lease: undefined, + updatedAtMs: nowMs, + }); + + await insertAdminEvent({ + ctx, + eventType: "disable", + actorRole: "maintainer", + actorId, + occurredAtMs: nowMs, + credentialId: row._id, + kind: row.kind, + }); + + const updated: CredentialSetRecord = { + ...row, + status: "disabled", + lease: undefined, + updatedAtMs: nowMs, + }; + return { + status: "ok", + changed: true, + credential: toCredentialSummary(updated, false), + }; + }, +}); + +export const listCredentialSets = internalQuery({ + args: { + kind: v.optional(v.string()), + status: v.optional(listStatus), + includePayload: v.optional(v.boolean()), + limit: v.optional(v.number()), + }, + handler: async (ctx, args) => { + const normalizedStatus: ListStatus = args.status ?? "all"; + const includePayload = args.includePayload === true; + const limit = normalizeListLimit(args.limit); + if (!limit) { + return brokerError( + "INVALID_LIST_LIMIT", + `limit must be between ${MIN_LIST_LIMIT} and ${MAX_LIST_LIMIT}.`, + ); + } + + let rows: CredentialSetRecord[] = []; + const kind = args.kind?.trim(); + if (kind) { + if (normalizedStatus === "all") { + rows = (await ctx.db + .query("credential_sets") + .withIndex("by_kind_lastLeasedAtMs", (q) => q.eq("kind", kind)) + .collect()) as CredentialSetRecord[]; + } else { + rows = (await ctx.db + .query("credential_sets") + .withIndex("by_kind_status", (q) => q.eq("kind", kind).eq("status", normalizedStatus)) + .collect()) as CredentialSetRecord[]; + } + } else { + rows = (await ctx.db.query("credential_sets").collect()) as CredentialSetRecord[]; + if (normalizedStatus !== "all") { + rows = rows.filter((row) => row.status === normalizedStatus); + } + } + + sortCredentialRowsForList(rows); + const selected = rows.slice(0, limit); + return { + status: "ok", + credentials: selected.map((row) => toCredentialSummary(row, includePayload)), + count: selected.length, + }; + }, +}); + +export const cleanupLeaseEvents = internalMutation({ + args: {}, + handler: async (ctx) => { + const cutoffMs = Date.now() - LEASE_EVENT_RETENTION_MS; + const staleRows = await ctx.db + .query("lease_events") + .withIndex("by_occurredAtMs", (q) => q.lt("occurredAtMs", cutoffMs)) + .take(EVENT_RETENTION_BATCH_SIZE); + + for (const row of staleRows) { + await ctx.db.delete(row._id); + } + + if (staleRows.length === EVENT_RETENTION_BATCH_SIZE) { + await ctx.scheduler.runAfter(0, internal.credentials.cleanupLeaseEvents, {}); + } + + return { + status: "ok", + deleted: staleRows.length, + retentionMs: LEASE_EVENT_RETENTION_MS, + }; + }, +}); + +export const cleanupAdminEvents = internalMutation({ + args: {}, + handler: async (ctx) => { + const cutoffMs = Date.now() - ADMIN_EVENT_RETENTION_MS; + const staleRows = await ctx.db + .query("admin_events") + .withIndex("by_occurredAtMs", (q) => q.lt("occurredAtMs", cutoffMs)) + .take(EVENT_RETENTION_BATCH_SIZE); + + for (const row of staleRows) { + await ctx.db.delete(row._id); + } + + if (staleRows.length === EVENT_RETENTION_BATCH_SIZE) { + await ctx.scheduler.runAfter(0, internal.credentials.cleanupAdminEvents, {}); + } + + return { + status: "ok", + deleted: staleRows.length, + retentionMs: ADMIN_EVENT_RETENTION_MS, + }; + }, +}); diff --git a/qa/convex-credential-broker/convex/crons.ts b/qa/convex-credential-broker/convex/crons.ts new file mode 100644 index 00000000000..739e7de6a00 --- /dev/null +++ b/qa/convex-credential-broker/convex/crons.ts @@ -0,0 +1,20 @@ +import { cronJobs } from "convex/server"; +import { internal } from "./_generated/api"; + +const crons = cronJobs(); + +crons.interval( + "qa-credential-lease-event-retention", + { hours: 1 }, + internal.credentials.cleanupLeaseEvents, + {}, +); + +crons.interval( + "qa-credential-admin-event-retention", + { hours: 1 }, + internal.credentials.cleanupAdminEvents, + {}, +); + +export default crons; diff --git a/qa/convex-credential-broker/convex/http.ts b/qa/convex-credential-broker/convex/http.ts new file mode 100644 index 00000000000..34773686c4f --- /dev/null +++ b/qa/convex-credential-broker/convex/http.ts @@ -0,0 +1,457 @@ +import { httpRouter } from "convex/server"; +import { internal } from "./_generated/api"; +import type { Id } from "./_generated/dataModel"; +import { httpAction } from "./_generated/server"; + +type ActorRole = "ci" | "maintainer"; + +class BrokerHttpError extends Error { + code: string; + httpStatus: number; + + constructor(httpStatus: number, code: string, message: string) { + super(message); + this.name = "BrokerHttpError"; + this.httpStatus = httpStatus; + this.code = code; + } +} + +function jsonResponse(status: number, payload: unknown) { + return new Response(JSON.stringify(payload), { + status, + headers: { + "content-type": "application/json; charset=utf-8", + "cache-control": "no-store", + }, + }); +} + +function parseBearerToken(request: Request) { + const header = request.headers.get("authorization")?.trim(); + if (!header) { + return null; + } + const [scheme, token] = header.split(/\s+/u, 2); + if (scheme?.toLowerCase() !== "bearer" || !token) { + return null; + } + return token; +} + +function resolveAuthRole(token: string | null): ActorRole { + if (!token) { + throw new BrokerHttpError( + 401, + "AUTH_REQUIRED", + "Missing Authorization: Bearer header.", + ); + } + const maintainerSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim(); + const ciSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_CI?.trim(); + + if (!maintainerSecret && !ciSecret) { + throw new BrokerHttpError( + 500, + "SERVER_MISCONFIGURED", + "No Convex broker role secrets are configured on this deployment.", + ); + } + if (maintainerSecret && token === maintainerSecret) { + return "maintainer"; + } + if (ciSecret && token === ciSecret) { + return "ci"; + } + throw new BrokerHttpError(401, "AUTH_INVALID", "Credential broker secret is invalid."); +} + +function assertMaintainerAdminAuth(token: string | null) { + if (!token) { + throw new BrokerHttpError( + 401, + "AUTH_REQUIRED", + "Missing Authorization: Bearer header.", + ); + } + const maintainerSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim(); + if (!maintainerSecret) { + throw new BrokerHttpError( + 500, + "SERVER_MISCONFIGURED", + "Admin endpoints require OPENCLAW_QA_CONVEX_SECRET_MAINTAINER on this deployment.", + ); + } + if (token === maintainerSecret) { + return; + } + const ciSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_CI?.trim(); + if (ciSecret && token === ciSecret) { + throw new BrokerHttpError( + 403, + "AUTH_ROLE_MISMATCH", + "Admin endpoints require maintainer credentials.", + ); + } + throw new BrokerHttpError(401, "AUTH_INVALID", "Credential broker secret is invalid."); +} + +function asObject(value: unknown) { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return null; + } + return value as Record; +} + +async function parseJsonObject(request: Request) { + let parsed: unknown; + try { + parsed = await request.json(); + } catch { + throw new BrokerHttpError(400, "INVALID_JSON", "Request body must be valid JSON."); + } + const body = asObject(parsed); + if (!body) { + throw new BrokerHttpError(400, "INVALID_BODY", "Request body must be a JSON object."); + } + return body; +} + +function requireString(body: Record, key: string) { + const raw = body[key]; + if (typeof raw !== "string") { + throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a string.`); + } + const value = raw.trim(); + if (!value) { + throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be non-empty.`); + } + return value; +} + +function optionalString(body: Record, key: string) { + if (!(key in body) || body[key] === undefined || body[key] === null) { + return undefined; + } + const raw = body[key]; + if (typeof raw !== "string") { + throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a string.`); + } + const value = raw.trim(); + return value.length > 0 ? value : undefined; +} + +function requireObject(body: Record, key: string) { + const raw = body[key]; + const parsed = asObject(raw); + if (!parsed) { + throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a JSON object.`); + } + return parsed; +} + +function optionalPositiveInteger(body: Record, key: string) { + if (!(key in body) || body[key] === undefined || body[key] === null) { + return undefined; + } + const raw = body[key]; + if (typeof raw !== "number" || !Number.isFinite(raw) || !Number.isInteger(raw) || raw < 1) { + throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a positive integer.`); + } + return raw; +} + +function optionalBoolean(body: Record, key: string) { + if (!(key in body) || body[key] === undefined || body[key] === null) { + return undefined; + } + if (typeof body[key] !== "boolean") { + throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a boolean.`); + } + return body[key]; +} + +function optionalCredentialStatus(body: Record, key: string) { + const value = optionalString(body, key); + if (!value) { + return undefined; + } + if (value !== "active" && value !== "disabled") { + throw new BrokerHttpError( + 400, + "INVALID_BODY", + `Expected "${key}" to be "active" or "disabled".`, + ); + } + return value; +} + +function optionalListStatus(body: Record, key: string) { + const value = optionalString(body, key); + if (!value) { + return undefined; + } + if (value !== "active" && value !== "disabled" && value !== "all") { + throw new BrokerHttpError( + 400, + "INVALID_BODY", + `Expected "${key}" to be "active", "disabled", or "all".`, + ); + } + return value; +} + +function requirePayloadString(payload: Record, key: string, kind: string): string { + const raw = payload[key]; + if (typeof raw !== "string") { + throw new BrokerHttpError( + 400, + "INVALID_PAYLOAD", + `Credential payload for kind "${kind}" must include "${key}" as a string.`, + ); + } + const value = raw.trim(); + if (!value) { + throw new BrokerHttpError( + 400, + "INVALID_PAYLOAD", + `Credential payload for kind "${kind}" must include a non-empty "${key}" value.`, + ); + } + return value; +} + +function normalizeCredentialPayloadForKind(kind: string, payload: Record) { + if (kind !== "telegram") { + return payload; + } + + const groupId = requirePayloadString(payload, "groupId", "telegram"); + if (!/^-?\d+$/u.test(groupId)) { + throw new BrokerHttpError( + 400, + "INVALID_PAYLOAD", + 'Credential payload for kind "telegram" must include a numeric "groupId" string.', + ); + } + + const driverToken = requirePayloadString(payload, "driverToken", "telegram"); + const sutToken = requirePayloadString(payload, "sutToken", "telegram"); + + return { + groupId, + driverToken, + sutToken, + } satisfies Record; +} + +function parseActorRole(body: Record) { + const actorRole = requireString(body, "actorRole"); + if (actorRole !== "ci" && actorRole !== "maintainer") { + throw new BrokerHttpError( + 400, + "INVALID_ACTOR_ROLE", + 'Expected "actorRole" to be "maintainer" or "ci".', + ); + } + return actorRole as ActorRole; +} + +function assertRoleAllowed(tokenRole: ActorRole, requestedRole: ActorRole) { + if (tokenRole !== requestedRole) { + throw new BrokerHttpError( + 403, + "AUTH_ROLE_MISMATCH", + `Secret role "${tokenRole}" cannot be used as actorRole "${requestedRole}".`, + ); + } +} + +function normalizeCredentialId(raw: string) { + // Convex Ids are opaque strings. We only enforce non-empty shape at HTTP boundary. + return raw; +} + +function normalizeError(error: unknown) { + if (error instanceof BrokerHttpError) { + return { + httpStatus: error.httpStatus, + payload: { + status: "error", + code: error.code, + message: error.message, + }, + }; + } + if (error instanceof Error) { + return { + httpStatus: 500, + payload: { + status: "error", + code: "INTERNAL_ERROR", + message: error.message || "Internal credential broker error.", + }, + }; + } + return { + httpStatus: 500, + payload: { + status: "error", + code: "INTERNAL_ERROR", + message: "Internal credential broker error.", + }, + }; +} + +const http = httpRouter(); + +http.route({ + path: "/qa-credentials/v1/acquire", + method: "POST", + handler: httpAction(async (ctx, request) => { + try { + const tokenRole = resolveAuthRole(parseBearerToken(request)); + const body = await parseJsonObject(request); + const actorRole = parseActorRole(body); + assertRoleAllowed(tokenRole, actorRole); + + const result = await ctx.runMutation(internal.credentials.acquireLease, { + kind: requireString(body, "kind"), + ownerId: requireString(body, "ownerId"), + actorRole, + leaseTtlMs: optionalPositiveInteger(body, "leaseTtlMs"), + heartbeatIntervalMs: optionalPositiveInteger(body, "heartbeatIntervalMs"), + }); + + return jsonResponse(200, result); + } catch (error) { + const normalized = normalizeError(error); + return jsonResponse(normalized.httpStatus, normalized.payload); + } + }), +}); + +http.route({ + path: "/qa-credentials/v1/heartbeat", + method: "POST", + handler: httpAction(async (ctx, request) => { + try { + const tokenRole = resolveAuthRole(parseBearerToken(request)); + const body = await parseJsonObject(request); + const actorRole = parseActorRole(body); + assertRoleAllowed(tokenRole, actorRole); + + const result = await ctx.runMutation(internal.credentials.heartbeatLease, { + kind: requireString(body, "kind"), + ownerId: requireString(body, "ownerId"), + actorRole, + credentialId: normalizeCredentialId( + requireString(body, "credentialId"), + ) as Id<"credential_sets">, + leaseToken: requireString(body, "leaseToken"), + leaseTtlMs: optionalPositiveInteger(body, "leaseTtlMs"), + }); + + return jsonResponse(200, result); + } catch (error) { + const normalized = normalizeError(error); + return jsonResponse(normalized.httpStatus, normalized.payload); + } + }), +}); + +http.route({ + path: "/qa-credentials/v1/release", + method: "POST", + handler: httpAction(async (ctx, request) => { + try { + const tokenRole = resolveAuthRole(parseBearerToken(request)); + const body = await parseJsonObject(request); + const actorRole = parseActorRole(body); + assertRoleAllowed(tokenRole, actorRole); + + const result = await ctx.runMutation(internal.credentials.releaseLease, { + kind: requireString(body, "kind"), + ownerId: requireString(body, "ownerId"), + actorRole, + credentialId: normalizeCredentialId( + requireString(body, "credentialId"), + ) as Id<"credential_sets">, + leaseToken: requireString(body, "leaseToken"), + }); + + return jsonResponse(200, result); + } catch (error) { + const normalized = normalizeError(error); + return jsonResponse(normalized.httpStatus, normalized.payload); + } + }), +}); + +http.route({ + path: "/qa-credentials/v1/admin/add", + method: "POST", + handler: httpAction(async (ctx, request) => { + try { + assertMaintainerAdminAuth(parseBearerToken(request)); + const body = await parseJsonObject(request); + const kind = requireString(body, "kind"); + const payload = normalizeCredentialPayloadForKind(kind, requireObject(body, "payload")); + const result = await ctx.runMutation(internal.credentials.addCredentialSet, { + kind, + payload, + note: optionalString(body, "note"), + actorId: optionalString(body, "actorId"), + status: optionalCredentialStatus(body, "status"), + }); + return jsonResponse(200, result); + } catch (error) { + const normalized = normalizeError(error); + return jsonResponse(normalized.httpStatus, normalized.payload); + } + }), +}); + +http.route({ + path: "/qa-credentials/v1/admin/remove", + method: "POST", + handler: httpAction(async (ctx, request) => { + try { + assertMaintainerAdminAuth(parseBearerToken(request)); + const body = await parseJsonObject(request); + const result = await ctx.runMutation(internal.credentials.disableCredentialSet, { + credentialId: normalizeCredentialId( + requireString(body, "credentialId"), + ) as Id<"credential_sets">, + actorId: optionalString(body, "actorId"), + }); + return jsonResponse(200, result); + } catch (error) { + const normalized = normalizeError(error); + return jsonResponse(normalized.httpStatus, normalized.payload); + } + }), +}); + +http.route({ + path: "/qa-credentials/v1/admin/list", + method: "POST", + handler: httpAction(async (ctx, request) => { + try { + assertMaintainerAdminAuth(parseBearerToken(request)); + const body = await parseJsonObject(request); + const result = await ctx.runQuery(internal.credentials.listCredentialSets, { + kind: optionalString(body, "kind"), + status: optionalListStatus(body, "status"), + includePayload: optionalBoolean(body, "includePayload"), + limit: optionalPositiveInteger(body, "limit"), + }); + return jsonResponse(200, result); + } catch (error) { + const normalized = normalizeError(error); + return jsonResponse(normalized.httpStatus, normalized.payload); + } + }), +}); + +export default http; diff --git a/qa/convex-credential-broker/convex/schema.ts b/qa/convex-credential-broker/convex/schema.ts new file mode 100644 index 00000000000..444b3182c36 --- /dev/null +++ b/qa/convex-credential-broker/convex/schema.ts @@ -0,0 +1,63 @@ +import { defineSchema, defineTable } from "convex/server"; +import { v } from "convex/values"; + +const actorRole = v.union(v.literal("ci"), v.literal("maintainer")); +const credentialStatus = v.union(v.literal("active"), v.literal("disabled")); +const leaseEventType = v.union( + v.literal("acquire"), + v.literal("acquire_failed"), + v.literal("release"), +); +const adminEventType = v.union(v.literal("add"), v.literal("disable"), v.literal("disable_failed")); + +export default defineSchema({ + credential_sets: defineTable({ + kind: v.string(), + status: credentialStatus, + payload: v.any(), + createdAtMs: v.number(), + updatedAtMs: v.number(), + lastLeasedAtMs: v.number(), + note: v.optional(v.string()), + lease: v.optional( + v.object({ + ownerId: v.string(), + actorRole, + leaseToken: v.string(), + acquiredAtMs: v.number(), + heartbeatAtMs: v.number(), + expiresAtMs: v.number(), + }), + ), + }) + .index("by_kind_status", ["kind", "status"]) + .index("by_kind_lastLeasedAtMs", ["kind", "lastLeasedAtMs"]), + + lease_events: defineTable({ + kind: v.string(), + eventType: leaseEventType, + actorRole, + ownerId: v.string(), + occurredAtMs: v.number(), + credentialId: v.optional(v.id("credential_sets")), + code: v.optional(v.string()), + message: v.optional(v.string()), + }) + .index("by_occurredAtMs", ["occurredAtMs"]) + .index("by_kind_occurredAtMs", ["kind", "occurredAtMs"]) + .index("by_credential_occurredAtMs", ["credentialId", "occurredAtMs"]), + + admin_events: defineTable({ + eventType: adminEventType, + actorRole, + actorId: v.string(), + occurredAtMs: v.number(), + credentialId: v.optional(v.id("credential_sets")), + kind: v.optional(v.string()), + code: v.optional(v.string()), + message: v.optional(v.string()), + }) + .index("by_occurredAtMs", ["occurredAtMs"]) + .index("by_kind_occurredAtMs", ["kind", "occurredAtMs"]) + .index("by_credential_occurredAtMs", ["credentialId", "occurredAtMs"]), +}); diff --git a/qa/convex-credential-broker/convex/tsconfig.json b/qa/convex-credential-broker/convex/tsconfig.json new file mode 100644 index 00000000000..41bfbb9b9af --- /dev/null +++ b/qa/convex-credential-broker/convex/tsconfig.json @@ -0,0 +1,25 @@ +{ + /* This TypeScript project config describes the environment that + * Convex functions run in and is used to typecheck them. + * You can modify it, but some settings are required to use Convex. + */ + "compilerOptions": { + /* These settings are not required by Convex and can be modified. */ + "allowJs": true, + "strict": true, + "moduleResolution": "Bundler", + "jsx": "react-jsx", + "skipLibCheck": true, + "allowSyntheticDefaultImports": true, + + /* These compiler options are required by Convex */ + "target": "ESNext", + "lib": ["ES2023", "dom"], + "forceConsistentCasingInFileNames": true, + "module": "ESNext", + "isolatedModules": true, + "noEmit": true + }, + "include": ["./**/*"], + "exclude": ["./_generated"] +} diff --git a/qa/convex-credential-broker/package.json b/qa/convex-credential-broker/package.json new file mode 100644 index 00000000000..9431da4012f --- /dev/null +++ b/qa/convex-credential-broker/package.json @@ -0,0 +1,15 @@ +{ + "name": "@openclaw/qa-convex-credential-broker", + "version": "0.1.0", + "private": true, + "description": "Convex HTTP credential lease broker for OpenClaw QA lab", + "type": "module", + "scripts": { + "dashboard": "convex dashboard", + "deploy": "convex deploy", + "dev": "convex dev" + }, + "dependencies": { + "convex": "^1.35.1" + } +}