feat(qa-lab): add Convex credential broker and admin CLI (#65596)

* QA Lab: add Convex credential source for Telegram lane

* QA Lab: scaffold Convex credential broker

* QA Lab: add Convex credential admin CLI

* QA Lab: harden Convex credential security paths

* QA Broker: validate Telegram payloads on admin add

* fix: note QA Convex credential broker in changelog (#65596) (thanks @joshavant)
This commit is contained in:
Josh Avant
2026-04-12 22:03:42 -05:00
committed by GitHub
parent 5da237c887
commit 3d07dfbb65
25 changed files with 3678 additions and 125 deletions

View File

@@ -6,6 +6,8 @@ Docs: https://docs.openclaw.ai
### Changes
- QA/lab: add Convex-backed pooled Telegram credential leasing plus `openclaw qa credentials` admin commands and broker setup docs. (#65596) Thanks @joshavant.
### Fixes
- Gateway/startup: defer scheduled services until sidecars finish, gate chat history and model listing during sidecar resume, and let Control UI retry startup-gated history loads so Sandbox wake resumes channels first. (#65365) Thanks @lml2468.

View File

@@ -69,10 +69,12 @@ These commands sit beside the main test suites when you need QA-lab realism:
- Runs the Matrix live QA lane against a disposable Docker-backed Tuwunel homeserver.
- Provisions three temporary Matrix users (`driver`, `sut`, `observer`) plus one private room, then starts a QA gateway child with the real Matrix plugin as the SUT transport.
- Uses the pinned stable Tuwunel image `ghcr.io/matrix-construct/tuwunel:v1.5.1` by default. Override with `OPENCLAW_QA_MATRIX_TUWUNEL_IMAGE` when you need to test a different image.
- Matrix currently supports only `--credential-source env` because the lane provisions disposable users locally.
- Writes a Matrix QA report, summary, and observed-events artifact under `.artifacts/qa-e2e/...`.
- `pnpm openclaw qa telegram`
- Runs the Telegram live QA lane against a real private group using the driver and SUT bot tokens from env.
- Requires `OPENCLAW_QA_TELEGRAM_GROUP_ID`, `OPENCLAW_QA_TELEGRAM_DRIVER_BOT_TOKEN`, and `OPENCLAW_QA_TELEGRAM_SUT_BOT_TOKEN`. The group id must be the numeric Telegram chat id.
- Supports `--credential-source convex` for shared pooled credentials. Use env mode by default, or set `OPENCLAW_QA_CREDENTIAL_SOURCE=convex` to opt into pooled leases.
- Requires two distinct bots in the same private group, with the SUT bot exposing a Telegram username.
- For stable bot-to-bot observation, enable Bot-to-Bot Communication Mode in `@BotFather` for both bots and ensure the driver bot can observe group bot traffic.
- Writes a Telegram QA report, summary, and observed-messages artifact under `.artifacts/qa-e2e/...`.
@@ -87,6 +89,80 @@ transport coverage matrix.
| Matrix | x | x | x | x | x | x | x | x | |
| Telegram | x | | | | | | | | x |
### Shared Telegram credentials via Convex (v1)
When `--credential-source convex` (or `OPENCLAW_QA_CREDENTIAL_SOURCE=convex`) is enabled for
`openclaw qa telegram`, QA lab acquires an exclusive lease from a Convex-backed pool, heartbeats
that lease while the lane is running, and releases the lease on shutdown.
Reference Convex project scaffold:
- `qa/convex-credential-broker/`
Required env vars:
- `OPENCLAW_QA_CONVEX_SITE_URL` (for example `https://your-deployment.convex.site`)
- One secret for the selected role:
- `OPENCLAW_QA_CONVEX_SECRET_MAINTAINER` for `maintainer`
- `OPENCLAW_QA_CONVEX_SECRET_CI` for `ci`
- Credential role selection:
- CLI: `--credential-role maintainer|ci`
- Env default: `OPENCLAW_QA_CREDENTIAL_ROLE` (defaults to `maintainer`)
Optional env vars:
- `OPENCLAW_QA_CREDENTIAL_LEASE_TTL_MS` (default `1200000`)
- `OPENCLAW_QA_CREDENTIAL_HEARTBEAT_INTERVAL_MS` (default `30000`)
- `OPENCLAW_QA_CREDENTIAL_ACQUIRE_TIMEOUT_MS` (default `90000`)
- `OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS` (default `15000`)
- `OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX` (default `/qa-credentials/v1`)
- `OPENCLAW_QA_CREDENTIAL_OWNER_ID` (optional trace id)
- `OPENCLAW_QA_ALLOW_INSECURE_HTTP=1` allows loopback `http://` Convex URLs for local-only development.
`OPENCLAW_QA_CONVEX_SITE_URL` should use `https://` in normal operation.
Maintainer admin commands (pool add/remove/list) require
`OPENCLAW_QA_CONVEX_SECRET_MAINTAINER` specifically.
CLI helpers for maintainers:
```bash
pnpm openclaw qa credentials add --kind telegram --payload-file qa/telegram-credential.json
pnpm openclaw qa credentials list --kind telegram
pnpm openclaw qa credentials remove --credential-id <credential-id>
```
Use `--json` for machine-readable output in scripts and CI utilities.
Default endpoint contract (`OPENCLAW_QA_CONVEX_SITE_URL` + `/qa-credentials/v1`):
- `POST /acquire`
- Request: `{ kind, ownerId, actorRole, leaseTtlMs, heartbeatIntervalMs }`
- Success: `{ status: "ok", credentialId, leaseToken, payload, leaseTtlMs?, heartbeatIntervalMs? }`
- Exhausted/retryable: `{ status: "error", code: "POOL_EXHAUSTED" | "NO_CREDENTIAL_AVAILABLE", ... }`
- `POST /heartbeat`
- Request: `{ kind, ownerId, actorRole, credentialId, leaseToken, leaseTtlMs }`
- Success: `{ status: "ok" }` (or empty `2xx`)
- `POST /release`
- Request: `{ kind, ownerId, actorRole, credentialId, leaseToken }`
- Success: `{ status: "ok" }` (or empty `2xx`)
- `POST /admin/add` (maintainer secret only)
- Request: `{ kind, actorId, payload, note?, status? }`
- Success: `{ status: "ok", credential }`
- `POST /admin/remove` (maintainer secret only)
- Request: `{ credentialId, actorId }`
- Success: `{ status: "ok", changed, credential }`
- Active lease guard: `{ status: "error", code: "LEASE_ACTIVE", ... }`
- `POST /admin/list` (maintainer secret only)
- Request: `{ kind?, status?, includePayload?, limit? }`
- Success: `{ status: "ok", credentials, count }`
Payload shape for Telegram kind:
- `{ groupId: string, driverToken: string, sutToken: string }`
- `groupId` must be a numeric Telegram chat id string.
- `admin/add` validates this shape for `kind: "telegram"` and rejects malformed payloads.
### Adding a channel to QA
Adding a channel to the markdown QA system requires exactly two things:

View File

@@ -1,5 +1,6 @@
import fs from "node:fs/promises";
import path from "node:path";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
buildQaAgenticParityComparison,
renderQaAgenticParityMarkdownReport,
@@ -15,6 +16,13 @@ import { startQaLabServer } from "./lab-server.js";
import { runQaManualLane } from "./manual-lane.runtime.js";
import { startQaMockOpenAiServer } from "./mock-openai-server.js";
import { runQaMultipass } from "./multipass.runtime.js";
import {
addQaCredentialSet,
listQaCredentialSets,
QaCredentialAdminError,
removeQaCredentialSet,
type QaCredentialRecord,
} from "./qa-credentials-admin.runtime.js";
import { normalizeQaThinkingLevel, type QaThinkingLevel } from "./qa-gateway-config.js";
import { normalizeQaTransportId } from "./qa-transport-registry.js";
import {
@@ -117,6 +125,46 @@ function parseQaCliBackendAuthMode(value: string | undefined): QaCliBackendAuthM
throw new Error("--cli-auth-mode must be one of auto, api-key, subscription");
}
function parseQaCredentialListStatus(value: string | undefined) {
if (value === undefined) {
return undefined;
}
const normalized = value.trim().toLowerCase();
if (normalized === "active" || normalized === "disabled" || normalized === "all") {
return normalized;
}
throw new Error('--status must be one of "active", "disabled", or "all".');
}
function normalizeQaCredentialAdminError(error: unknown) {
if (error instanceof QaCredentialAdminError) {
return {
code: error.code,
message: error.message,
};
}
return {
code: "UNEXPECTED_ERROR",
message: formatErrorMessage(error),
};
}
function writeQaCredentialCommandErrorJson(action: string, error: unknown) {
const normalized = normalizeQaCredentialAdminError(error);
process.stdout.write(
`${JSON.stringify(
{
status: "error",
action,
code: normalized.code,
message: normalized.message,
},
null,
2,
)}\n`,
);
}
function parseQaModelSpecs(label: string, entries: readonly string[] | undefined) {
const models: string[] = [];
const optionsByModel: Record<string, QaCharacterModelOptions> = {};
@@ -198,6 +246,55 @@ async function runInterruptibleServer(label: string, server: InterruptibleServer
await new Promise(() => undefined);
}
async function readQaCredentialPayloadFile(filePath: string) {
const text = await fs.readFile(filePath, "utf8");
let payload: unknown;
try {
payload = JSON.parse(text) as unknown;
} catch (error) {
throw new Error(`Payload file must contain valid JSON: ${formatErrorMessage(error)}`, {
cause: error,
});
}
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
throw new Error("Payload file JSON must be an object.");
}
return payload as Record<string, unknown>;
}
function formatQaCredentialLeaseState(credential: QaCredentialRecord) {
if (!credential.lease) {
return "no";
}
return `yes(${credential.lease.actorRole}:${credential.lease.ownerId})`;
}
function printQaCredentialListTable(credentials: QaCredentialRecord[]) {
if (credentials.length === 0) {
process.stdout.write("No credentials matched.\n");
return;
}
const rows = credentials.map((credential) => ({
credentialId: credential.credentialId,
kind: credential.kind,
status: credential.status,
leased: formatQaCredentialLeaseState(credential),
note: credential.note ?? "",
}));
const idWidth = Math.max("credentialId".length, ...rows.map((row) => row.credentialId.length));
const kindWidth = Math.max("kind".length, ...rows.map((row) => row.kind.length));
const statusWidth = Math.max("status".length, ...rows.map((row) => row.status.length));
const leaseWidth = Math.max("leased".length, ...rows.map((row) => row.leased.length));
process.stdout.write(
`${"credentialId".padEnd(idWidth)} ${"kind".padEnd(kindWidth)} ${"status".padEnd(statusWidth)} ${"leased".padEnd(leaseWidth)} note\n`,
);
for (const row of rows) {
process.stdout.write(
`${row.credentialId.padEnd(idWidth)} ${row.kind.padEnd(kindWidth)} ${row.status.padEnd(statusWidth)} ${row.leased.padEnd(leaseWidth)} ${row.note}\n`,
);
}
}
export async function runQaLabSelfCheckCommand(opts: { repoRoot?: string; output?: string }) {
const repoRoot = path.resolve(opts.repoRoot ?? process.cwd());
const server = await startQaLabServer({
@@ -411,6 +508,148 @@ export async function runQaManualLaneCommand(opts: {
process.stdout.write("\n");
}
export async function runQaCredentialsAddCommand(opts: {
actorId?: string;
endpointPrefix?: string;
json?: boolean;
kind: string;
note?: string;
payloadFile: string;
repoRoot?: string;
siteUrl?: string;
}) {
const repoRoot = path.resolve(opts.repoRoot ?? process.cwd());
try {
const payloadPath = path.resolve(repoRoot, opts.payloadFile);
const payload = await readQaCredentialPayloadFile(payloadPath);
const result = await addQaCredentialSet({
kind: opts.kind,
payload,
note: opts.note,
actorId: opts.actorId,
siteUrl: opts.siteUrl,
endpointPrefix: opts.endpointPrefix,
});
if (opts.json) {
process.stdout.write(
`${JSON.stringify({ status: "ok", action: "add", credential: result.credential }, null, 2)}\n`,
);
return;
}
process.stdout.write(`QA credential added: ${result.credential.credentialId}\n`);
process.stdout.write(`Kind: ${result.credential.kind}\n`);
process.stdout.write(`Status: ${result.credential.status}\n`);
if (result.credential.note) {
process.stdout.write(`Note: ${result.credential.note}\n`);
}
} catch (error) {
if (opts.json) {
writeQaCredentialCommandErrorJson("add", error);
process.exitCode = 1;
return;
}
throw error;
}
}
export async function runQaCredentialsRemoveCommand(opts: {
actorId?: string;
credentialId: string;
endpointPrefix?: string;
json?: boolean;
siteUrl?: string;
}) {
try {
const result = await removeQaCredentialSet({
credentialId: opts.credentialId,
actorId: opts.actorId,
siteUrl: opts.siteUrl,
endpointPrefix: opts.endpointPrefix,
});
if (opts.json) {
process.stdout.write(
`${JSON.stringify(
{
status: "ok",
action: "remove",
changed: result.changed,
credential: result.credential,
},
null,
2,
)}\n`,
);
return;
}
process.stdout.write(
result.changed
? `QA credential removed (disabled): ${result.credential.credentialId}\n`
: `QA credential already disabled: ${result.credential.credentialId}\n`,
);
} catch (error) {
if (opts.json) {
writeQaCredentialCommandErrorJson("remove", error);
process.exitCode = 1;
return;
}
throw error;
}
}
export async function runQaCredentialsListCommand(opts: {
actorId?: string;
endpointPrefix?: string;
json?: boolean;
kind?: string;
limit?: number;
showSecrets?: boolean;
siteUrl?: string;
status?: string;
}) {
try {
const result = await listQaCredentialSets({
actorId: opts.actorId,
siteUrl: opts.siteUrl,
endpointPrefix: opts.endpointPrefix,
kind: opts.kind?.trim(),
status: parseQaCredentialListStatus(opts.status),
includePayload: opts.showSecrets,
limit: parseQaPositiveIntegerOption("--limit", opts.limit),
});
if (opts.json) {
process.stdout.write(
`${JSON.stringify(
{
status: "ok",
action: "list",
count: result.credentials.length,
credentials: result.credentials,
},
null,
2,
)}\n`,
);
return;
}
printQaCredentialListTable(result.credentials);
if (opts.showSecrets && result.credentials.length > 0) {
process.stdout.write("\nPayloads:\n");
for (const credential of result.credentials) {
process.stdout.write(
`${credential.credentialId}: ${JSON.stringify(credential.payload ?? null)}\n`,
);
}
}
} catch (error) {
if (opts.json) {
writeQaCredentialCommandErrorJson("list", error);
process.exitCode = 1;
return;
}
throw error;
}
}
export async function runQaLabUiCommand(opts: {
repoRoot?: string;
host?: string;

View File

@@ -1,7 +1,16 @@
import { Command } from "commander";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const { runQaMatrixCommand, runQaTelegramCommand } = vi.hoisted(() => ({
const {
runQaCredentialsAddCommand,
runQaCredentialsListCommand,
runQaCredentialsRemoveCommand,
runQaMatrixCommand,
runQaTelegramCommand,
} = vi.hoisted(() => ({
runQaCredentialsAddCommand: vi.fn(),
runQaCredentialsListCommand: vi.fn(),
runQaCredentialsRemoveCommand: vi.fn(),
runQaMatrixCommand: vi.fn(),
runQaTelegramCommand: vi.fn(),
}));
@@ -14,6 +23,12 @@ vi.mock("./live-transports/telegram/cli.runtime.js", () => ({
runQaTelegramCommand,
}));
vi.mock("./cli.runtime.js", () => ({
runQaCredentialsAddCommand,
runQaCredentialsListCommand,
runQaCredentialsRemoveCommand,
}));
import { registerQaLabCli } from "./cli.js";
describe("qa cli registration", () => {
@@ -22,6 +37,9 @@ describe("qa cli registration", () => {
beforeEach(() => {
program = new Command();
registerQaLabCli(program);
runQaCredentialsAddCommand.mockReset();
runQaCredentialsListCommand.mockReset();
runQaCredentialsRemoveCommand.mockReset();
runQaMatrixCommand.mockReset();
runQaTelegramCommand.mockReset();
});
@@ -34,7 +52,7 @@ describe("qa cli registration", () => {
const qa = program.commands.find((command) => command.name() === "qa");
expect(qa).toBeDefined();
expect(qa?.commands.map((command) => command.name())).toEqual(
expect.arrayContaining(["matrix", "telegram"]),
expect.arrayContaining(["matrix", "telegram", "credentials"]),
);
});
@@ -72,6 +90,8 @@ describe("qa cli registration", () => {
fastMode: true,
scenarioIds: ["matrix-thread-follow-up", "matrix-thread-isolation"],
sutAccountId: "sut-live",
credentialSource: undefined,
credentialRole: undefined,
});
});
@@ -87,6 +107,92 @@ describe("qa cli registration", () => {
fastMode: false,
scenarioIds: [],
sutAccountId: "sut",
credentialSource: undefined,
credentialRole: undefined,
});
});
it("routes credential add flags into the qa runtime command", async () => {
await program.parseAsync([
"node",
"openclaw",
"qa",
"credentials",
"add",
"--kind",
"telegram",
"--payload-file",
"qa/payload.json",
"--repo-root",
"/tmp/openclaw-repo",
"--note",
"shared lane",
"--site-url",
"https://first-schnauzer-821.convex.site",
"--endpoint-prefix",
"/qa-credentials/v1",
"--actor-id",
"maintainer-local",
"--json",
]);
expect(runQaCredentialsAddCommand).toHaveBeenCalledWith({
kind: "telegram",
payloadFile: "qa/payload.json",
repoRoot: "/tmp/openclaw-repo",
note: "shared lane",
siteUrl: "https://first-schnauzer-821.convex.site",
endpointPrefix: "/qa-credentials/v1",
actorId: "maintainer-local",
json: true,
});
});
it("routes credential remove flags into the qa runtime command", async () => {
await program.parseAsync([
"node",
"openclaw",
"qa",
"credentials",
"remove",
"--credential-id",
"j57b8k419ba7bcsfw99rg05c9184p8br",
"--site-url",
"https://first-schnauzer-821.convex.site",
"--actor-id",
"maintainer-local",
"--json",
]);
expect(runQaCredentialsRemoveCommand).toHaveBeenCalledWith({
credentialId: "j57b8k419ba7bcsfw99rg05c9184p8br",
siteUrl: "https://first-schnauzer-821.convex.site",
actorId: "maintainer-local",
endpointPrefix: undefined,
json: true,
});
});
it("routes credential list defaults into the qa runtime command", async () => {
await program.parseAsync([
"node",
"openclaw",
"qa",
"credentials",
"list",
"--kind",
"telegram",
]);
expect(runQaCredentialsListCommand).toHaveBeenCalledWith({
kind: "telegram",
status: "all",
limit: undefined,
showSecrets: false,
siteUrl: undefined,
endpointPrefix: undefined,
actorId: undefined,
json: false,
});
});
});

View File

@@ -83,6 +83,45 @@ async function runQaManualLane(opts: {
await runtime.runQaManualLaneCommand(opts);
}
async function runQaCredentialsAdd(opts: {
actorId?: string;
endpointPrefix?: string;
json?: boolean;
kind: string;
note?: string;
payloadFile: string;
repoRoot?: string;
siteUrl?: string;
}) {
const runtime = await loadQaLabCliRuntime();
await runtime.runQaCredentialsAddCommand(opts);
}
async function runQaCredentialsRemove(opts: {
actorId?: string;
credentialId: string;
endpointPrefix?: string;
json?: boolean;
siteUrl?: string;
}) {
const runtime = await loadQaLabCliRuntime();
await runtime.runQaCredentialsRemoveCommand(opts);
}
async function runQaCredentialsList(opts: {
actorId?: string;
endpointPrefix?: string;
json?: boolean;
kind?: string;
limit?: number;
showSecrets?: boolean;
siteUrl?: string;
status?: string;
}) {
const runtime = await loadQaLabCliRuntime();
await runtime.runQaCredentialsListCommand(opts);
}
async function runQaUi(opts: {
repoRoot?: string;
host?: string;
@@ -347,6 +386,82 @@ export function registerQaLabCli(program: Command) {
},
);
const credentials = qa
.command("credentials")
.description("Manage pooled Convex live credentials used by QA lanes");
credentials
.command("add")
.description("Add one credential payload to the shared pool")
.requiredOption("--kind <kind>", "Credential kind (for Telegram v1, use telegram)")
.requiredOption("--payload-file <path>", "JSON object file containing the credential payload")
.option("--repo-root <path>", "Repository root for resolving relative payload-file paths")
.option("--note <text>", "Optional note stored with this credential row")
.option("--site-url <url>", "Override OPENCLAW_QA_CONVEX_SITE_URL")
.option("--endpoint-prefix <path>", "Override OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX")
.option("--actor-id <id>", "Optional admin actor id to include in broker audit events")
.option("--json", "Emit machine-readable JSON output", false)
.action(
async (opts: {
kind: string;
payloadFile: string;
repoRoot?: string;
note?: string;
siteUrl?: string;
endpointPrefix?: string;
actorId?: string;
json?: boolean;
}) => {
await runQaCredentialsAdd(opts);
},
);
credentials
.command("remove")
.description("Remove one credential from active use by disabling it")
.requiredOption("--credential-id <id>", "Credential row id from the Convex pool")
.option("--site-url <url>", "Override OPENCLAW_QA_CONVEX_SITE_URL")
.option("--endpoint-prefix <path>", "Override OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX")
.option("--actor-id <id>", "Optional admin actor id to include in broker audit events")
.option("--json", "Emit machine-readable JSON output", false)
.action(
async (opts: {
credentialId: string;
siteUrl?: string;
endpointPrefix?: string;
actorId?: string;
json?: boolean;
}) => {
await runQaCredentialsRemove(opts);
},
);
credentials
.command("list")
.description("List credential rows in the shared Convex pool")
.option("--kind <kind>", "Filter by credential kind")
.option("--status <status>", 'Filter by row status: "active", "disabled", or "all"', "all")
.option("--limit <count>", "Max rows to return", (value: string) => Number(value))
.option("--show-secrets", "Include credential payload JSON in output", false)
.option("--site-url <url>", "Override OPENCLAW_QA_CONVEX_SITE_URL")
.option("--endpoint-prefix <path>", "Override OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX")
.option("--actor-id <id>", "Optional admin actor id to include in broker audit events")
.option("--json", "Emit machine-readable JSON output", false)
.action(
async (opts: {
kind?: string;
status?: string;
limit?: number;
showSecrets?: boolean;
siteUrl?: string;
endpointPrefix?: string;
actorId?: string;
json?: boolean;
}) => {
await runQaCredentialsList(opts);
},
);
qa.command("ui")
.description("Start the private QA debugger UI and local QA bus")
.option("--repo-root <path>", "Repository root to target when running from a neutral cwd")

View File

@@ -0,0 +1,43 @@
import { describe, expect, it, vi } from "vitest";
const runMatrixQaLive = vi.hoisted(() => vi.fn());
vi.mock("./matrix-live.runtime.js", () => ({
runMatrixQaLive,
}));
import { runQaMatrixCommand } from "./cli.runtime.js";
describe("matrix qa cli runtime", () => {
it("rejects non-env credential sources for the disposable Matrix lane", async () => {
await expect(
runQaMatrixCommand({
credentialSource: "convex",
}),
).rejects.toThrow("Matrix QA currently supports only --credential-source env");
});
it("passes through default env credential source options", async () => {
runMatrixQaLive.mockResolvedValue({
reportPath: "/tmp/matrix-report.md",
summaryPath: "/tmp/matrix-summary.json",
observedEventsPath: "/tmp/matrix-events.json",
});
await runQaMatrixCommand({
repoRoot: "/tmp/openclaw",
outputDir: ".artifacts/qa-e2e/matrix",
providerMode: "mock-openai",
credentialSource: "env",
});
expect(runMatrixQaLive).toHaveBeenCalledWith(
expect.objectContaining({
repoRoot: "/tmp/openclaw",
outputDir: "/tmp/openclaw/.artifacts/qa-e2e/matrix",
providerMode: "mock-openai",
credentialSource: "env",
}),
);
});
});

View File

@@ -6,7 +6,15 @@ import {
import { runMatrixQaLive } from "./matrix-live.runtime.js";
export async function runQaMatrixCommand(opts: LiveTransportQaCommandOptions) {
const result = await runMatrixQaLive(resolveLiveTransportQaRunOptions(opts));
const runOptions = resolveLiveTransportQaRunOptions(opts);
const credentialSource = runOptions.credentialSource?.toLowerCase();
if (credentialSource && credentialSource !== "env") {
throw new Error(
"Matrix QA currently supports only --credential-source env (disposable local harness).",
);
}
const result = await runMatrixQaLive(runOptions);
printLiveTransportQaArtifacts("Matrix QA", {
report: result.reportPath,
summary: result.summaryPath,

View File

@@ -0,0 +1,272 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
acquireQaCredentialLease,
startQaCredentialLeaseHeartbeat,
} from "./credential-lease.runtime.js";
function jsonResponse(payload: unknown, status = 200) {
return new Response(JSON.stringify(payload), {
status,
headers: { "content-type": "application/json" },
});
}
describe("credential lease runtime", () => {
afterEach(() => {
vi.restoreAllMocks();
vi.useRealTimers();
});
it("uses env credentials by default", async () => {
const lease = await acquireQaCredentialLease({
kind: "telegram",
resolveEnvPayload: () => ({ groupId: "-100123", driverToken: "driver", sutToken: "sut" }),
parsePayload: () => {
throw new Error("should not parse convex payload in env mode");
},
env: {},
});
expect(lease.source).toBe("env");
expect(lease.payload).toEqual({
groupId: "-100123",
driverToken: "driver",
sutToken: "sut",
});
});
it("acquires, heartbeats, and releases convex credentials", async () => {
const fetchImpl = vi
.fn<typeof fetch>()
.mockResolvedValueOnce(
jsonResponse({
status: "ok",
credentialId: "cred-1",
leaseToken: "lease-1",
payload: { groupId: "-100123", driverToken: "driver", sutToken: "sut" },
leaseTtlMs: 1_200_000,
heartbeatIntervalMs: 30_000,
}),
)
.mockResolvedValueOnce(jsonResponse({ status: "ok" }))
.mockResolvedValueOnce(jsonResponse({ status: "ok" }));
const lease = await acquireQaCredentialLease({
kind: "telegram",
source: "convex",
role: "maintainer",
env: {
OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site",
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret",
},
fetchImpl,
resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }),
parsePayload: (payload) =>
payload as { groupId: string; driverToken: string; sutToken: string },
});
expect(lease.source).toBe("convex");
expect(lease.credentialId).toBe("cred-1");
expect(lease.payload.groupId).toBe("-100123");
await lease.heartbeat();
await lease.release();
expect(fetchImpl).toHaveBeenCalledTimes(3);
const firstCall = fetchImpl.mock.calls[0];
expect(firstCall?.[0]).toContain("/qa-credentials/v1/acquire");
const firstInit = firstCall?.[1];
const headers = firstInit?.headers as Record<string, string>;
expect(headers.authorization).toBe("Bearer maintainer-secret");
});
it("retries convex acquire while the pool is exhausted", async () => {
const fetchImpl = vi
.fn<typeof fetch>()
.mockResolvedValueOnce(
jsonResponse({
status: "error",
code: "POOL_EXHAUSTED",
message: "wait",
}),
)
.mockResolvedValueOnce(
jsonResponse({
status: "error",
code: "POOL_EXHAUSTED",
message: "wait",
}),
)
.mockResolvedValueOnce(
jsonResponse({
status: "ok",
credentialId: "cred-2",
leaseToken: "lease-2",
payload: { groupId: "-100456", driverToken: "driver-2", sutToken: "sut-2" },
}),
);
const sleeps: number[] = [];
let nowMs = 0;
const lease = await acquireQaCredentialLease({
kind: "telegram",
source: "convex",
env: {
OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site",
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret",
OPENCLAW_QA_CREDENTIAL_ACQUIRE_TIMEOUT_MS: "90000",
},
fetchImpl,
randomImpl: () => 0,
timeImpl: () => nowMs,
sleepImpl: async (ms) => {
sleeps.push(ms);
nowMs += ms;
},
resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }),
parsePayload: (payload) =>
payload as { groupId: string; driverToken: string; sutToken: string },
});
expect(lease.credentialId).toBe("cred-2");
expect(fetchImpl).toHaveBeenCalledTimes(3);
expect(sleeps.length).toBe(2);
expect(sleeps[0]).toBeGreaterThanOrEqual(100);
expect(sleeps[1]).toBeGreaterThan(sleeps[0] ?? 0);
});
it("rejects non-https convex site URLs unless local insecure opt-in is enabled", async () => {
await expect(
acquireQaCredentialLease({
kind: "telegram",
source: "convex",
env: {
OPENCLAW_QA_CONVEX_SITE_URL: "http://qa-cred.example.convex.site",
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret",
},
resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }),
parsePayload: (payload) =>
payload as { groupId: string; driverToken: string; sutToken: string },
}),
).rejects.toThrow("must use https://");
});
it("allows loopback http URLs when OPENCLAW_QA_ALLOW_INSECURE_HTTP is enabled", async () => {
const fetchImpl = vi.fn<typeof fetch>().mockResolvedValueOnce(
jsonResponse({
status: "ok",
credentialId: "cred-local",
leaseToken: "lease-local",
payload: { groupId: "-100123", driverToken: "driver", sutToken: "sut" },
}),
);
await acquireQaCredentialLease({
kind: "telegram",
source: "convex",
role: "maintainer",
env: {
OPENCLAW_QA_CONVEX_SITE_URL: "http://127.0.0.1:3210",
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret",
OPENCLAW_QA_ALLOW_INSECURE_HTTP: "1",
},
fetchImpl,
resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }),
parsePayload: (payload) =>
payload as { groupId: string; driverToken: string; sutToken: string },
});
const firstCall = fetchImpl.mock.calls[0];
expect(firstCall?.[0]).toBe("http://127.0.0.1:3210/qa-credentials/v1/acquire");
});
it("rejects unsafe endpoint prefix overrides", async () => {
await expect(
acquireQaCredentialLease({
kind: "telegram",
source: "convex",
env: {
OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site",
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret",
OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX: "//evil.example",
},
resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }),
parsePayload: (payload) =>
payload as { groupId: string; driverToken: string; sutToken: string },
}),
).rejects.toThrow("OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX must be an absolute path");
});
it("releases acquired lease when payload parsing fails", async () => {
const fetchImpl = vi
.fn<typeof fetch>()
.mockResolvedValueOnce(
jsonResponse({
status: "ok",
credentialId: "cred-parse-fail",
leaseToken: "lease-parse-fail",
payload: { broken: true },
}),
)
.mockResolvedValueOnce(jsonResponse({ status: "ok" }));
await expect(
acquireQaCredentialLease({
kind: "telegram",
source: "convex",
role: "maintainer",
env: {
OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site",
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maintainer-secret",
},
fetchImpl,
resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }),
parsePayload: () => {
throw new Error("bad payload shape");
},
}),
).rejects.toThrow("bad payload shape");
expect(fetchImpl).toHaveBeenCalledTimes(2);
expect(fetchImpl.mock.calls[1]?.[0]).toBe(
"https://qa-cred.example.convex.site/qa-credentials/v1/release",
);
});
it("fails convex mode when auth secret is missing", async () => {
await expect(
acquireQaCredentialLease({
kind: "telegram",
source: "convex",
role: "maintainer",
env: {
OPENCLAW_QA_CONVEX_SITE_URL: "https://qa-cred.example.convex.site",
},
resolveEnvPayload: () => ({ groupId: "-1", driverToken: "unused", sutToken: "unused" }),
parsePayload: (payload) =>
payload as { groupId: string; driverToken: string; sutToken: string },
}),
).rejects.toThrow("OPENCLAW_QA_CONVEX_SECRET_MAINTAINER");
});
it("captures heartbeat failures for fail-fast checks", async () => {
vi.useFakeTimers();
const heartbeat = startQaCredentialLeaseHeartbeat(
{
source: "convex",
kind: "telegram",
heartbeatIntervalMs: 50,
heartbeat: async () => {
throw new Error("heartbeat-down");
},
},
{ intervalMs: 50 },
);
await vi.advanceTimersByTimeAsync(55);
expect(heartbeat.getFailure()).toBeInstanceOf(Error);
expect(() => heartbeat.throwIfFailed()).toThrow("heartbeat-down");
await heartbeat.stop();
});
});

View File

@@ -0,0 +1,576 @@
import { randomUUID } from "node:crypto";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { z } from "zod";
const DEFAULT_ACQUIRE_TIMEOUT_MS = 90_000;
const DEFAULT_ENDPOINT_PREFIX = "/qa-credentials/v1";
const DEFAULT_HEARTBEAT_INTERVAL_MS = 30_000;
const DEFAULT_HTTP_TIMEOUT_MS = 15_000;
const DEFAULT_LEASE_TTL_MS = 20 * 60 * 1_000;
const ALLOW_INSECURE_HTTP_ENV_KEY = "OPENCLAW_QA_ALLOW_INSECURE_HTTP";
const RETRY_BACKOFF_MS = [500, 1_000, 2_000, 4_000, 5_000] as const;
const RETRYABLE_ACQUIRE_CODES = new Set(["POOL_EXHAUSTED", "NO_CREDENTIAL_AVAILABLE"]);
const convexAcquireSuccessSchema = z.object({
status: z.literal("ok"),
credentialId: z.string().min(1),
leaseToken: z.string().min(1),
payload: z.unknown(),
leaseTtlMs: z.number().int().positive().optional(),
heartbeatIntervalMs: z.number().int().positive().optional(),
});
const convexErrorSchema = z.object({
status: z.literal("error"),
code: z.string().min(1),
message: z.string().optional(),
retryAfterMs: z.number().int().positive().optional(),
});
const convexOkSchema = z.object({
status: z.literal("ok"),
});
type ConvexCredentialBrokerConfig = {
acquireTimeoutMs: number;
acquireUrl: string;
authToken: string;
heartbeatIntervalMs: number;
heartbeatUrl: string;
httpTimeoutMs: number;
leaseTtlMs: number;
ownerId: string;
releaseUrl: string;
role: QaCredentialRole;
};
export type QaCredentialLeaseHeartbeat = {
getFailure(): Error | null;
stop(): Promise<void>;
throwIfFailed(): void;
};
export type QaCredentialRole = "ci" | "maintainer";
export type QaCredentialLeaseSource = "convex" | "env";
export type QaCredentialLease<TPayload> = {
credentialId?: string;
heartbeat(): Promise<void>;
heartbeatIntervalMs: number;
kind: string;
leaseToken?: string;
leaseTtlMs: number;
ownerId?: string;
payload: TPayload;
release(): Promise<void>;
role?: QaCredentialRole;
source: QaCredentialLeaseSource;
};
export type AcquireQaCredentialLeaseOptions<TPayload> = {
env?: NodeJS.ProcessEnv;
fetchImpl?: typeof fetch;
kind: string;
ownerId?: string;
parsePayload: (payload: unknown) => TPayload;
randomImpl?: () => number;
resolveEnvPayload: () => TPayload;
role?: string;
sleepImpl?: (ms: number) => Promise<unknown>;
source?: string;
timeImpl?: () => number;
};
class QaCredentialBrokerError extends Error {
code: string;
retryAfterMs?: number;
constructor(params: { code: string; message: string; retryAfterMs?: number }) {
super(params.message);
this.name = "QaCredentialBrokerError";
this.code = params.code;
this.retryAfterMs = params.retryAfterMs;
}
}
function parsePositiveIntegerEnv(env: NodeJS.ProcessEnv, key: string, fallback: number): number {
const raw = env[key]?.trim();
if (!raw) {
return fallback;
}
const value = Number(raw);
if (!Number.isFinite(value) || !Number.isInteger(value) || value < 1) {
throw new Error(`${key} must be a positive integer.`);
}
return value;
}
function normalizeQaCredentialSource(value: string | undefined): QaCredentialLeaseSource {
const normalized = value?.trim().toLowerCase() || "env";
if (normalized === "env" || normalized === "convex") {
return normalized;
}
throw new Error(`Credential source must be one of env or convex, got "${value}".`);
}
function normalizeQaCredentialRole(value: string | undefined): QaCredentialRole {
const normalized = value?.trim().toLowerCase() || "maintainer";
if (normalized === "maintainer" || normalized === "ci") {
return normalized;
}
throw new Error(`Credential role must be one of maintainer or ci, got "${value}".`);
}
function isTruthyOptIn(value: string | undefined) {
const normalized = value?.trim().toLowerCase();
return normalized === "1" || normalized === "true" || normalized === "yes";
}
function isLoopbackHostname(hostname: string) {
return hostname === "localhost" || hostname === "::1" || hostname.startsWith("127.");
}
function normalizeConvexSiteUrl(raw: string, env: NodeJS.ProcessEnv): string {
let url: URL;
try {
url = new URL(raw);
} catch {
throw new Error(`OPENCLAW_QA_CONVEX_SITE_URL must be a valid URL, got "${raw || "<empty>"}".`);
}
if (url.protocol === "https:") {
const text = url.toString();
return text.endsWith("/") ? text.slice(0, -1) : text;
}
if (url.protocol !== "http:") {
throw new Error("OPENCLAW_QA_CONVEX_SITE_URL must use https://.");
}
const allowInsecureHttp = isTruthyOptIn(env[ALLOW_INSECURE_HTTP_ENV_KEY]);
if (!allowInsecureHttp || !isLoopbackHostname(url.hostname)) {
throw new Error(
`OPENCLAW_QA_CONVEX_SITE_URL must use https://. http:// is only allowed for loopback hosts when ${ALLOW_INSECURE_HTTP_ENV_KEY}=1.`,
);
}
const text = url.toString();
return text.endsWith("/") ? text.slice(0, -1) : text;
}
function normalizeEndpointPrefix(value: string | undefined): string {
const trimmed = value?.trim();
if (!trimmed) {
return DEFAULT_ENDPOINT_PREFIX;
}
const prefixed = trimmed.startsWith("/") ? trimmed : `/${trimmed}`;
const normalized = prefixed.endsWith("/") ? prefixed.slice(0, -1) : prefixed;
if (!normalized.startsWith("/") || normalized.startsWith("//")) {
throw new Error(
"OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX must be an absolute path like /qa-credentials/v1.",
);
}
if (normalized.includes("\\") || normalized.split("/").some((segment) => segment === "..")) {
throw new Error(
"OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX must not contain backslashes or .. path segments.",
);
}
return normalized;
}
function resolveConvexAuthToken(env: NodeJS.ProcessEnv, role: QaCredentialRole): string {
const roleToken =
role === "ci"
? env.OPENCLAW_QA_CONVEX_SECRET_CI?.trim()
: env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim();
const token = roleToken;
if (token) {
return token;
}
if (role === "ci") {
throw new Error("Missing OPENCLAW_QA_CONVEX_SECRET_CI for CI credential access.");
}
throw new Error("Missing OPENCLAW_QA_CONVEX_SECRET_MAINTAINER for maintainer credential access.");
}
function joinConvexEndpoint(baseUrl: string, prefix: string, suffix: string): string {
const normalizedSuffix = suffix.startsWith("/") ? suffix : `/${suffix}`;
const url = new URL(baseUrl);
url.pathname = `${prefix}${normalizedSuffix}`.replace(/\/{2,}/gu, "/");
url.search = "";
url.hash = "";
return url.toString();
}
function resolveConvexCredentialBrokerConfig(params: {
env: NodeJS.ProcessEnv;
ownerId?: string;
role: QaCredentialRole;
}): ConvexCredentialBrokerConfig {
const siteUrl = params.env.OPENCLAW_QA_CONVEX_SITE_URL?.trim();
if (!siteUrl) {
throw new Error("Missing OPENCLAW_QA_CONVEX_SITE_URL for --credential-source convex.");
}
const baseUrl = normalizeConvexSiteUrl(siteUrl, params.env);
const endpointPrefix = normalizeEndpointPrefix(params.env.OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX);
const ownerId =
params.ownerId?.trim() ||
params.env.OPENCLAW_QA_CREDENTIAL_OWNER_ID?.trim() ||
`qa-lab-${params.role}-${process.pid}-${randomUUID().slice(0, 8)}`;
return {
role: params.role,
ownerId,
authToken: resolveConvexAuthToken(params.env, params.role),
leaseTtlMs: parsePositiveIntegerEnv(
params.env,
"OPENCLAW_QA_CREDENTIAL_LEASE_TTL_MS",
DEFAULT_LEASE_TTL_MS,
),
heartbeatIntervalMs: parsePositiveIntegerEnv(
params.env,
"OPENCLAW_QA_CREDENTIAL_HEARTBEAT_INTERVAL_MS",
DEFAULT_HEARTBEAT_INTERVAL_MS,
),
acquireTimeoutMs: parsePositiveIntegerEnv(
params.env,
"OPENCLAW_QA_CREDENTIAL_ACQUIRE_TIMEOUT_MS",
DEFAULT_ACQUIRE_TIMEOUT_MS,
),
httpTimeoutMs: parsePositiveIntegerEnv(
params.env,
"OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS",
DEFAULT_HTTP_TIMEOUT_MS,
),
acquireUrl: joinConvexEndpoint(baseUrl, endpointPrefix, "acquire"),
heartbeatUrl: joinConvexEndpoint(baseUrl, endpointPrefix, "heartbeat"),
releaseUrl: joinConvexEndpoint(baseUrl, endpointPrefix, "release"),
};
}
function toBrokerError(params: {
payload: unknown;
fallback: string;
}): QaCredentialBrokerError | null {
const parsed = convexErrorSchema.safeParse(params.payload);
if (!parsed.success) {
return null;
}
return new QaCredentialBrokerError({
code: parsed.data.code,
message: parsed.data.message?.trim() || params.fallback,
retryAfterMs: parsed.data.retryAfterMs,
});
}
async function postConvexBroker(params: {
authToken: string;
body: Record<string, unknown>;
fetchImpl: typeof fetch;
timeoutMs: number;
url: string;
}): Promise<unknown> {
const response = await params.fetchImpl(params.url, {
method: "POST",
headers: {
authorization: `Bearer ${params.authToken}`,
"content-type": "application/json",
},
body: JSON.stringify(params.body),
signal: AbortSignal.timeout(params.timeoutMs),
});
const text = await response.text();
const payload: unknown = (() => {
if (!text.trim()) {
return undefined;
}
try {
return JSON.parse(text) as unknown;
} catch {
return text;
}
})();
const brokerError = toBrokerError({
payload,
fallback: `Convex credential broker request failed (${response.status}).`,
});
if (brokerError) {
throw brokerError;
}
if (!response.ok) {
throw new Error(
`Convex credential broker request to ${params.url} failed with HTTP ${response.status}.`,
);
}
return payload;
}
function computeAcquireBackoffMs(params: {
attempt: number;
randomImpl: () => number;
retryAfterMs?: number;
}): number {
if (params.retryAfterMs && params.retryAfterMs > 0) {
return params.retryAfterMs;
}
const base = RETRY_BACKOFF_MS[Math.min(RETRY_BACKOFF_MS.length - 1, params.attempt - 1)];
const jitter = 0.75 + params.randomImpl() * 0.5;
return Math.max(100, Math.round(base * jitter));
}
function assertConvexOk(payload: unknown, actionLabel: string) {
if (payload === undefined) {
return;
}
if (convexOkSchema.safeParse(payload).success) {
return;
}
const brokerError = toBrokerError({
payload,
fallback: `Convex credential ${actionLabel} failed.`,
});
if (brokerError) {
throw brokerError;
}
throw new Error(`Convex credential ${actionLabel} failed with an invalid response payload.`);
}
export async function acquireQaCredentialLease<TPayload>(
opts: AcquireQaCredentialLeaseOptions<TPayload>,
): Promise<QaCredentialLease<TPayload>> {
const env = opts.env ?? process.env;
const source = normalizeQaCredentialSource(opts.source ?? env.OPENCLAW_QA_CREDENTIAL_SOURCE);
if (source === "env") {
return {
source: "env",
kind: opts.kind,
payload: opts.resolveEnvPayload(),
heartbeatIntervalMs: 0,
leaseTtlMs: 0,
async heartbeat() {},
async release() {},
};
}
const role = normalizeQaCredentialRole(opts.role ?? env.OPENCLAW_QA_CREDENTIAL_ROLE);
const config = resolveConvexCredentialBrokerConfig({
env,
role,
ownerId: opts.ownerId,
});
const fetchImpl = opts.fetchImpl ?? fetch;
const sleepImpl =
opts.sleepImpl ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms)));
const timeImpl = opts.timeImpl ?? (() => Date.now());
const randomImpl = opts.randomImpl ?? (() => Math.random());
const startedAt = timeImpl();
let attempt = 0;
while (true) {
attempt += 1;
try {
const payload = await postConvexBroker({
fetchImpl,
timeoutMs: config.httpTimeoutMs,
authToken: config.authToken,
url: config.acquireUrl,
body: {
kind: opts.kind,
ownerId: config.ownerId,
actorRole: config.role,
leaseTtlMs: config.leaseTtlMs,
heartbeatIntervalMs: config.heartbeatIntervalMs,
},
});
const acquired = convexAcquireSuccessSchema.parse(payload);
const releaseLease = async () => {
const releasePayload = await postConvexBroker({
fetchImpl,
timeoutMs: config.httpTimeoutMs,
authToken: config.authToken,
url: config.releaseUrl,
body: {
kind: opts.kind,
ownerId: config.ownerId,
credentialId: acquired.credentialId,
leaseToken: acquired.leaseToken,
actorRole: config.role,
},
});
assertConvexOk(releasePayload, "release");
};
let parsedPayload: TPayload;
try {
parsedPayload = opts.parsePayload(acquired.payload);
} catch (error) {
try {
await releaseLease();
} catch (releaseError) {
throw new Error(
`Convex credential payload validation failed for kind "${opts.kind}" and cleanup release failed: ${formatErrorMessage(error)}; release failed: ${formatErrorMessage(releaseError)}`,
{ cause: releaseError },
);
}
throw new Error(
`Convex credential payload validation failed for kind "${opts.kind}": ${formatErrorMessage(error)}`,
{ cause: error },
);
}
const leaseTtlMs = acquired.leaseTtlMs ?? config.leaseTtlMs;
const heartbeatIntervalMs = acquired.heartbeatIntervalMs ?? config.heartbeatIntervalMs;
return {
source: "convex",
kind: opts.kind,
role,
ownerId: config.ownerId,
credentialId: acquired.credentialId,
leaseToken: acquired.leaseToken,
leaseTtlMs,
heartbeatIntervalMs,
payload: parsedPayload,
async heartbeat() {
const heartbeatPayload = await postConvexBroker({
fetchImpl,
timeoutMs: config.httpTimeoutMs,
authToken: config.authToken,
url: config.heartbeatUrl,
body: {
kind: opts.kind,
ownerId: config.ownerId,
credentialId: acquired.credentialId,
leaseToken: acquired.leaseToken,
actorRole: config.role,
leaseTtlMs,
},
});
assertConvexOk(heartbeatPayload, "heartbeat");
},
async release() {
await releaseLease();
},
};
} catch (error) {
if (error instanceof QaCredentialBrokerError && RETRYABLE_ACQUIRE_CODES.has(error.code)) {
const elapsed = timeImpl() - startedAt;
if (elapsed >= config.acquireTimeoutMs) {
throw new Error(
`Convex credential pool exhausted for kind "${opts.kind}" after ${config.acquireTimeoutMs}ms.`,
{ cause: error },
);
}
const delayMs = Math.min(
computeAcquireBackoffMs({
attempt,
retryAfterMs: error.retryAfterMs,
randomImpl,
}),
Math.max(0, config.acquireTimeoutMs - elapsed),
);
if (delayMs > 0) {
await sleepImpl(delayMs);
}
continue;
}
if (error instanceof z.ZodError) {
throw new Error(
`Convex credential acquire response did not match the expected payload for kind "${opts.kind}": ${error.message}`,
{ cause: error },
);
}
throw new Error(
`Convex credential acquire failed for kind "${opts.kind}": ${formatErrorMessage(error)}`,
{ cause: error },
);
}
}
}
export function startQaCredentialLeaseHeartbeat(
lease: Pick<QaCredentialLease<unknown>, "heartbeat" | "heartbeatIntervalMs" | "kind" | "source">,
opts?: {
intervalMs?: number;
setTimeoutImpl?: typeof setTimeout;
clearTimeoutImpl?: typeof clearTimeout;
},
): QaCredentialLeaseHeartbeat {
if (lease.source !== "convex") {
return {
getFailure: () => null,
async stop() {},
throwIfFailed() {},
};
}
const intervalMs = opts?.intervalMs ?? lease.heartbeatIntervalMs;
if (!Number.isFinite(intervalMs) || intervalMs < 1) {
return {
getFailure: () => null,
async stop() {},
throwIfFailed() {},
};
}
const setTimeoutImpl = opts?.setTimeoutImpl ?? setTimeout;
const clearTimeoutImpl = opts?.clearTimeoutImpl ?? clearTimeout;
let failure: Error | null = null;
let stopped = false;
let timer: ReturnType<typeof setTimeout> | null = null;
let inFlight: Promise<void> | null = null;
const schedule = () => {
if (stopped || failure) {
return;
}
timer = setTimeoutImpl(() => {
timer = null;
if (stopped || failure) {
return;
}
inFlight = (async () => {
try {
await lease.heartbeat();
} catch (error) {
failure = new Error(
`Credential lease heartbeat failed for kind "${lease.kind}": ${formatErrorMessage(error)}`,
);
return;
} finally {
inFlight = null;
}
schedule();
})();
}, intervalMs);
};
schedule();
return {
getFailure() {
return failure;
},
throwIfFailed() {
if (failure) {
throw failure;
}
},
async stop() {
stopped = true;
if (timer) {
clearTimeoutImpl(timer);
timer = null;
}
if (inFlight) {
await inFlight.catch(() => {});
}
},
};
}
export const __testing = {
DEFAULT_ACQUIRE_TIMEOUT_MS,
DEFAULT_ENDPOINT_PREFIX,
DEFAULT_HEARTBEAT_INTERVAL_MS,
DEFAULT_LEASE_TTL_MS,
computeAcquireBackoffMs,
normalizeQaCredentialRole,
normalizeQaCredentialSource,
parsePositiveIntegerEnv,
resolveConvexCredentialBrokerConfig,
};

View File

@@ -25,6 +25,8 @@ export function resolveLiveTransportQaRunOptions(
fastMode: opts.fastMode,
scenarioIds: opts.scenarioIds,
sutAccountId: opts.sutAccountId,
credentialSource: opts.credentialSource?.trim(),
credentialRole: opts.credentialRole?.trim(),
};
}

View File

@@ -11,6 +11,8 @@ export type LiveTransportQaCommandOptions = {
fastMode?: boolean;
scenarioIds?: string[];
sutAccountId?: string;
credentialSource?: string;
credentialRole?: string;
};
type LiveTransportQaCommanderOptions = {
@@ -22,6 +24,8 @@ type LiveTransportQaCommanderOptions = {
scenario?: string[];
fast?: boolean;
sutAccount?: string;
credentialSource?: string;
credentialRole?: string;
};
export type LiveTransportQaCliRegistration = {
@@ -49,6 +53,8 @@ export function mapLiveTransportQaCommanderOptions(
fastMode: opts.fast,
scenarioIds: opts.scenario,
sutAccountId: opts.sutAccount,
credentialSource: opts.credentialSource,
credentialRole: opts.credentialRole,
};
}
@@ -76,6 +82,14 @@ export function registerLiveTransportQaCli(params: {
.option("--scenario <id>", params.scenarioHelp, collectString, [])
.option("--fast", "Enable provider fast mode where supported", false)
.option("--sut-account <id>", params.sutAccountHelp, "sut")
.option(
"--credential-source <source>",
"Credential source for live lanes: env or convex (default: env)",
)
.option(
"--credential-role <role>",
"Credential role for convex auth: maintainer or ci (default: maintainer)",
)
.action(async (opts: LiveTransportQaCommanderOptions) => {
await params.run(mapLiveTransportQaCommanderOptions(opts));
});

View File

@@ -66,6 +66,30 @@ describe("telegram live qa runtime", () => {
).toThrow("OPENCLAW_QA_TELEGRAM_GROUP_ID must be a numeric Telegram chat id.");
});
it("parses Telegram pooled credential payloads", () => {
expect(
__testing.parseTelegramQaCredentialPayload({
groupId: "-100123",
driverToken: "driver",
sutToken: "sut",
}),
).toEqual({
groupId: "-100123",
driverToken: "driver",
sutToken: "sut",
});
});
it("rejects Telegram pooled credential payloads with non-numeric group ids", () => {
expect(() =>
__testing.parseTelegramQaCredentialPayload({
groupId: "qa-group",
driverToken: "driver",
sutToken: "sut",
}),
).toThrow("Telegram credential payload groupId must be a numeric Telegram chat id.");
});
it("injects a temporary Telegram account into the QA gateway config", () => {
const baseCfg: OpenClawConfig = {
plugins: {

View File

@@ -4,12 +4,18 @@ import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
import { z } from "zod";
import { startQaGatewayChild } from "../../gateway-child.js";
import {
defaultQaModelForMode,
normalizeQaProviderMode,
type QaProviderModeInput,
} from "../../run-config.js";
import {
acquireQaCredentialLease,
startQaCredentialLeaseHeartbeat,
type QaCredentialRole,
} from "../shared/credential-lease.runtime.js";
import { startQaLiveLaneGateway } from "../shared/live-gateway.runtime.js";
import { appendLiveLaneIssue, buildLiveLaneArtifactsError } from "../shared/live-lane-helpers.js";
import {
@@ -89,6 +95,13 @@ export type TelegramQaRunResult = {
};
type TelegramQaSummary = {
credentials: {
credentialId?: string;
kind: string;
ownerId?: string;
role?: QaCredentialRole;
source: "convex" | "env";
};
groupId: string;
startedAt: string;
finishedAt: string;
@@ -266,6 +279,12 @@ const TELEGRAM_QA_ENV_KEYS = [
"OPENCLAW_QA_TELEGRAM_SUT_BOT_TOKEN",
] as const;
const telegramQaCredentialPayloadSchema = z.object({
groupId: z.string().trim().min(1),
driverToken: z.string().trim().min(1),
sutToken: z.string().trim().min(1),
});
function resolveEnvValue(env: NodeJS.ProcessEnv, key: (typeof TELEGRAM_QA_ENV_KEYS)[number]) {
const value = env[key]?.trim();
if (!value) {
@@ -288,6 +307,18 @@ export function resolveTelegramQaRuntimeEnv(
};
}
function parseTelegramQaCredentialPayload(payload: unknown): TelegramQaRuntimeEnv {
const parsed = telegramQaCredentialPayloadSchema.parse(payload);
if (!/^-?\d+$/u.test(parsed.groupId)) {
throw new Error("Telegram credential payload groupId must be a numeric Telegram chat id.");
}
return {
groupId: parsed.groupId,
driverToken: parsed.driverToken,
sutToken: parsed.sutToken,
};
}
function flattenInlineButtons(replyMarkup?: TelegramReplyMarkup) {
return (replyMarkup?.inline_keyboard ?? [])
.flat()
@@ -528,6 +559,7 @@ async function waitForTelegramChannelRunning(
function renderTelegramQaMarkdown(params: {
cleanupIssues: string[];
credentialSource: "convex" | "env";
groupId: string;
startedAt: string;
finishedAt: string;
@@ -536,6 +568,7 @@ function renderTelegramQaMarkdown(params: {
const lines = [
"# Telegram QA Report",
"",
`- Credential source: \`${params.credentialSource}\``,
`- Group: \`${params.groupId}\``,
`- Started: ${params.startedAt}`,
`- Finished: ${params.finishedAt}`,
@@ -803,6 +836,8 @@ export async function runTelegramQaLive(params: {
fastMode?: boolean;
scenarioIds?: string[];
sutAccountId?: string;
credentialSource?: string;
credentialRole?: string;
}): Promise<TelegramQaRunResult> {
const repoRoot = path.resolve(params.repoRoot ?? process.cwd());
const outputDir =
@@ -810,7 +845,19 @@ export async function runTelegramQaLive(params: {
path.join(repoRoot, ".artifacts", "qa-e2e", `telegram-${Date.now().toString(36)}`);
await fs.mkdir(outputDir, { recursive: true });
const runtimeEnv = resolveTelegramQaRuntimeEnv();
const credentialLease = await acquireQaCredentialLease({
kind: "telegram",
source: params.credentialSource,
role: params.credentialRole,
resolveEnvPayload: () => resolveTelegramQaRuntimeEnv(),
parsePayload: parseTelegramQaCredentialPayload,
});
const leaseHeartbeat = startQaCredentialLeaseHeartbeat(credentialLease);
const assertLeaseHealthy = () => {
leaseHeartbeat.throwIfFailed();
};
const runtimeEnv = credentialLease.payload;
const providerMode = normalizeQaProviderMode(params.providerMode ?? "live-frontier");
const primaryModel = params.primaryModel?.trim() || defaultQaModelForMode(providerMode);
const alternateModel = params.alternateModel?.trim() || defaultQaModelForMode(providerMode, true);
@@ -819,145 +866,163 @@ export async function runTelegramQaLive(params: {
const observedMessages: TelegramObservedMessage[] = [];
const includeObservedMessageContent = process.env.OPENCLAW_QA_TELEGRAM_CAPTURE_CONTENT === "1";
const startedAt = new Date().toISOString();
const driverIdentity = await getBotIdentity(runtimeEnv.driverToken);
const sutIdentity = await getBotIdentity(runtimeEnv.sutToken);
const sutUsername = sutIdentity.username?.trim();
const uniqueIds = new Set([driverIdentity.id, sutIdentity.id]);
if (uniqueIds.size !== 2) {
throw new Error("Telegram QA requires two distinct bots for driver and SUT.");
}
if (!sutUsername) {
throw new Error("Telegram QA requires the SUT bot to have a Telegram username.");
}
await Promise.all([
flushTelegramUpdates(runtimeEnv.driverToken),
flushTelegramUpdates(runtimeEnv.sutToken),
]);
const gatewayHarness = await startQaLiveLaneGateway({
repoRoot,
transport: {
requiredPluginIds: [],
createGatewayConfig: () => ({}),
},
transportBaseUrl: "http://127.0.0.1:0",
providerMode,
primaryModel,
alternateModel,
fastMode: params.fastMode,
controlUiEnabled: false,
mutateConfig: (cfg) =>
buildTelegramQaConfig(cfg, {
groupId: runtimeEnv.groupId,
sutToken: runtimeEnv.sutToken,
driverBotId: driverIdentity.id,
sutAccountId,
}),
});
const scenarioResults: TelegramQaScenarioResult[] = [];
const cleanupIssues: string[] = [];
let canaryFailure: string | null = null;
try {
await waitForTelegramChannelRunning(gatewayHarness.gateway, sutAccountId);
try {
await runCanary({
driverToken: runtimeEnv.driverToken,
groupId: runtimeEnv.groupId,
sutUsername,
sutBotId: sutIdentity.id,
observedMessages,
});
} catch (error) {
canaryFailure = canaryFailureMessage({
error,
groupId: runtimeEnv.groupId,
driverBotId: driverIdentity.id,
driverUsername: driverIdentity.username,
sutBotId: sutIdentity.id,
sutUsername,
});
scenarioResults.push({
id: "telegram-canary",
title: "Telegram canary",
status: "fail",
details: canaryFailure,
});
const driverIdentity = await getBotIdentity(runtimeEnv.driverToken);
const sutIdentity = await getBotIdentity(runtimeEnv.sutToken);
const sutUsername = sutIdentity.username?.trim();
const uniqueIds = new Set([driverIdentity.id, sutIdentity.id]);
if (uniqueIds.size !== 2) {
throw new Error("Telegram QA requires two distinct bots for driver and SUT.");
}
if (!canaryFailure) {
let driverOffset = await flushTelegramUpdates(runtimeEnv.driverToken);
for (const scenario of scenarios) {
const scenarioRun = scenario.buildRun(sutUsername);
try {
const sent = await sendGroupMessage(
runtimeEnv.driverToken,
runtimeEnv.groupId,
scenarioRun.input,
);
const matched = await waitForObservedMessage({
token: runtimeEnv.driverToken,
initialOffset: driverOffset,
timeoutMs: scenario.timeoutMs,
observedMessages,
predicate: (message) =>
matchesTelegramScenarioReply({
groupId: runtimeEnv.groupId,
matchText: scenarioRun.matchText,
message,
sentMessageId: sent.message_id,
sutBotId: sutIdentity.id,
}),
});
driverOffset = matched.nextOffset;
if (!scenarioRun.expectReply) {
throw new Error(`unexpected reply message ${matched.message.messageId} matched`);
}
assertTelegramScenarioReply({
expectedTextIncludes: scenarioRun.expectedTextIncludes,
message: matched.message,
});
scenarioResults.push({
id: scenario.id,
title: scenario.title,
status: "pass",
details: `reply message ${matched.message.messageId} matched`,
});
} catch (error) {
if (!scenarioRun.expectReply) {
const details = formatErrorMessage(error);
if (
details === `timed out after ${scenario.timeoutMs}ms waiting for Telegram message`
) {
scenarioResults.push({
id: scenario.id,
title: scenario.title,
status: "pass",
details: "no reply",
});
continue;
if (!sutUsername) {
throw new Error("Telegram QA requires the SUT bot to have a Telegram username.");
}
await Promise.all([
flushTelegramUpdates(runtimeEnv.driverToken),
flushTelegramUpdates(runtimeEnv.sutToken),
]);
const gatewayHarness = await startQaLiveLaneGateway({
repoRoot,
transport: {
requiredPluginIds: [],
createGatewayConfig: () => ({}),
},
transportBaseUrl: "http://127.0.0.1:0",
providerMode,
primaryModel,
alternateModel,
fastMode: params.fastMode,
controlUiEnabled: false,
mutateConfig: (cfg) =>
buildTelegramQaConfig(cfg, {
groupId: runtimeEnv.groupId,
sutToken: runtimeEnv.sutToken,
driverBotId: driverIdentity.id,
sutAccountId,
}),
});
try {
await waitForTelegramChannelRunning(gatewayHarness.gateway, sutAccountId);
assertLeaseHealthy();
try {
await runCanary({
driverToken: runtimeEnv.driverToken,
groupId: runtimeEnv.groupId,
sutUsername,
sutBotId: sutIdentity.id,
observedMessages,
});
} catch (error) {
canaryFailure = canaryFailureMessage({
error,
groupId: runtimeEnv.groupId,
driverBotId: driverIdentity.id,
driverUsername: driverIdentity.username,
sutBotId: sutIdentity.id,
sutUsername,
});
scenarioResults.push({
id: "telegram-canary",
title: "Telegram canary",
status: "fail",
details: canaryFailure,
});
}
assertLeaseHealthy();
if (!canaryFailure) {
let driverOffset = await flushTelegramUpdates(runtimeEnv.driverToken);
for (const scenario of scenarios) {
assertLeaseHealthy();
const scenarioRun = scenario.buildRun(sutUsername);
try {
const sent = await sendGroupMessage(
runtimeEnv.driverToken,
runtimeEnv.groupId,
scenarioRun.input,
);
const matched = await waitForObservedMessage({
token: runtimeEnv.driverToken,
initialOffset: driverOffset,
timeoutMs: scenario.timeoutMs,
observedMessages,
predicate: (message) =>
matchesTelegramScenarioReply({
groupId: runtimeEnv.groupId,
matchText: scenarioRun.matchText,
message,
sentMessageId: sent.message_id,
sutBotId: sutIdentity.id,
}),
});
driverOffset = matched.nextOffset;
if (!scenarioRun.expectReply) {
throw new Error(`unexpected reply message ${matched.message.messageId} matched`);
}
assertTelegramScenarioReply({
expectedTextIncludes: scenarioRun.expectedTextIncludes,
message: matched.message,
});
scenarioResults.push({
id: scenario.id,
title: scenario.title,
status: "pass",
details: `reply message ${matched.message.messageId} matched`,
});
} catch (error) {
if (!scenarioRun.expectReply) {
const details = formatErrorMessage(error);
if (
details === `timed out after ${scenario.timeoutMs}ms waiting for Telegram message`
) {
scenarioResults.push({
id: scenario.id,
title: scenario.title,
status: "pass",
details: "no reply",
});
continue;
}
}
scenarioResults.push({
id: scenario.id,
title: scenario.title,
status: "fail",
details: formatErrorMessage(error),
});
}
scenarioResults.push({
id: scenario.id,
title: scenario.title,
status: "fail",
details: formatErrorMessage(error),
});
assertLeaseHealthy();
}
}
} finally {
try {
await gatewayHarness.stop();
} catch (error) {
appendLiveLaneIssue(cleanupIssues, "live gateway cleanup", error);
}
}
} finally {
await leaseHeartbeat.stop();
try {
await gatewayHarness.stop();
await credentialLease.release();
} catch (error) {
appendLiveLaneIssue(cleanupIssues, "live gateway cleanup", error);
appendLiveLaneIssue(cleanupIssues, "credential lease release", error);
}
}
const finishedAt = new Date().toISOString();
const summary: TelegramQaSummary = {
credentials: {
source: credentialLease.source,
kind: credentialLease.kind,
role: credentialLease.role,
ownerId: credentialLease.ownerId,
credentialId: credentialLease.credentialId,
},
groupId: runtimeEnv.groupId,
startedAt,
finishedAt,
@@ -976,6 +1041,7 @@ export async function runTelegramQaLive(params: {
reportPath,
`${renderTelegramQaMarkdown({
cleanupIssues,
credentialSource: credentialLease.source,
groupId: runtimeEnv.groupId,
startedAt,
finishedAt,
@@ -1043,5 +1109,6 @@ export const __testing = {
findScenario,
matchesTelegramScenarioReply,
normalizeTelegramObservedMessage,
parseTelegramQaCredentialPayload,
resolveTelegramQaRuntimeEnv,
};

View File

@@ -0,0 +1,207 @@
import { describe, expect, it, vi } from "vitest";
import {
addQaCredentialSet,
listQaCredentialSets,
QaCredentialAdminError,
removeQaCredentialSet,
} from "./qa-credentials-admin.runtime.js";
function jsonResponse(payload: unknown, status = 200) {
return new Response(JSON.stringify(payload), {
status,
headers: {
"content-type": "application/json",
},
});
}
describe("qa credential admin runtime", () => {
it("adds a credential set through the admin endpoint", async () => {
const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) =>
jsonResponse({
status: "ok",
credential: {
credentialId: "cred-1",
kind: "telegram",
status: "active",
createdAtMs: 100,
updatedAtMs: 100,
lastLeasedAtMs: 0,
note: "qa",
},
}),
);
const result = await addQaCredentialSet({
kind: "telegram",
payload: {
groupId: "-100123",
driverToken: "driver",
sutToken: "sut",
},
note: "qa",
actorId: "maintainer-local",
siteUrl: "https://first-schnauzer-821.convex.site",
env: {
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret",
},
fetchImpl,
});
expect(result.credential.credentialId).toBe("cred-1");
const [url, init] = fetchImpl.mock.calls[0] ?? [];
expect(url).toBe("https://first-schnauzer-821.convex.site/qa-credentials/v1/admin/add");
const headers = init?.headers as Record<string, string>;
expect(headers.authorization).toBe("Bearer maint-secret");
const bodyText = init?.body;
expect(typeof bodyText).toBe("string");
const body = JSON.parse(bodyText as string) as Record<string, unknown>;
expect(body.kind).toBe("telegram");
expect(body.actorId).toBe("maintainer-local");
expect(body.payload).toEqual({
groupId: "-100123",
driverToken: "driver",
sutToken: "sut",
});
});
it("rejects admin commands when maintainer secret is missing", async () => {
await expect(
listQaCredentialSets({
siteUrl: "https://first-schnauzer-821.convex.site",
env: {},
fetchImpl: vi.fn(),
}),
).rejects.toMatchObject({
name: "QaCredentialAdminError",
code: "MISSING_MAINTAINER_SECRET",
} satisfies Partial<QaCredentialAdminError>);
});
it("rejects non-https admin site URLs unless local insecure opt-in is enabled", async () => {
await expect(
listQaCredentialSets({
siteUrl: "http://qa-cred.example.convex.site",
env: {
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret",
},
fetchImpl: vi.fn(),
}),
).rejects.toMatchObject({
name: "QaCredentialAdminError",
code: "INVALID_SITE_URL",
} satisfies Partial<QaCredentialAdminError>);
});
it("allows loopback http admin site URLs when OPENCLAW_QA_ALLOW_INSECURE_HTTP is enabled", async () => {
const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) =>
jsonResponse({
status: "ok",
count: 0,
credentials: [],
}),
);
await listQaCredentialSets({
siteUrl: "http://127.0.0.1:3210",
env: {
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret",
OPENCLAW_QA_ALLOW_INSECURE_HTTP: "1",
},
fetchImpl,
});
expect(fetchImpl.mock.calls[0]?.[0]).toBe("http://127.0.0.1:3210/qa-credentials/v1/admin/list");
});
it("rejects unsafe endpoint-prefix overrides", async () => {
await expect(
listQaCredentialSets({
siteUrl: "https://first-schnauzer-821.convex.site",
endpointPrefix: "//evil.example",
env: {
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret",
},
fetchImpl: vi.fn(),
}),
).rejects.toMatchObject({
name: "QaCredentialAdminError",
code: "INVALID_ARGUMENT",
} satisfies Partial<QaCredentialAdminError>);
});
it("surfaces broker error codes for remove", async () => {
const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) =>
jsonResponse(
{
status: "error",
code: "LEASE_ACTIVE",
message: "Credential is currently leased and cannot be disabled.",
},
200,
),
);
await expect(
removeQaCredentialSet({
credentialId: "cred-1",
siteUrl: "https://first-schnauzer-821.convex.site",
env: {
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret",
},
fetchImpl,
}),
).rejects.toMatchObject({
name: "QaCredentialAdminError",
code: "LEASE_ACTIVE",
} satisfies Partial<QaCredentialAdminError>);
});
it("lists credentials and forwards includePayload/status filters", async () => {
const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) =>
jsonResponse({
status: "ok",
count: 1,
credentials: [
{
credentialId: "cred-2",
kind: "telegram",
status: "active",
createdAtMs: 100,
updatedAtMs: 100,
lastLeasedAtMs: 50,
payload: {
groupId: "-100123",
driverToken: "driver",
sutToken: "sut",
},
},
],
}),
);
const result = await listQaCredentialSets({
kind: "telegram",
status: "active",
includePayload: true,
limit: 5,
siteUrl: "https://first-schnauzer-821.convex.site",
env: {
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret",
},
fetchImpl,
});
expect(result.credentials).toHaveLength(1);
const [, init] = fetchImpl.mock.calls[0] ?? [];
const bodyText = init?.body;
expect(typeof bodyText).toBe("string");
const body = JSON.parse(bodyText as string) as Record<string, unknown>;
expect(body).toEqual({
kind: "telegram",
status: "active",
includePayload: true,
limit: 5,
});
});
});

View File

@@ -0,0 +1,407 @@
import { randomUUID } from "node:crypto";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { z } from "zod";
const DEFAULT_ENDPOINT_PREFIX = "/qa-credentials/v1";
const DEFAULT_HTTP_TIMEOUT_MS = 15_000;
const ALLOW_INSECURE_HTTP_ENV_KEY = "OPENCLAW_QA_ALLOW_INSECURE_HTTP";
const actorRoleSchema = z.union([z.literal("ci"), z.literal("maintainer")]);
const credentialStatusSchema = z.union([z.literal("active"), z.literal("disabled")]);
const listStatusSchema = z.union([z.literal("active"), z.literal("disabled"), z.literal("all")]);
const brokerErrorSchema = z.object({
status: z.literal("error"),
code: z.string().min(1),
message: z.string().min(1),
});
const credentialLeaseSchema = z.object({
ownerId: z.string().min(1),
actorRole: actorRoleSchema,
acquiredAtMs: z.number().int(),
heartbeatAtMs: z.number().int(),
expiresAtMs: z.number().int(),
});
const credentialRecordSchema = z.object({
credentialId: z.string().min(1),
kind: z.string().min(1),
status: credentialStatusSchema,
createdAtMs: z.number().int(),
updatedAtMs: z.number().int(),
lastLeasedAtMs: z.number().int(),
note: z.string().optional(),
lease: credentialLeaseSchema.optional(),
payload: z.unknown().optional(),
});
const addCredentialResponseSchema = z.object({
status: z.literal("ok"),
credential: credentialRecordSchema,
});
const removeCredentialResponseSchema = z.object({
status: z.literal("ok"),
changed: z.boolean(),
credential: credentialRecordSchema,
});
const listCredentialsResponseSchema = z.object({
status: z.literal("ok"),
credentials: z.array(credentialRecordSchema),
count: z.number().int().nonnegative().optional(),
});
export type QaCredentialAdminListStatus = z.infer<typeof listStatusSchema>;
export type QaCredentialRecord = z.infer<typeof credentialRecordSchema>;
export type QaCredentialListResponse = z.infer<typeof listCredentialsResponseSchema>;
export class QaCredentialAdminError extends Error {
code: string;
httpStatus?: number;
constructor(params: { code: string; message: string; httpStatus?: number }) {
super(params.message);
this.name = "QaCredentialAdminError";
this.code = params.code;
this.httpStatus = params.httpStatus;
}
}
type AdminConfig = {
actorId: string;
authToken: string;
addUrl: string;
endpointPrefix: string;
httpTimeoutMs: number;
listUrl: string;
removeUrl: string;
siteUrl: string;
};
type AdminBaseOptions = {
actorId?: string;
endpointPrefix?: string;
env?: NodeJS.ProcessEnv;
fetchImpl?: typeof fetch;
siteUrl?: string;
};
type AddQaCredentialSetOptions = AdminBaseOptions & {
kind: string;
note?: string;
payload: Record<string, unknown>;
status?: z.infer<typeof credentialStatusSchema>;
};
type RemoveQaCredentialSetOptions = AdminBaseOptions & {
credentialId: string;
};
type ListQaCredentialSetsOptions = AdminBaseOptions & {
includePayload?: boolean;
kind?: string;
limit?: number;
status?: string;
};
function parsePositiveIntegerEnv(env: NodeJS.ProcessEnv, key: string, fallback: number): number {
const raw = env[key]?.trim();
if (!raw) {
return fallback;
}
const value = Number(raw);
if (!Number.isFinite(value) || !Number.isInteger(value) || value < 1) {
throw new QaCredentialAdminError({
code: "INVALID_ENV",
message: `${key} must be a positive integer.`,
});
}
return value;
}
function isTruthyOptIn(value: string | undefined) {
const normalized = value?.trim().toLowerCase();
return normalized === "1" || normalized === "true" || normalized === "yes";
}
function isLoopbackHostname(hostname: string) {
return hostname === "localhost" || hostname === "::1" || hostname.startsWith("127.");
}
function normalizeConvexSiteUrl(raw: string, env: NodeJS.ProcessEnv): string {
let parsed: URL;
try {
parsed = new URL(raw);
} catch {
throw new QaCredentialAdminError({
code: "INVALID_SITE_URL",
message: `OPENCLAW_QA_CONVEX_SITE_URL must be a valid URL, got "${raw || "<empty>"}".`,
});
}
if (parsed.protocol === "https:") {
const text = parsed.toString();
return text.endsWith("/") ? text.slice(0, -1) : text;
}
if (parsed.protocol !== "http:") {
throw new QaCredentialAdminError({
code: "INVALID_SITE_URL",
message: "OPENCLAW_QA_CONVEX_SITE_URL must use https://.",
});
}
const allowInsecureHttp = isTruthyOptIn(env[ALLOW_INSECURE_HTTP_ENV_KEY]);
if (!allowInsecureHttp || !isLoopbackHostname(parsed.hostname)) {
throw new QaCredentialAdminError({
code: "INVALID_SITE_URL",
message: `OPENCLAW_QA_CONVEX_SITE_URL must use https://. http:// is only allowed for loopback hosts when ${ALLOW_INSECURE_HTTP_ENV_KEY}=1.`,
});
}
const text = parsed.toString();
return text.endsWith("/") ? text.slice(0, -1) : text;
}
function normalizeEndpointPrefix(value: string | undefined): string {
const trimmed = value?.trim();
if (!trimmed) {
return DEFAULT_ENDPOINT_PREFIX;
}
const prefixed = trimmed.startsWith("/") ? trimmed : `/${trimmed}`;
const normalized = prefixed.endsWith("/") ? prefixed.slice(0, -1) : prefixed;
if (!normalized.startsWith("/") || normalized.startsWith("//")) {
throw new QaCredentialAdminError({
code: "INVALID_ARGUMENT",
message: '--endpoint-prefix must be an absolute path like "/qa-credentials/v1" (not //host).',
});
}
if (normalized.includes("\\") || normalized.split("/").some((segment) => segment === "..")) {
throw new QaCredentialAdminError({
code: "INVALID_ARGUMENT",
message: '--endpoint-prefix must not contain backslashes or ".." path segments.',
});
}
return normalized;
}
function joinEndpoint(baseUrl: string, prefix: string, suffix: string): string {
const normalizedSuffix = suffix.startsWith("/") ? suffix : `/${suffix}`;
const url = new URL(baseUrl);
url.pathname = `${prefix}${normalizedSuffix}`.replace(/\/{2,}/gu, "/");
url.search = "";
url.hash = "";
return url.toString();
}
function resolveAdminAuthToken(env: NodeJS.ProcessEnv): string {
const token = env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim();
if (token) {
return token;
}
throw new QaCredentialAdminError({
code: "MISSING_MAINTAINER_SECRET",
message: "Missing OPENCLAW_QA_CONVEX_SECRET_MAINTAINER for qa credential admin commands.",
});
}
function resolveAdminConfig(options: AdminBaseOptions): AdminConfig {
const env = options.env ?? process.env;
const siteUrl = options.siteUrl?.trim() || env.OPENCLAW_QA_CONVEX_SITE_URL?.trim();
if (!siteUrl) {
throw new QaCredentialAdminError({
code: "MISSING_SITE_URL",
message: "Missing OPENCLAW_QA_CONVEX_SITE_URL for qa credential admin commands.",
});
}
const normalizedSiteUrl = normalizeConvexSiteUrl(siteUrl, env);
const endpointPrefix = normalizeEndpointPrefix(
options.endpointPrefix?.trim() || env.OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX,
);
const actorId =
options.actorId?.trim() ||
env.OPENCLAW_QA_CREDENTIAL_OWNER_ID?.trim() ||
`qa-lab-admin-${process.pid}-${randomUUID().slice(0, 8)}`;
return {
actorId,
authToken: resolveAdminAuthToken(env),
siteUrl: normalizedSiteUrl,
endpointPrefix,
httpTimeoutMs: parsePositiveIntegerEnv(
env,
"OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS",
DEFAULT_HTTP_TIMEOUT_MS,
),
addUrl: joinEndpoint(normalizedSiteUrl, endpointPrefix, "admin/add"),
removeUrl: joinEndpoint(normalizedSiteUrl, endpointPrefix, "admin/remove"),
listUrl: joinEndpoint(normalizedSiteUrl, endpointPrefix, "admin/list"),
};
}
function parseJsonResponsePayload(text: string) {
if (!text.trim()) {
return undefined;
}
try {
return JSON.parse(text) as unknown;
} catch {
return text;
}
}
function toBrokerError(payload: unknown, httpStatus: number) {
const parsed = brokerErrorSchema.safeParse(payload);
if (!parsed.success) {
return null;
}
return new QaCredentialAdminError({
code: parsed.data.code,
message: parsed.data.message,
httpStatus,
});
}
async function postJson<T>(params: {
authToken: string;
body: Record<string, unknown>;
fetchImpl: typeof fetch;
httpTimeoutMs: number;
responseSchema: z.ZodType<T>;
url: string;
}) {
let response: Response;
try {
response = await params.fetchImpl(params.url, {
method: "POST",
headers: {
authorization: `Bearer ${params.authToken}`,
"content-type": "application/json",
},
body: JSON.stringify(params.body),
signal: AbortSignal.timeout(params.httpTimeoutMs),
});
} catch (error) {
throw new QaCredentialAdminError({
code: "BROKER_REQUEST_FAILED",
message: `Convex credential admin request failed: ${formatErrorMessage(error)}`,
});
}
const text = await response.text();
const payload = parseJsonResponsePayload(text);
const brokerError = toBrokerError(payload, response.status);
if (brokerError) {
throw brokerError;
}
if (!response.ok) {
throw new QaCredentialAdminError({
code: "BROKER_HTTP_ERROR",
message: `Convex credential admin request failed with HTTP ${response.status}.`,
httpStatus: response.status,
});
}
const parsed = params.responseSchema.safeParse(payload);
if (!parsed.success) {
throw new QaCredentialAdminError({
code: "INVALID_RESPONSE",
message: `Convex credential admin response did not match expected shape: ${parsed.error.message}`,
httpStatus: response.status,
});
}
return parsed.data;
}
function normalizeStatus(value: string | undefined): QaCredentialAdminListStatus | undefined {
if (!value) {
return undefined;
}
const normalized = value.trim().toLowerCase();
const parsed = listStatusSchema.safeParse(normalized);
if (!parsed.success) {
throw new QaCredentialAdminError({
code: "INVALID_ARGUMENT",
message: '--status must be one of "active", "disabled", or "all".',
});
}
return parsed.data;
}
function normalizeLimit(value: number | undefined) {
if (value === undefined) {
return undefined;
}
if (!Number.isFinite(value) || !Number.isInteger(value) || value < 1) {
throw new QaCredentialAdminError({
code: "INVALID_ARGUMENT",
message: "--limit must be a positive integer.",
});
}
return value;
}
export async function addQaCredentialSet(options: AddQaCredentialSetOptions) {
const config = resolveAdminConfig(options);
const fetchImpl = options.fetchImpl ?? fetch;
return await postJson({
fetchImpl,
authToken: config.authToken,
httpTimeoutMs: config.httpTimeoutMs,
url: config.addUrl,
responseSchema: addCredentialResponseSchema,
body: {
kind: options.kind,
payload: options.payload,
...(options.note ? { note: options.note } : {}),
...(options.status ? { status: options.status } : {}),
actorId: config.actorId,
},
});
}
export async function removeQaCredentialSet(options: RemoveQaCredentialSetOptions) {
const config = resolveAdminConfig(options);
const fetchImpl = options.fetchImpl ?? fetch;
return await postJson({
fetchImpl,
authToken: config.authToken,
httpTimeoutMs: config.httpTimeoutMs,
url: config.removeUrl,
responseSchema: removeCredentialResponseSchema,
body: {
credentialId: options.credentialId,
actorId: config.actorId,
},
});
}
export async function listQaCredentialSets(options: ListQaCredentialSetsOptions) {
const config = resolveAdminConfig(options);
const fetchImpl = options.fetchImpl ?? fetch;
const status = normalizeStatus(options.status);
const limit = normalizeLimit(options.limit);
return await postJson({
fetchImpl,
authToken: config.authToken,
httpTimeoutMs: config.httpTimeoutMs,
url: config.listUrl,
responseSchema: listCredentialsResponseSchema,
body: {
...(options.kind ? { kind: options.kind } : {}),
...(status ? { status } : {}),
...(options.includePayload === true ? { includePayload: true } : {}),
...(limit !== undefined ? { limit } : {}),
},
});
}
export const __testing = {
DEFAULT_ENDPOINT_PREFIX,
DEFAULT_HTTP_TIMEOUT_MS,
normalizeConvexSiteUrl,
normalizeEndpointPrefix,
normalizeStatus,
parsePositiveIntegerEnv,
resolveAdminConfig,
};

View File

@@ -6,6 +6,7 @@ Files:
- `scenarios.md` - canonical QA scenario pack, kickoff mission, and operator identity.
- `frontier-harness-plan.md` - big-model bakeoff and tuning loop for harness work.
- `convex-credential-broker/` - standalone Convex v1 lease broker for pooled live credentials.
Key workflow:

View File

@@ -0,0 +1,4 @@
.convex
convex/_generated
.env.local

View File

@@ -0,0 +1,165 @@
# QA Convex Credential Broker (v1)
Standalone Convex project for shared `qa-lab` live credentials with lease locking.
This broker exposes:
- `POST /qa-credentials/v1/acquire`
- `POST /qa-credentials/v1/heartbeat`
- `POST /qa-credentials/v1/release`
- `POST /qa-credentials/v1/admin/add`
- `POST /qa-credentials/v1/admin/remove`
- `POST /qa-credentials/v1/admin/list`
The implementation matches the contract documented in
`docs/help/testing.md` for `--credential-source convex`.
## Policy baked in
- Pool partitioning: by `kind` only
- Selection: least-recently-leased (round-robin behavior)
- Secrets: separate maintainer/CI secrets
- Outage behavior: callers fail fast
- Lease event retention: 2 days (hourly cleanup cron)
- Admin event retention: 30 days (hourly cleanup cron)
- App-level encryption: not included in v1
## Quick start
1. Create a Convex deployment and authenticate your CLI.
2. From this folder:
```bash
cd qa/convex-credential-broker
npm install
npx convex dev
```
3. Deploy:
```bash
npx convex deploy
```
4. In Convex deployment environment variables, set:
- `OPENCLAW_QA_CONVEX_SECRET_MAINTAINER`
- `OPENCLAW_QA_CONVEX_SECRET_CI`
Client URL policy:
- `OPENCLAW_QA_CONVEX_SITE_URL` must use `https://` in normal use.
- Local development may use loopback `http://` only when `OPENCLAW_QA_ALLOW_INSECURE_HTTP=1`.
## Manage credentials from qa-lab CLI
Maintainers can manage rows without using the Convex dashboard:
```bash
pnpm openclaw qa credentials add \
--kind telegram \
--payload-file qa/telegram-credential.json
pnpm openclaw qa credentials list --kind telegram
pnpm openclaw qa credentials remove --credential-id <credential-id>
```
Admin endpoints require `OPENCLAW_QA_CONVEX_SECRET_MAINTAINER`.
## Local request examples
Replace `<site-url>` with your Convex site URL and `<token>` with a configured secret.
Acquire:
```bash
curl -sS -X POST "<site-url>/qa-credentials/v1/acquire" \
-H "authorization: Bearer <token>" \
-H "content-type: application/json" \
-d '{
"kind":"telegram",
"ownerId":"local-dev",
"actorRole":"maintainer",
"leaseTtlMs":1200000,
"heartbeatIntervalMs":30000
}'
```
Heartbeat:
```bash
curl -sS -X POST "<site-url>/qa-credentials/v1/heartbeat" \
-H "authorization: Bearer <token>" \
-H "content-type: application/json" \
-d '{
"kind":"telegram",
"ownerId":"local-dev",
"actorRole":"maintainer",
"credentialId":"<credential-id>",
"leaseToken":"<lease-token>",
"leaseTtlMs":1200000
}'
```
Release:
```bash
curl -sS -X POST "<site-url>/qa-credentials/v1/release" \
-H "authorization: Bearer <token>" \
-H "content-type: application/json" \
-d '{
"kind":"telegram",
"ownerId":"local-dev",
"actorRole":"maintainer",
"credentialId":"<credential-id>",
"leaseToken":"<lease-token>"
}'
```
Admin add (maintainer token only):
```bash
curl -sS -X POST "<site-url>/qa-credentials/v1/admin/add" \
-H "authorization: Bearer <maintainer-token>" \
-H "content-type: application/json" \
-d '{
"kind":"telegram",
"actorId":"local-maintainer",
"payload":{
"groupId":"-100123",
"driverToken":"driver-token",
"sutToken":"sut-token"
}
}'
```
For `kind: "telegram"`, broker `admin/add` validates that payload includes:
- `groupId` as a numeric chat id string
- non-empty `driverToken`
- non-empty `sutToken`
Admin list (default redacted):
```bash
curl -sS -X POST "<site-url>/qa-credentials/v1/admin/list" \
-H "authorization: Bearer <maintainer-token>" \
-H "content-type: application/json" \
-d '{
"kind":"telegram",
"status":"all"
}'
```
Admin remove (soft disable, fails when lease is active):
```bash
curl -sS -X POST "<site-url>/qa-credentials/v1/admin/remove" \
-H "authorization: Bearer <maintainer-token>" \
-H "content-type: application/json" \
-d '{
"credentialId":"<credential-id>",
"actorId":"local-maintainer"
}'
```

View File

@@ -0,0 +1,3 @@
{
"functions": "convex/"
}

View File

@@ -0,0 +1,642 @@
import { v } from "convex/values";
import { internal } from "./_generated/api";
import type { Id } from "./_generated/dataModel";
import { internalMutation, internalQuery } from "./_generated/server";
const LEASE_EVENT_RETENTION_MS = 2 * 24 * 60 * 60 * 1_000;
const ADMIN_EVENT_RETENTION_MS = 30 * 24 * 60 * 60 * 1_000;
const EVENT_RETENTION_BATCH_SIZE = 256;
const MAX_HEARTBEAT_INTERVAL_MS = 5 * 60 * 1_000;
const MAX_LEASE_TTL_MS = 2 * 60 * 60 * 1_000;
const MIN_HEARTBEAT_INTERVAL_MS = 5_000;
const MIN_LEASE_TTL_MS = 30_000;
const MAX_LIST_LIMIT = 500;
const MIN_LIST_LIMIT = 1;
const DEFAULT_HEARTBEAT_INTERVAL_MS = 30_000;
const DEFAULT_LEASE_TTL_MS = 20 * 60 * 1_000;
const DEFAULT_LIST_LIMIT = 100;
const POOL_EXHAUSTED_RETRY_AFTER_MS = 2_000;
const actorRole = v.union(v.literal("ci"), v.literal("maintainer"));
const credentialStatus = v.union(v.literal("active"), v.literal("disabled"));
const listStatus = v.union(v.literal("active"), v.literal("disabled"), v.literal("all"));
type ActorRole = "ci" | "maintainer";
type CredentialStatus = "active" | "disabled";
type ListStatus = CredentialStatus | "all";
type LeaseEventType = "acquire" | "acquire_failed" | "release";
type AdminEventType = "add" | "disable" | "disable_failed";
type BrokerErrorResult = {
status: "error";
code: string;
message: string;
retryAfterMs?: number;
};
type BrokerOkResult = {
status: "ok";
};
type CredentialLease = {
ownerId: string;
actorRole: ActorRole;
leaseToken: string;
acquiredAtMs: number;
heartbeatAtMs: number;
expiresAtMs: number;
};
type CredentialSetRecord = {
_id: Id<"credential_sets">;
kind: string;
status: CredentialStatus;
payload: unknown;
createdAtMs: number;
updatedAtMs: number;
lastLeasedAtMs: number;
note?: string;
lease?: CredentialLease;
};
type EventInsertCtx = {
db: {
insert: (
table: "lease_events" | "admin_events",
value: Record<string, unknown>,
) => Promise<unknown>;
};
};
function normalizeIntervalMs(params: {
value: number | undefined;
fallback: number;
min: number;
max: number;
}) {
const value = params.value ?? params.fallback;
const rounded = Math.floor(value);
if (!Number.isFinite(rounded) || rounded < params.min || rounded > params.max) {
return null;
}
return rounded;
}
function normalizeListLimit(value: number | undefined) {
const limit = value ?? DEFAULT_LIST_LIMIT;
const rounded = Math.floor(limit);
if (!Number.isFinite(rounded) || rounded < MIN_LIST_LIMIT || rounded > MAX_LIST_LIMIT) {
return null;
}
return rounded;
}
function brokerError(code: string, message: string, retryAfterMs?: number): BrokerErrorResult {
return retryAfterMs && retryAfterMs > 0
? {
status: "error",
code,
message,
retryAfterMs,
}
: {
status: "error",
code,
message,
};
}
function leaseIsActive(lease: CredentialLease | undefined, nowMs: number) {
return Boolean(lease && lease.expiresAtMs > nowMs);
}
function toCredentialSummary(row: CredentialSetRecord, includePayload: boolean) {
return {
credentialId: row._id,
kind: row.kind,
status: row.status,
createdAtMs: row.createdAtMs,
updatedAtMs: row.updatedAtMs,
lastLeasedAtMs: row.lastLeasedAtMs,
...(row.note ? { note: row.note } : {}),
...(row.lease
? {
lease: {
ownerId: row.lease.ownerId,
actorRole: row.lease.actorRole,
acquiredAtMs: row.lease.acquiredAtMs,
heartbeatAtMs: row.lease.heartbeatAtMs,
expiresAtMs: row.lease.expiresAtMs,
},
}
: {}),
...(includePayload ? { payload: row.payload } : {}),
};
}
async function insertLeaseEvent(params: {
ctx: EventInsertCtx;
kind: string;
eventType: LeaseEventType;
actorRole: ActorRole;
ownerId: string;
occurredAtMs: number;
credentialId?: Id<"credential_sets">;
code?: string;
message?: string;
}) {
await params.ctx.db.insert("lease_events", {
kind: params.kind,
eventType: params.eventType,
actorRole: params.actorRole,
ownerId: params.ownerId,
occurredAtMs: params.occurredAtMs,
...(params.credentialId ? { credentialId: params.credentialId } : {}),
...(params.code ? { code: params.code } : {}),
...(params.message ? { message: params.message } : {}),
});
}
async function insertAdminEvent(params: {
ctx: EventInsertCtx;
eventType: AdminEventType;
actorRole: ActorRole;
actorId: string;
occurredAtMs: number;
credentialId?: Id<"credential_sets">;
kind?: string;
code?: string;
message?: string;
}) {
await params.ctx.db.insert("admin_events", {
eventType: params.eventType,
actorRole: params.actorRole,
actorId: params.actorId,
occurredAtMs: params.occurredAtMs,
...(params.credentialId ? { credentialId: params.credentialId } : {}),
...(params.kind ? { kind: params.kind } : {}),
...(params.code ? { code: params.code } : {}),
...(params.message ? { message: params.message } : {}),
});
}
function sortByLeastRecentlyLeasedThenId(
rows: Array<{
_id: Id<"credential_sets">;
lastLeasedAtMs: number;
}>,
) {
rows.sort((left, right) => {
if (left.lastLeasedAtMs !== right.lastLeasedAtMs) {
return left.lastLeasedAtMs - right.lastLeasedAtMs;
}
const leftId = String(left._id);
const rightId = String(right._id);
return leftId.localeCompare(rightId);
});
}
function sortCredentialRowsForList(rows: CredentialSetRecord[]) {
const statusRank: Record<CredentialStatus, number> = { active: 0, disabled: 1 };
rows.sort((left, right) => {
const kindCompare = left.kind.localeCompare(right.kind);
if (kindCompare !== 0) {
return kindCompare;
}
if (left.status !== right.status) {
return statusRank[left.status] - statusRank[right.status];
}
if (left.updatedAtMs !== right.updatedAtMs) {
return right.updatedAtMs - left.updatedAtMs;
}
return String(left._id).localeCompare(String(right._id));
});
}
function normalizeActorId(value: string | undefined) {
const normalized = value?.trim();
return normalized && normalized.length > 0 ? normalized : "unknown";
}
export const acquireLease = internalMutation({
args: {
kind: v.string(),
ownerId: v.string(),
actorRole,
leaseTtlMs: v.optional(v.number()),
heartbeatIntervalMs: v.optional(v.number()),
},
handler: async (ctx, args) => {
const nowMs = Date.now();
const leaseTtlMs = normalizeIntervalMs({
value: args.leaseTtlMs,
fallback: DEFAULT_LEASE_TTL_MS,
min: MIN_LEASE_TTL_MS,
max: MAX_LEASE_TTL_MS,
});
if (!leaseTtlMs) {
return brokerError(
"INVALID_LEASE_TTL",
`leaseTtlMs must be between ${MIN_LEASE_TTL_MS} and ${MAX_LEASE_TTL_MS}.`,
);
}
const heartbeatIntervalMs = normalizeIntervalMs({
value: args.heartbeatIntervalMs,
fallback: DEFAULT_HEARTBEAT_INTERVAL_MS,
min: MIN_HEARTBEAT_INTERVAL_MS,
max: MAX_HEARTBEAT_INTERVAL_MS,
});
if (!heartbeatIntervalMs) {
return brokerError(
"INVALID_HEARTBEAT_INTERVAL",
`heartbeatIntervalMs must be between ${MIN_HEARTBEAT_INTERVAL_MS} and ${MAX_HEARTBEAT_INTERVAL_MS}.`,
);
}
const activeRows = (await ctx.db
.query("credential_sets")
.withIndex("by_kind_status", (q) => q.eq("kind", args.kind).eq("status", "active"))
.collect()) as CredentialSetRecord[];
const availableRows = activeRows.filter((row) => !leaseIsActive(row.lease, nowMs));
if (availableRows.length === 0) {
await insertLeaseEvent({
ctx,
kind: args.kind,
eventType: "acquire_failed",
actorRole: args.actorRole,
ownerId: args.ownerId,
occurredAtMs: nowMs,
code: "POOL_EXHAUSTED",
message: "No active credential in this kind is currently available.",
});
return brokerError(
"POOL_EXHAUSTED",
`No available credential for kind "${args.kind}".`,
POOL_EXHAUSTED_RETRY_AFTER_MS,
);
}
sortByLeastRecentlyLeasedThenId(availableRows);
const selected = availableRows[0];
const leaseToken = crypto.randomUUID();
await ctx.db.patch(selected._id, {
lease: {
ownerId: args.ownerId,
actorRole: args.actorRole,
leaseToken,
acquiredAtMs: nowMs,
heartbeatAtMs: nowMs,
expiresAtMs: nowMs + leaseTtlMs,
},
lastLeasedAtMs: nowMs,
updatedAtMs: nowMs,
});
await insertLeaseEvent({
ctx,
kind: args.kind,
eventType: "acquire",
actorRole: args.actorRole,
ownerId: args.ownerId,
occurredAtMs: nowMs,
credentialId: selected._id,
});
return {
status: "ok",
credentialId: selected._id,
leaseToken,
payload: selected.payload,
leaseTtlMs,
heartbeatIntervalMs,
};
},
});
export const heartbeatLease = internalMutation({
args: {
kind: v.string(),
ownerId: v.string(),
actorRole,
credentialId: v.id("credential_sets"),
leaseToken: v.string(),
leaseTtlMs: v.optional(v.number()),
},
handler: async (ctx, args): Promise<BrokerErrorResult | BrokerOkResult> => {
const nowMs = Date.now();
const leaseTtlMs = normalizeIntervalMs({
value: args.leaseTtlMs,
fallback: DEFAULT_LEASE_TTL_MS,
min: MIN_LEASE_TTL_MS,
max: MAX_LEASE_TTL_MS,
});
if (!leaseTtlMs) {
return brokerError(
"INVALID_LEASE_TTL",
`leaseTtlMs must be between ${MIN_LEASE_TTL_MS} and ${MAX_LEASE_TTL_MS}.`,
);
}
const row = (await ctx.db.get(args.credentialId)) as CredentialSetRecord | null;
if (!row) {
return brokerError("CREDENTIAL_NOT_FOUND", "Credential record does not exist.");
}
if (row.kind !== args.kind) {
return brokerError("KIND_MISMATCH", "Credential kind did not match this lease heartbeat.");
}
if (row.status !== "active") {
return brokerError(
"CREDENTIAL_DISABLED",
"Credential is disabled and cannot be heartbeated.",
);
}
if (!row.lease) {
return brokerError("LEASE_NOT_FOUND", "Credential is not currently leased.");
}
if (row.lease.ownerId !== args.ownerId || row.lease.leaseToken !== args.leaseToken) {
return brokerError("LEASE_NOT_OWNER", "Credential lease owner/token mismatch.");
}
if (row.lease.expiresAtMs < nowMs) {
return brokerError("LEASE_EXPIRED", "Credential lease has already expired.");
}
await ctx.db.patch(args.credentialId, {
lease: {
...row.lease,
heartbeatAtMs: nowMs,
expiresAtMs: nowMs + leaseTtlMs,
},
updatedAtMs: nowMs,
});
return { status: "ok" };
},
});
export const releaseLease = internalMutation({
args: {
kind: v.string(),
ownerId: v.string(),
actorRole,
credentialId: v.id("credential_sets"),
leaseToken: v.string(),
},
handler: async (ctx, args): Promise<BrokerErrorResult | BrokerOkResult> => {
const nowMs = Date.now();
const row = (await ctx.db.get(args.credentialId)) as CredentialSetRecord | null;
if (!row) {
return brokerError("CREDENTIAL_NOT_FOUND", "Credential record does not exist.");
}
if (row.kind !== args.kind) {
return brokerError("KIND_MISMATCH", "Credential kind did not match this lease release.");
}
if (!row.lease) {
return { status: "ok" };
}
if (row.lease.ownerId !== args.ownerId || row.lease.leaseToken !== args.leaseToken) {
return brokerError("LEASE_NOT_OWNER", "Credential lease owner/token mismatch.");
}
await ctx.db.patch(args.credentialId, {
lease: undefined,
updatedAtMs: nowMs,
});
await insertLeaseEvent({
ctx,
kind: args.kind,
eventType: "release",
actorRole: args.actorRole,
ownerId: args.ownerId,
occurredAtMs: nowMs,
credentialId: args.credentialId,
});
return { status: "ok" };
},
});
export const addCredentialSet = internalMutation({
args: {
kind: v.string(),
payload: v.any(),
note: v.optional(v.string()),
actorId: v.optional(v.string()),
status: v.optional(credentialStatus),
},
handler: async (ctx, args) => {
const nowMs = Date.now();
const actorId = normalizeActorId(args.actorId);
const status = args.status ?? "active";
const note = args.note?.trim();
const credentialId = await ctx.db.insert("credential_sets", {
kind: args.kind,
status,
payload: args.payload,
createdAtMs: nowMs,
updatedAtMs: nowMs,
lastLeasedAtMs: 0,
...(note ? { note } : {}),
});
await insertAdminEvent({
ctx,
eventType: "add",
actorRole: "maintainer",
actorId,
occurredAtMs: nowMs,
credentialId,
kind: args.kind,
});
const created: CredentialSetRecord = {
_id: credentialId,
kind: args.kind,
status,
payload: args.payload,
createdAtMs: nowMs,
updatedAtMs: nowMs,
lastLeasedAtMs: 0,
...(note ? { note } : {}),
};
return {
status: "ok",
credential: toCredentialSummary(created, false),
};
},
});
export const disableCredentialSet = internalMutation({
args: {
credentialId: v.id("credential_sets"),
actorId: v.optional(v.string()),
},
handler: async (ctx, args) => {
const nowMs = Date.now();
const actorId = normalizeActorId(args.actorId);
const row = (await ctx.db.get(args.credentialId)) as CredentialSetRecord | null;
if (!row) {
await insertAdminEvent({
ctx,
eventType: "disable_failed",
actorRole: "maintainer",
actorId,
occurredAtMs: nowMs,
credentialId: args.credentialId,
code: "CREDENTIAL_NOT_FOUND",
message: "Credential record does not exist.",
});
return brokerError("CREDENTIAL_NOT_FOUND", "Credential record does not exist.");
}
if (leaseIsActive(row.lease, nowMs)) {
await insertAdminEvent({
ctx,
eventType: "disable_failed",
actorRole: "maintainer",
actorId,
occurredAtMs: nowMs,
credentialId: row._id,
kind: row.kind,
code: "LEASE_ACTIVE",
message: "Credential is currently leased and cannot be disabled yet.",
});
return brokerError("LEASE_ACTIVE", "Credential is currently leased and cannot be disabled.");
}
if (row.status === "disabled") {
return {
status: "ok",
changed: false,
credential: toCredentialSummary(row, false),
};
}
await ctx.db.patch(args.credentialId, {
status: "disabled",
lease: undefined,
updatedAtMs: nowMs,
});
await insertAdminEvent({
ctx,
eventType: "disable",
actorRole: "maintainer",
actorId,
occurredAtMs: nowMs,
credentialId: row._id,
kind: row.kind,
});
const updated: CredentialSetRecord = {
...row,
status: "disabled",
lease: undefined,
updatedAtMs: nowMs,
};
return {
status: "ok",
changed: true,
credential: toCredentialSummary(updated, false),
};
},
});
export const listCredentialSets = internalQuery({
args: {
kind: v.optional(v.string()),
status: v.optional(listStatus),
includePayload: v.optional(v.boolean()),
limit: v.optional(v.number()),
},
handler: async (ctx, args) => {
const normalizedStatus: ListStatus = args.status ?? "all";
const includePayload = args.includePayload === true;
const limit = normalizeListLimit(args.limit);
if (!limit) {
return brokerError(
"INVALID_LIST_LIMIT",
`limit must be between ${MIN_LIST_LIMIT} and ${MAX_LIST_LIMIT}.`,
);
}
let rows: CredentialSetRecord[] = [];
const kind = args.kind?.trim();
if (kind) {
if (normalizedStatus === "all") {
rows = (await ctx.db
.query("credential_sets")
.withIndex("by_kind_lastLeasedAtMs", (q) => q.eq("kind", kind))
.collect()) as CredentialSetRecord[];
} else {
rows = (await ctx.db
.query("credential_sets")
.withIndex("by_kind_status", (q) => q.eq("kind", kind).eq("status", normalizedStatus))
.collect()) as CredentialSetRecord[];
}
} else {
rows = (await ctx.db.query("credential_sets").collect()) as CredentialSetRecord[];
if (normalizedStatus !== "all") {
rows = rows.filter((row) => row.status === normalizedStatus);
}
}
sortCredentialRowsForList(rows);
const selected = rows.slice(0, limit);
return {
status: "ok",
credentials: selected.map((row) => toCredentialSummary(row, includePayload)),
count: selected.length,
};
},
});
export const cleanupLeaseEvents = internalMutation({
args: {},
handler: async (ctx) => {
const cutoffMs = Date.now() - LEASE_EVENT_RETENTION_MS;
const staleRows = await ctx.db
.query("lease_events")
.withIndex("by_occurredAtMs", (q) => q.lt("occurredAtMs", cutoffMs))
.take(EVENT_RETENTION_BATCH_SIZE);
for (const row of staleRows) {
await ctx.db.delete(row._id);
}
if (staleRows.length === EVENT_RETENTION_BATCH_SIZE) {
await ctx.scheduler.runAfter(0, internal.credentials.cleanupLeaseEvents, {});
}
return {
status: "ok",
deleted: staleRows.length,
retentionMs: LEASE_EVENT_RETENTION_MS,
};
},
});
export const cleanupAdminEvents = internalMutation({
args: {},
handler: async (ctx) => {
const cutoffMs = Date.now() - ADMIN_EVENT_RETENTION_MS;
const staleRows = await ctx.db
.query("admin_events")
.withIndex("by_occurredAtMs", (q) => q.lt("occurredAtMs", cutoffMs))
.take(EVENT_RETENTION_BATCH_SIZE);
for (const row of staleRows) {
await ctx.db.delete(row._id);
}
if (staleRows.length === EVENT_RETENTION_BATCH_SIZE) {
await ctx.scheduler.runAfter(0, internal.credentials.cleanupAdminEvents, {});
}
return {
status: "ok",
deleted: staleRows.length,
retentionMs: ADMIN_EVENT_RETENTION_MS,
};
},
});

View File

@@ -0,0 +1,20 @@
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";
const crons = cronJobs();
crons.interval(
"qa-credential-lease-event-retention",
{ hours: 1 },
internal.credentials.cleanupLeaseEvents,
{},
);
crons.interval(
"qa-credential-admin-event-retention",
{ hours: 1 },
internal.credentials.cleanupAdminEvents,
{},
);
export default crons;

View File

@@ -0,0 +1,457 @@
import { httpRouter } from "convex/server";
import { internal } from "./_generated/api";
import type { Id } from "./_generated/dataModel";
import { httpAction } from "./_generated/server";
type ActorRole = "ci" | "maintainer";
class BrokerHttpError extends Error {
code: string;
httpStatus: number;
constructor(httpStatus: number, code: string, message: string) {
super(message);
this.name = "BrokerHttpError";
this.httpStatus = httpStatus;
this.code = code;
}
}
function jsonResponse(status: number, payload: unknown) {
return new Response(JSON.stringify(payload), {
status,
headers: {
"content-type": "application/json; charset=utf-8",
"cache-control": "no-store",
},
});
}
function parseBearerToken(request: Request) {
const header = request.headers.get("authorization")?.trim();
if (!header) {
return null;
}
const [scheme, token] = header.split(/\s+/u, 2);
if (scheme?.toLowerCase() !== "bearer" || !token) {
return null;
}
return token;
}
function resolveAuthRole(token: string | null): ActorRole {
if (!token) {
throw new BrokerHttpError(
401,
"AUTH_REQUIRED",
"Missing Authorization: Bearer <secret> header.",
);
}
const maintainerSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim();
const ciSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_CI?.trim();
if (!maintainerSecret && !ciSecret) {
throw new BrokerHttpError(
500,
"SERVER_MISCONFIGURED",
"No Convex broker role secrets are configured on this deployment.",
);
}
if (maintainerSecret && token === maintainerSecret) {
return "maintainer";
}
if (ciSecret && token === ciSecret) {
return "ci";
}
throw new BrokerHttpError(401, "AUTH_INVALID", "Credential broker secret is invalid.");
}
function assertMaintainerAdminAuth(token: string | null) {
if (!token) {
throw new BrokerHttpError(
401,
"AUTH_REQUIRED",
"Missing Authorization: Bearer <secret> header.",
);
}
const maintainerSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER?.trim();
if (!maintainerSecret) {
throw new BrokerHttpError(
500,
"SERVER_MISCONFIGURED",
"Admin endpoints require OPENCLAW_QA_CONVEX_SECRET_MAINTAINER on this deployment.",
);
}
if (token === maintainerSecret) {
return;
}
const ciSecret = process.env.OPENCLAW_QA_CONVEX_SECRET_CI?.trim();
if (ciSecret && token === ciSecret) {
throw new BrokerHttpError(
403,
"AUTH_ROLE_MISMATCH",
"Admin endpoints require maintainer credentials.",
);
}
throw new BrokerHttpError(401, "AUTH_INVALID", "Credential broker secret is invalid.");
}
function asObject(value: unknown) {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return null;
}
return value as Record<string, unknown>;
}
async function parseJsonObject(request: Request) {
let parsed: unknown;
try {
parsed = await request.json();
} catch {
throw new BrokerHttpError(400, "INVALID_JSON", "Request body must be valid JSON.");
}
const body = asObject(parsed);
if (!body) {
throw new BrokerHttpError(400, "INVALID_BODY", "Request body must be a JSON object.");
}
return body;
}
function requireString(body: Record<string, unknown>, key: string) {
const raw = body[key];
if (typeof raw !== "string") {
throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a string.`);
}
const value = raw.trim();
if (!value) {
throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be non-empty.`);
}
return value;
}
function optionalString(body: Record<string, unknown>, key: string) {
if (!(key in body) || body[key] === undefined || body[key] === null) {
return undefined;
}
const raw = body[key];
if (typeof raw !== "string") {
throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a string.`);
}
const value = raw.trim();
return value.length > 0 ? value : undefined;
}
function requireObject(body: Record<string, unknown>, key: string) {
const raw = body[key];
const parsed = asObject(raw);
if (!parsed) {
throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a JSON object.`);
}
return parsed;
}
function optionalPositiveInteger(body: Record<string, unknown>, key: string) {
if (!(key in body) || body[key] === undefined || body[key] === null) {
return undefined;
}
const raw = body[key];
if (typeof raw !== "number" || !Number.isFinite(raw) || !Number.isInteger(raw) || raw < 1) {
throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a positive integer.`);
}
return raw;
}
function optionalBoolean(body: Record<string, unknown>, key: string) {
if (!(key in body) || body[key] === undefined || body[key] === null) {
return undefined;
}
if (typeof body[key] !== "boolean") {
throw new BrokerHttpError(400, "INVALID_BODY", `Expected "${key}" to be a boolean.`);
}
return body[key];
}
function optionalCredentialStatus(body: Record<string, unknown>, key: string) {
const value = optionalString(body, key);
if (!value) {
return undefined;
}
if (value !== "active" && value !== "disabled") {
throw new BrokerHttpError(
400,
"INVALID_BODY",
`Expected "${key}" to be "active" or "disabled".`,
);
}
return value;
}
function optionalListStatus(body: Record<string, unknown>, key: string) {
const value = optionalString(body, key);
if (!value) {
return undefined;
}
if (value !== "active" && value !== "disabled" && value !== "all") {
throw new BrokerHttpError(
400,
"INVALID_BODY",
`Expected "${key}" to be "active", "disabled", or "all".`,
);
}
return value;
}
function requirePayloadString(payload: Record<string, unknown>, key: string, kind: string): string {
const raw = payload[key];
if (typeof raw !== "string") {
throw new BrokerHttpError(
400,
"INVALID_PAYLOAD",
`Credential payload for kind "${kind}" must include "${key}" as a string.`,
);
}
const value = raw.trim();
if (!value) {
throw new BrokerHttpError(
400,
"INVALID_PAYLOAD",
`Credential payload for kind "${kind}" must include a non-empty "${key}" value.`,
);
}
return value;
}
function normalizeCredentialPayloadForKind(kind: string, payload: Record<string, unknown>) {
if (kind !== "telegram") {
return payload;
}
const groupId = requirePayloadString(payload, "groupId", "telegram");
if (!/^-?\d+$/u.test(groupId)) {
throw new BrokerHttpError(
400,
"INVALID_PAYLOAD",
'Credential payload for kind "telegram" must include a numeric "groupId" string.',
);
}
const driverToken = requirePayloadString(payload, "driverToken", "telegram");
const sutToken = requirePayloadString(payload, "sutToken", "telegram");
return {
groupId,
driverToken,
sutToken,
} satisfies Record<string, unknown>;
}
function parseActorRole(body: Record<string, unknown>) {
const actorRole = requireString(body, "actorRole");
if (actorRole !== "ci" && actorRole !== "maintainer") {
throw new BrokerHttpError(
400,
"INVALID_ACTOR_ROLE",
'Expected "actorRole" to be "maintainer" or "ci".',
);
}
return actorRole as ActorRole;
}
function assertRoleAllowed(tokenRole: ActorRole, requestedRole: ActorRole) {
if (tokenRole !== requestedRole) {
throw new BrokerHttpError(
403,
"AUTH_ROLE_MISMATCH",
`Secret role "${tokenRole}" cannot be used as actorRole "${requestedRole}".`,
);
}
}
function normalizeCredentialId(raw: string) {
// Convex Ids are opaque strings. We only enforce non-empty shape at HTTP boundary.
return raw;
}
function normalizeError(error: unknown) {
if (error instanceof BrokerHttpError) {
return {
httpStatus: error.httpStatus,
payload: {
status: "error",
code: error.code,
message: error.message,
},
};
}
if (error instanceof Error) {
return {
httpStatus: 500,
payload: {
status: "error",
code: "INTERNAL_ERROR",
message: error.message || "Internal credential broker error.",
},
};
}
return {
httpStatus: 500,
payload: {
status: "error",
code: "INTERNAL_ERROR",
message: "Internal credential broker error.",
},
};
}
const http = httpRouter();
http.route({
path: "/qa-credentials/v1/acquire",
method: "POST",
handler: httpAction(async (ctx, request) => {
try {
const tokenRole = resolveAuthRole(parseBearerToken(request));
const body = await parseJsonObject(request);
const actorRole = parseActorRole(body);
assertRoleAllowed(tokenRole, actorRole);
const result = await ctx.runMutation(internal.credentials.acquireLease, {
kind: requireString(body, "kind"),
ownerId: requireString(body, "ownerId"),
actorRole,
leaseTtlMs: optionalPositiveInteger(body, "leaseTtlMs"),
heartbeatIntervalMs: optionalPositiveInteger(body, "heartbeatIntervalMs"),
});
return jsonResponse(200, result);
} catch (error) {
const normalized = normalizeError(error);
return jsonResponse(normalized.httpStatus, normalized.payload);
}
}),
});
http.route({
path: "/qa-credentials/v1/heartbeat",
method: "POST",
handler: httpAction(async (ctx, request) => {
try {
const tokenRole = resolveAuthRole(parseBearerToken(request));
const body = await parseJsonObject(request);
const actorRole = parseActorRole(body);
assertRoleAllowed(tokenRole, actorRole);
const result = await ctx.runMutation(internal.credentials.heartbeatLease, {
kind: requireString(body, "kind"),
ownerId: requireString(body, "ownerId"),
actorRole,
credentialId: normalizeCredentialId(
requireString(body, "credentialId"),
) as Id<"credential_sets">,
leaseToken: requireString(body, "leaseToken"),
leaseTtlMs: optionalPositiveInteger(body, "leaseTtlMs"),
});
return jsonResponse(200, result);
} catch (error) {
const normalized = normalizeError(error);
return jsonResponse(normalized.httpStatus, normalized.payload);
}
}),
});
http.route({
path: "/qa-credentials/v1/release",
method: "POST",
handler: httpAction(async (ctx, request) => {
try {
const tokenRole = resolveAuthRole(parseBearerToken(request));
const body = await parseJsonObject(request);
const actorRole = parseActorRole(body);
assertRoleAllowed(tokenRole, actorRole);
const result = await ctx.runMutation(internal.credentials.releaseLease, {
kind: requireString(body, "kind"),
ownerId: requireString(body, "ownerId"),
actorRole,
credentialId: normalizeCredentialId(
requireString(body, "credentialId"),
) as Id<"credential_sets">,
leaseToken: requireString(body, "leaseToken"),
});
return jsonResponse(200, result);
} catch (error) {
const normalized = normalizeError(error);
return jsonResponse(normalized.httpStatus, normalized.payload);
}
}),
});
http.route({
path: "/qa-credentials/v1/admin/add",
method: "POST",
handler: httpAction(async (ctx, request) => {
try {
assertMaintainerAdminAuth(parseBearerToken(request));
const body = await parseJsonObject(request);
const kind = requireString(body, "kind");
const payload = normalizeCredentialPayloadForKind(kind, requireObject(body, "payload"));
const result = await ctx.runMutation(internal.credentials.addCredentialSet, {
kind,
payload,
note: optionalString(body, "note"),
actorId: optionalString(body, "actorId"),
status: optionalCredentialStatus(body, "status"),
});
return jsonResponse(200, result);
} catch (error) {
const normalized = normalizeError(error);
return jsonResponse(normalized.httpStatus, normalized.payload);
}
}),
});
http.route({
path: "/qa-credentials/v1/admin/remove",
method: "POST",
handler: httpAction(async (ctx, request) => {
try {
assertMaintainerAdminAuth(parseBearerToken(request));
const body = await parseJsonObject(request);
const result = await ctx.runMutation(internal.credentials.disableCredentialSet, {
credentialId: normalizeCredentialId(
requireString(body, "credentialId"),
) as Id<"credential_sets">,
actorId: optionalString(body, "actorId"),
});
return jsonResponse(200, result);
} catch (error) {
const normalized = normalizeError(error);
return jsonResponse(normalized.httpStatus, normalized.payload);
}
}),
});
http.route({
path: "/qa-credentials/v1/admin/list",
method: "POST",
handler: httpAction(async (ctx, request) => {
try {
assertMaintainerAdminAuth(parseBearerToken(request));
const body = await parseJsonObject(request);
const result = await ctx.runQuery(internal.credentials.listCredentialSets, {
kind: optionalString(body, "kind"),
status: optionalListStatus(body, "status"),
includePayload: optionalBoolean(body, "includePayload"),
limit: optionalPositiveInteger(body, "limit"),
});
return jsonResponse(200, result);
} catch (error) {
const normalized = normalizeError(error);
return jsonResponse(normalized.httpStatus, normalized.payload);
}
}),
});
export default http;

View File

@@ -0,0 +1,63 @@
import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values";
const actorRole = v.union(v.literal("ci"), v.literal("maintainer"));
const credentialStatus = v.union(v.literal("active"), v.literal("disabled"));
const leaseEventType = v.union(
v.literal("acquire"),
v.literal("acquire_failed"),
v.literal("release"),
);
const adminEventType = v.union(v.literal("add"), v.literal("disable"), v.literal("disable_failed"));
export default defineSchema({
credential_sets: defineTable({
kind: v.string(),
status: credentialStatus,
payload: v.any(),
createdAtMs: v.number(),
updatedAtMs: v.number(),
lastLeasedAtMs: v.number(),
note: v.optional(v.string()),
lease: v.optional(
v.object({
ownerId: v.string(),
actorRole,
leaseToken: v.string(),
acquiredAtMs: v.number(),
heartbeatAtMs: v.number(),
expiresAtMs: v.number(),
}),
),
})
.index("by_kind_status", ["kind", "status"])
.index("by_kind_lastLeasedAtMs", ["kind", "lastLeasedAtMs"]),
lease_events: defineTable({
kind: v.string(),
eventType: leaseEventType,
actorRole,
ownerId: v.string(),
occurredAtMs: v.number(),
credentialId: v.optional(v.id("credential_sets")),
code: v.optional(v.string()),
message: v.optional(v.string()),
})
.index("by_occurredAtMs", ["occurredAtMs"])
.index("by_kind_occurredAtMs", ["kind", "occurredAtMs"])
.index("by_credential_occurredAtMs", ["credentialId", "occurredAtMs"]),
admin_events: defineTable({
eventType: adminEventType,
actorRole,
actorId: v.string(),
occurredAtMs: v.number(),
credentialId: v.optional(v.id("credential_sets")),
kind: v.optional(v.string()),
code: v.optional(v.string()),
message: v.optional(v.string()),
})
.index("by_occurredAtMs", ["occurredAtMs"])
.index("by_kind_occurredAtMs", ["kind", "occurredAtMs"])
.index("by_credential_occurredAtMs", ["credentialId", "occurredAtMs"]),
});

View File

@@ -0,0 +1,25 @@
{
/* This TypeScript project config describes the environment that
* Convex functions run in and is used to typecheck them.
* You can modify it, but some settings are required to use Convex.
*/
"compilerOptions": {
/* These settings are not required by Convex and can be modified. */
"allowJs": true,
"strict": true,
"moduleResolution": "Bundler",
"jsx": "react-jsx",
"skipLibCheck": true,
"allowSyntheticDefaultImports": true,
/* These compiler options are required by Convex */
"target": "ESNext",
"lib": ["ES2023", "dom"],
"forceConsistentCasingInFileNames": true,
"module": "ESNext",
"isolatedModules": true,
"noEmit": true
},
"include": ["./**/*"],
"exclude": ["./_generated"]
}

View File

@@ -0,0 +1,15 @@
{
"name": "@openclaw/qa-convex-credential-broker",
"version": "0.1.0",
"private": true,
"description": "Convex HTTP credential lease broker for OpenClaw QA lab",
"type": "module",
"scripts": {
"dashboard": "convex dashboard",
"deploy": "convex deploy",
"dev": "convex dev"
},
"dependencies": {
"convex": "^1.35.1"
}
}