test: stabilize qa suite concurrency

This commit is contained in:
Peter Steinberger
2026-04-24 20:39:26 +01:00
parent 47f6a98909
commit 88c91675e2
23 changed files with 887 additions and 164 deletions

View File

@@ -64,6 +64,9 @@ the child config scoped to the transport under test, so Matrix runs without
a combined stdout/stderr log into the selected Matrix QA output directory. To
capture the outer `scripts/run-node.mjs` build/launcher output too, set
`OPENCLAW_RUN_NODE_OUTPUT_LOG=<path>` to a repo-local log file.
Matrix progress is printed by default. `OPENCLAW_QA_MATRIX_TIMEOUT_MS` bounds
the full run, and `OPENCLAW_QA_MATRIX_CLEANUP_TIMEOUT_MS` bounds cleanup so a
stuck Docker teardown reports the exact recovery command instead of hanging.
For a transport-real Telegram smoke lane, run:
@@ -83,6 +86,16 @@ you want artifacts without a failing exit code.
The Telegram report and summary include per-reply RTT from the driver message
send request to the observed SUT reply, starting with the canary.
Before using pooled live credentials, run:
```bash
pnpm openclaw qa credentials doctor
```
The doctor checks Convex broker env, validates endpoint settings, and verifies
admin/list reachability when the maintainer secret is present. It reports only
set/missing status for secrets.
For a transport-real Discord smoke lane, run:
```bash

View File

@@ -147,6 +147,7 @@ runs the same lanes before release approval.
- Uses the pinned stable Tuwunel image `ghcr.io/matrix-construct/tuwunel:v1.5.1` by default. Override with `OPENCLAW_QA_MATRIX_TUWUNEL_IMAGE` when you need to test a different image.
- Matrix does not expose shared credential-source flags because the lane provisions disposable users locally.
- Writes a Matrix QA report, summary, observed-events artifact, and combined stdout/stderr output log under `.artifacts/qa-e2e/...`.
- Emits progress by default and enforces a hard run timeout with `OPENCLAW_QA_MATRIX_TIMEOUT_MS` (default 30 minutes). Cleanup is bounded by `OPENCLAW_QA_MATRIX_CLEANUP_TIMEOUT_MS` and failures include the recovery `docker compose ... down --remove-orphans` command.
- `pnpm openclaw qa telegram`
- Runs the Telegram live QA lane against a real private group using the driver and SUT bot tokens from env.
- Requires `OPENCLAW_QA_TELEGRAM_GROUP_ID`, `OPENCLAW_QA_TELEGRAM_DRIVER_BOT_TOKEN`, and `OPENCLAW_QA_TELEGRAM_SUT_BOT_TOKEN`. The group id must be the numeric Telegram chat id.
@@ -205,12 +206,16 @@ Maintainer admin commands (pool add/remove/list) require
CLI helpers for maintainers:
```bash
pnpm openclaw qa credentials doctor
pnpm openclaw qa credentials add --kind telegram --payload-file qa/telegram-credential.json
pnpm openclaw qa credentials list --kind telegram
pnpm openclaw qa credentials remove --credential-id <credential-id>
```
Use `--json` for machine-readable output in scripts and CI utilities.
Use `doctor` before live runs to check the Convex site URL, broker secrets,
endpoint prefix, HTTP timeout, and admin/list reachability without printing
secret values. Use `--json` for machine-readable output in scripts and CI
utilities.
Default endpoint contract (`OPENCLAW_QA_CONVEX_SITE_URL` + `/qa-credentials/v1`):

View File

@@ -16,6 +16,10 @@ const QA_CLI_METADATA_ENTRY_BASENAMES = Object.freeze([
"cli-metadata.mjs",
"cli-metadata.cjs",
]);
const QA_RUNTIME_DEPS_ARTIFACT_BASENAMES = new Set([
".openclaw-runtime-deps.json",
".openclaw-runtime-deps-stamp.json",
]);
function assertSafeQaBundledPluginId(pluginId: string) {
if (!QA_BUNDLED_PLUGIN_ID_PATTERN.test(pluginId)) {
@@ -312,6 +316,14 @@ async function seedQaStagedBuiltTreeRoots(params: {
}
}
function shouldStageQaBundledPluginPath(sourcePath: string) {
const basename = path.basename(sourcePath);
return (
!QA_RUNTIME_DEPS_ARTIFACT_BASENAMES.has(basename) &&
!basename.startsWith(".openclaw-runtime-deps-copy-")
);
}
export async function resolveQaRuntimeHostVersion(params: {
repoRoot: string;
allowedPluginIds: readonly string[];
@@ -414,7 +426,10 @@ export async function createQaBundledPluginsDir(params: {
if (!sourceDir) {
throw new Error(`qa bundled plugin not found: ${pluginId}`);
}
await fs.cp(sourceDir, path.join(bundledPluginsDir, pluginId), { recursive: true });
await fs.cp(sourceDir, path.join(bundledPluginsDir, pluginId), {
recursive: true,
filter: shouldStageQaBundledPluginPath,
});
}
await symlinkQaStagedDirEntry({
sourcePath: path.join(stagedRoot, "dist"),

View File

@@ -24,6 +24,7 @@ import {
import { startQaProviderServer } from "./providers/server-runtime.js";
import {
addQaCredentialSet,
diagnoseQaCredentialBroker,
listQaCredentialSets,
QaCredentialAdminError,
removeQaCredentialSet,
@@ -427,6 +428,18 @@ function printQaCredentialListTable(credentials: QaCredentialRecord[]) {
}
}
function printQaCredentialDoctorTable(
result: Awaited<ReturnType<typeof diagnoseQaCredentialBroker>>,
) {
process.stdout.write(`QA credentials doctor: ${result.status}\n`);
const nameWidth = Math.max("check".length, ...result.checks.map((check) => check.name.length));
for (const check of result.checks) {
process.stdout.write(
`${check.name.padEnd(nameWidth)} ${check.status.padEnd(4)} ${check.details ?? ""}\n`,
);
}
}
export async function runQaLabSelfCheckCommand(opts: { repoRoot?: string; output?: string }) {
const repoRoot = path.resolve(opts.repoRoot ?? process.cwd());
const server = await startQaLabServer({
@@ -840,6 +853,27 @@ export async function runQaCredentialsListCommand(opts: {
}
}
export async function runQaCredentialsDoctorCommand(opts: {
actorId?: string;
endpointPrefix?: string;
json?: boolean;
siteUrl?: string;
}) {
const result = await diagnoseQaCredentialBroker({
actorId: opts.actorId,
endpointPrefix: opts.endpointPrefix,
siteUrl: opts.siteUrl,
});
if (opts.json) {
process.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
} else {
printQaCredentialDoctorTable(result);
}
if (result.status === "fail") {
process.exitCode = 1;
}
}
export async function runQaLabUiCommand(opts: {
repoRoot?: string;
host?: string;

View File

@@ -140,6 +140,16 @@ async function runQaCredentialsList(opts: {
await runtime.runQaCredentialsListCommand(opts);
}
async function runQaCredentialsDoctor(opts: {
actorId?: string;
endpointPrefix?: string;
json?: boolean;
siteUrl?: string;
}) {
const runtime = await loadQaLabCliRuntime();
await runtime.runQaCredentialsDoctorCommand(opts);
}
async function runQaUi(opts: {
repoRoot?: string;
host?: string;
@@ -434,6 +444,24 @@ export function registerQaLabCli(program: Command) {
.command("credentials")
.description("Manage pooled Convex live credentials used by QA lanes");
credentials
.command("doctor")
.description("Check Convex credential broker env and admin reachability")
.option("--site-url <url>", "Override OPENCLAW_QA_CONVEX_SITE_URL")
.option("--endpoint-prefix <path>", "Override OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX")
.option("--actor-id <id>", "Optional admin actor id to include in broker audit events")
.option("--json", "Emit machine-readable JSON output", false)
.action(
async (opts: {
siteUrl?: string;
endpointPrefix?: string;
actorId?: string;
json?: boolean;
}) => {
await runQaCredentialsDoctor(opts);
},
);
credentials
.command("add")
.description("Add one credential payload to the shared pool")

View File

@@ -890,6 +890,57 @@ describe("qa bundled plugin dir", () => {
).resolves.toBeTruthy();
});
it("skips transient runtime dependency artifacts while staging built bundled plugins", async () => {
const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-bundled-runtime-deps-"));
cleanups.push(async () => {
await rm(repoRoot, { recursive: true, force: true });
});
await writeFile(
path.join(repoRoot, "package.json"),
JSON.stringify({ name: "openclaw", type: "module" }, null, 2),
"utf8",
);
const pluginDir = path.join(repoRoot, "dist", "extensions", "qa-channel");
await mkdir(path.join(pluginDir, ".openclaw-runtime-deps-copy-active", "node_modules"), {
recursive: true,
});
await writeFile(
path.join(pluginDir, "package.json"),
JSON.stringify({ name: "@openclaw/qa-channel", type: "module" }, null, 2),
"utf8",
);
await writeFile(path.join(pluginDir, "index.js"), "export const ok = true;\n", "utf8");
await writeFile(path.join(pluginDir, ".openclaw-runtime-deps.json"), "{}\n", "utf8");
await writeFile(path.join(pluginDir, ".openclaw-runtime-deps-stamp.json"), "{}\n", "utf8");
await writeFile(
path.join(pluginDir, ".openclaw-runtime-deps-copy-active", "node_modules", "transient.js"),
"export {};\n",
"utf8",
);
const tempRoot = await mkdtemp(path.join(os.tmpdir(), "qa-bundled-runtime-deps-target-"));
cleanups.push(async () => {
await rm(tempRoot, { recursive: true, force: true });
});
const { bundledPluginsDir } = await __testing.createQaBundledPluginsDir({
repoRoot,
tempRoot,
allowedPluginIds: ["qa-channel"],
});
const stagedPluginDir = path.join(bundledPluginsDir, "qa-channel");
await expect(readFile(path.join(stagedPluginDir, "index.js"), "utf8")).resolves.toContain("ok");
await expect(lstat(path.join(stagedPluginDir, ".openclaw-runtime-deps.json"))).rejects.toThrow(
/ENOENT/u,
);
await expect(
lstat(path.join(stagedPluginDir, ".openclaw-runtime-deps-stamp.json")),
).rejects.toThrow(/ENOENT/u);
await expect(
lstat(path.join(stagedPluginDir, ".openclaw-runtime-deps-copy-active")),
).rejects.toThrow(/ENOENT/u);
});
it("preserves dist-runtime-only root chunks when dist also exists", async () => {
const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-bundled-mixed-runtime-"));
cleanups.push(async () => {

View File

@@ -1,6 +1,7 @@
import { describe, expect, it, vi } from "vitest";
import {
addQaCredentialSet,
diagnoseQaCredentialBroker,
listQaCredentialSets,
QaCredentialAdminError,
removeQaCredentialSet,
@@ -204,4 +205,42 @@ describe("qa credential admin runtime", () => {
limit: 5,
});
});
it("doctors credential broker env without exposing secret values", async () => {
const fetchImpl = vi.fn(async (_input: RequestInfo | URL, _init?: RequestInit) =>
jsonResponse({
status: "ok",
count: 1,
credentials: [
{
credentialId: "cred-2",
kind: "telegram",
status: "active",
createdAtMs: 100,
updatedAtMs: 100,
lastLeasedAtMs: 50,
},
],
}),
);
const result = await diagnoseQaCredentialBroker({
siteUrl: "https://first-schnauzer-821.convex.site",
env: {
OPENCLAW_QA_CONVEX_SECRET_CI: "ci-secret",
OPENCLAW_QA_CONVEX_SECRET_MAINTAINER: "maint-secret",
},
fetchImpl,
});
expect(result.status).toBe("pass");
expect(JSON.stringify(result)).not.toContain("ci-secret");
expect(JSON.stringify(result)).not.toContain("maint-secret");
expect(result.checks).toContainEqual(
expect.objectContaining({
name: "broker admin/list",
status: "pass",
}),
);
});
});

View File

@@ -112,6 +112,17 @@ type ListQaCredentialSetsOptions = AdminBaseOptions & {
status?: string;
};
export type QaCredentialDoctorCheck = {
details?: string;
name: string;
status: "fail" | "pass" | "warn";
};
export type QaCredentialDoctorResult = {
checks: QaCredentialDoctorCheck[];
status: "fail" | "pass" | "warn";
};
function parsePositiveIntegerEnv(env: NodeJS.ProcessEnv, key: string, fallback: number): number {
return parseQaCredentialPositiveIntegerEnv({
env,
@@ -163,6 +174,137 @@ function resolveAdminAuthToken(env: NodeJS.ProcessEnv): string {
});
}
function addQaCredentialDoctorCheck(
checks: QaCredentialDoctorCheck[],
check: QaCredentialDoctorCheck,
) {
checks.push(check);
}
function summarizeQaCredentialDoctorStatus(checks: readonly QaCredentialDoctorCheck[]) {
if (checks.some((check) => check.status === "fail")) {
return "fail" as const;
}
if (checks.some((check) => check.status === "warn")) {
return "warn" as const;
}
return "pass" as const;
}
export async function diagnoseQaCredentialBroker(options: AdminBaseOptions = {}) {
const env = options.env ?? process.env;
const checks: QaCredentialDoctorCheck[] = [];
const siteUrl = options.siteUrl?.trim() || env.OPENCLAW_QA_CONVEX_SITE_URL?.trim();
const endpointPrefix = options.endpointPrefix?.trim() || env.OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX;
let normalizedSiteUrl: string | null = null;
let normalizedEndpointPrefix: string | null = null;
if (!siteUrl) {
addQaCredentialDoctorCheck(checks, {
name: "OPENCLAW_QA_CONVEX_SITE_URL",
status: "fail",
details: "missing Convex credential broker site URL",
});
} else {
try {
normalizedSiteUrl = normalizeConvexSiteUrl(siteUrl, env);
addQaCredentialDoctorCheck(checks, {
name: "OPENCLAW_QA_CONVEX_SITE_URL",
status: "pass",
details: normalizedSiteUrl,
});
} catch (error) {
addQaCredentialDoctorCheck(checks, {
name: "OPENCLAW_QA_CONVEX_SITE_URL",
status: "fail",
details: formatErrorMessage(error),
});
}
}
try {
normalizedEndpointPrefix = normalizeEndpointPrefix(endpointPrefix);
addQaCredentialDoctorCheck(checks, {
name: "OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX",
status: "pass",
details: normalizedEndpointPrefix,
});
} catch (error) {
addQaCredentialDoctorCheck(checks, {
name: "OPENCLAW_QA_CONVEX_ENDPOINT_PREFIX",
status: "fail",
details: formatErrorMessage(error),
});
}
for (const [name, requiredFor] of [
["OPENCLAW_QA_CONVEX_SECRET_CI", "live lane leasing"],
["OPENCLAW_QA_CONVEX_SECRET_MAINTAINER", "credential add/list/remove"],
] as const) {
const present = Boolean(env[name]?.trim());
addQaCredentialDoctorCheck(checks, {
name,
status: present ? "pass" : "warn",
details: present ? "set" : `missing; required for ${requiredFor}`,
});
}
try {
const timeoutMs = parsePositiveIntegerEnv(
env,
"OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS",
DEFAULT_HTTP_TIMEOUT_MS,
);
addQaCredentialDoctorCheck(checks, {
name: "OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS",
status: "pass",
details: `${timeoutMs}ms`,
});
} catch (error) {
addQaCredentialDoctorCheck(checks, {
name: "OPENCLAW_QA_CREDENTIAL_HTTP_TIMEOUT_MS",
status: "fail",
details: formatErrorMessage(error),
});
}
if (normalizedSiteUrl && normalizedEndpointPrefix && env.OPENCLAW_QA_CONVEX_SECRET_MAINTAINER) {
try {
const listed = await listQaCredentialSets({
actorId: options.actorId,
endpointPrefix: normalizedEndpointPrefix,
env,
fetchImpl: options.fetchImpl,
limit: 1,
siteUrl: normalizedSiteUrl,
status: "active",
});
addQaCredentialDoctorCheck(checks, {
name: "broker admin/list",
status: "pass",
details: `reachable; sampled ${listed.credentials.length} active credential row${listed.credentials.length === 1 ? "" : "s"}`,
});
} catch (error) {
addQaCredentialDoctorCheck(checks, {
name: "broker admin/list",
status: "fail",
details: formatErrorMessage(error),
});
}
} else {
addQaCredentialDoctorCheck(checks, {
name: "broker admin/list",
status: "warn",
details: "skipped; site URL and maintainer secret are required",
});
}
return {
checks,
status: summarizeQaCredentialDoctorStatus(checks),
} satisfies QaCredentialDoctorResult;
}
function resolveAdminConfig(options: AdminBaseOptions): AdminConfig {
const env = options.env ?? process.env;
const siteUrl = options.siteUrl?.trim() || env.OPENCLAW_QA_CONVEX_SITE_URL?.trim();

View File

@@ -55,6 +55,7 @@ function createDeps(overrides?: Partial<QaScenarioRuntimeDeps>): QaScenarioRunti
waitForAgentRun: fn,
listCronJobs: fn,
waitForCronRunCompletion: fn,
findManagedDreamingCronJob: fn,
readDoctorMemoryStatus: fn,
forceMemoryIndex: fn,
findSkill: fn,

View File

@@ -66,6 +66,7 @@ export type QaScenarioRuntimeDeps = {
startAgentRun: QaScenarioRuntimeFunction;
waitForAgentRun: QaScenarioRuntimeFunction;
listCronJobs: QaScenarioRuntimeFunction;
findManagedDreamingCronJob: QaScenarioRuntimeFunction;
waitForCronRunCompletion: QaScenarioRuntimeFunction;
readDoctorMemoryStatus: QaScenarioRuntimeFunction;
forceMemoryIndex: QaScenarioRuntimeFunction;
@@ -150,6 +151,7 @@ export type QaScenarioRuntimeApi<
startAgentRun: TDeps["startAgentRun"];
waitForAgentRun: TDeps["waitForAgentRun"];
listCronJobs: TDeps["listCronJobs"];
findManagedDreamingCronJob: TDeps["findManagedDreamingCronJob"];
waitForCronRunCompletion: TDeps["waitForCronRunCompletion"];
readDoctorMemoryStatus: TDeps["readDoctorMemoryStatus"];
forceMemoryIndex: TDeps["forceMemoryIndex"];
@@ -249,6 +251,7 @@ export function createQaScenarioRuntimeApi<
startAgentRun: params.deps.startAgentRun,
waitForAgentRun: params.deps.waitForAgentRun,
listCronJobs: params.deps.listCronJobs,
findManagedDreamingCronJob: params.deps.findManagedDreamingCronJob,
waitForCronRunCompletion: params.deps.waitForCronRunCompletion,
readDoctorMemoryStatus: params.deps.readDoctorMemoryStatus,
forceMemoryIndex: params.deps.forceMemoryIndex,

View File

@@ -9,6 +9,7 @@ import {
collectQaSuitePluginIds,
mapQaSuiteWithConcurrency,
normalizeQaSuiteConcurrency,
resolveQaSuiteWorkerStartStaggerMs,
resolveQaSuiteOutputDir,
scenarioRequiresControlUi,
selectQaSuiteScenarios,
@@ -91,6 +92,56 @@ describe("qa suite planning helpers", () => {
expect(result).toEqual([10, 20, 30, 40]);
});
it("staggers scenario starts without reducing mapped concurrency", async () => {
const sleeps: number[] = [];
const releaseSleeps: Array<() => void> = [];
const started: number[] = [];
const tick = () => new Promise((resolve) => setTimeout(resolve, 0));
const resultPromise = mapQaSuiteWithConcurrency(
[1, 2, 3, 4],
3,
async (item) => {
started.push(item);
return item;
},
{
startStaggerMs: 25,
sleepImpl: async (ms) => {
sleeps.push(ms);
await new Promise<void>((resolve) => {
releaseSleeps.push(resolve);
});
},
},
);
await tick();
expect(started).toEqual([1]);
releaseSleeps.shift()?.();
await tick();
expect(started).toEqual([1, 2]);
releaseSleeps.shift()?.();
await tick();
expect(started).toEqual([1, 2, 3]);
releaseSleeps.shift()?.();
await tick();
expect(started).toEqual([1, 2, 3, 4]);
const result = await resultPromise;
expect(result).toEqual([1, 2, 3, 4]);
expect(sleeps).toEqual([25, 25, 25]);
});
it("resolves a default worker startup stagger for concurrent suite workers", () => {
expect(resolveQaSuiteWorkerStartStaggerMs(1, {})).toBe(0);
expect(resolveQaSuiteWorkerStartStaggerMs(4, {})).toBe(1500);
expect(
resolveQaSuiteWorkerStartStaggerMs(4, {
OPENCLAW_QA_SUITE_WORKER_START_STAGGER_MS: "0",
}),
).toBe(0);
});
it("keeps explicitly requested provider-specific scenarios", () => {
const scenarios = [
makeQaSuiteTestScenario("generic"),

View File

@@ -8,6 +8,7 @@ import type { QaTransportId } from "./qa-transport-registry.js";
import { readQaBootstrapScenarioCatalog } from "./scenario-catalog.js";
const DEFAULT_QA_SUITE_CONCURRENCY = 64;
const DEFAULT_QA_SUITE_WORKER_START_STAGGER_MS = 1_500;
const QA_MERGE_PATCH_BLOCKED_KEYS = new Set(["__proto__", "constructor", "prototype"]);
type QaSeedScenario = ReturnType<typeof readQaBootstrapScenarioCatalog>["scenarios"][number];
@@ -174,18 +175,67 @@ function normalizeQaSuiteConcurrency(
return Math.max(1, Math.min(Math.floor(raw), Math.max(1, scenarioCount)));
}
function resolveQaSuiteWorkerStartStaggerMs(
concurrency: number,
env: NodeJS.ProcessEnv = process.env,
) {
if (concurrency <= 1) {
return 0;
}
const raw = env.OPENCLAW_QA_SUITE_WORKER_START_STAGGER_MS;
if (raw === undefined) {
return DEFAULT_QA_SUITE_WORKER_START_STAGGER_MS;
}
const parsed = Number(raw);
if (!Number.isFinite(parsed) || parsed < 0) {
return DEFAULT_QA_SUITE_WORKER_START_STAGGER_MS;
}
return Math.floor(parsed);
}
async function mapQaSuiteWithConcurrency<T, U>(
items: readonly T[],
concurrency: number,
mapper: (item: T, index: number) => Promise<U>,
opts?: {
startStaggerMs?: number;
sleepImpl?: (ms: number) => Promise<unknown>;
},
) {
const results = Array.from<U>({ length: items.length });
let nextIndex = 0;
let nextStartGate = Promise.resolve();
const workerCount = Math.min(Math.max(1, Math.floor(concurrency)), items.length);
const startStaggerMs = Math.max(0, Math.floor(opts?.startStaggerMs ?? 0));
const sleepImpl =
opts?.sleepImpl ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
async function waitForStartSlot(shouldReleaseNextSlot: boolean) {
const currentGate = nextStartGate;
let releaseNextSlot: (() => void) | undefined;
if (shouldReleaseNextSlot) {
nextStartGate = new Promise<void>((resolve) => {
releaseNextSlot = resolve;
});
}
await currentGate;
if (!releaseNextSlot) {
return;
}
void (async () => {
try {
if (startStaggerMs > 0) {
await sleepImpl(startStaggerMs);
}
} finally {
releaseNextSlot();
}
})();
}
const workers = Array.from({ length: workerCount }, async () => {
while (nextIndex < items.length) {
const index = nextIndex;
nextIndex += 1;
await waitForStartSlot(nextIndex < items.length);
results[index] = await mapper(items[index], index);
}
});
@@ -218,6 +268,7 @@ export {
collectQaSuitePluginIds,
mapQaSuiteWithConcurrency,
normalizeQaSuiteConcurrency,
resolveQaSuiteWorkerStartStaggerMs,
resolveQaSuiteOutputDir,
scenarioMatchesLiveLane,
scenarioRequiresControlUi,

View File

@@ -20,6 +20,8 @@ vi.mock("./suite-runtime-gateway.js", () => ({
}));
import {
findManagedDreamingCronJob,
isManagedDreamingCronJob,
listCronJobs,
readDoctorMemoryStatus,
runAgentPrompt,
@@ -221,6 +223,32 @@ describe("qa suite runtime agent process helpers", () => {
);
});
it("finds managed dreaming cron jobs across legacy and current payload contracts", () => {
const legacy = {
id: "legacy",
name: "Memory Dreaming Promotion",
payload: {
kind: "systemEvent",
text: "__openclaw_memory_core_short_term_promotion_dream__",
},
};
const current = {
id: "current",
name: "Memory Dreaming Promotion",
payload: {
kind: "agentTurn",
message: "__openclaw_memory_core_short_term_promotion_dream__",
lightContext: true,
},
sessionTarget: "isolated",
delivery: { mode: "none" },
};
expect(isManagedDreamingCronJob(legacy)).toBe(true);
expect(isManagedDreamingCronJob(current)).toBe(true);
expect(findManagedDreamingCronJob([{ id: "other", name: "Other" }, current])).toBe(current);
});
it("waits for an agent run and fails when the run does not finish ok", async () => {
const gatewayCall = vi
.fn()

View File

@@ -10,7 +10,20 @@ type QaMemorySearchResult = {
results?: Array<{ snippet?: string; text?: string; path?: string }>;
};
type QaCronJob = {
delivery?: { mode?: string };
description?: string;
id?: string;
name?: string;
payload?: { kind?: string; message?: string; text?: string; lightContext?: boolean };
sessionTarget?: string;
state?: { nextRunAtMs?: number };
};
const ANSI_ESCAPE_PATTERN = new RegExp(String.raw`\x1B\[[0-?]*[ -/]*[@-~]`, "g");
const MANAGED_DREAMING_CRON_MARKER = "[managed-by=memory-core.short-term-promotion]";
const MANAGED_DREAMING_CRON_NAME = "Memory Dreaming Promotion";
const MANAGED_DREAMING_PROMPT = "__openclaw_memory_core_short_term_promotion_dream__";
function stripAnsiCodes(text: string) {
return text.replace(ANSI_ESCAPE_PATTERN, "");
@@ -176,16 +189,34 @@ async function listCronJobs(env: Pick<QaSuiteRuntimeEnv, "gateway">) {
},
{ timeoutMs: 30_000 },
)) as {
jobs?: Array<{
id?: string;
name?: string;
payload?: { kind?: string; text?: string };
state?: { nextRunAtMs?: number };
}>;
jobs?: QaCronJob[];
};
return payload.jobs ?? [];
}
function isManagedDreamingCronJob(job: QaCronJob) {
if (job.description?.includes(MANAGED_DREAMING_CRON_MARKER)) {
return true;
}
if (job.name !== MANAGED_DREAMING_CRON_NAME) {
return false;
}
if (job.payload?.kind === "systemEvent" && job.payload.text === MANAGED_DREAMING_PROMPT) {
return true;
}
return (
job.payload?.kind === "agentTurn" &&
job.payload.message === MANAGED_DREAMING_PROMPT &&
job.payload.lightContext === true &&
job.sessionTarget === "isolated" &&
job.delivery?.mode === "none"
);
}
function findManagedDreamingCronJob(jobs: readonly QaCronJob[]) {
return jobs.find(isManagedDreamingCronJob);
}
async function readDoctorMemoryStatus(env: Pick<QaSuiteRuntimeEnv, "gateway">) {
return (await env.gateway.call("doctor.memory.status", {}, { timeoutMs: 30_000 })) as {
dreaming?: QaDreamingStatus;
@@ -269,6 +300,8 @@ async function runAgentPrompt(
export {
forceMemoryIndex,
findManagedDreamingCronJob,
isManagedDreamingCronJob,
listCronJobs,
readDoctorMemoryStatus,
runAgentPrompt,

View File

@@ -6,6 +6,7 @@ export {
} from "./suite-runtime-agent-session.js";
export {
forceMemoryIndex,
findManagedDreamingCronJob,
listCronJobs,
readDoctorMemoryStatus,
runAgentPrompt,

View File

@@ -28,6 +28,7 @@ const resolveGeneratedImagePath = vi.hoisted(() => vi.fn());
const startAgentRun = vi.hoisted(() => vi.fn());
const waitForAgentRun = vi.hoisted(() => vi.fn());
const listCronJobs = vi.hoisted(() => vi.fn());
const findManagedDreamingCronJob = vi.hoisted(() => vi.fn());
const waitForCronRunCompletion = vi.hoisted(() => vi.fn());
const readDoctorMemoryStatus = vi.hoisted(() => vi.fn());
const forceMemoryIndex = vi.hoisted(() => vi.fn());
@@ -92,6 +93,7 @@ vi.mock("./suite-runtime-agent.js", () => ({
startAgentRun,
waitForAgentRun,
listCronJobs,
findManagedDreamingCronJob,
readDoctorMemoryStatus,
forceMemoryIndex,
findSkill,
@@ -232,6 +234,7 @@ describe("qa suite runtime flow", () => {
runScenario: typeof runScenario;
waitForQaChannelReady: typeof waitForQaChannelReady;
waitForOutboundMessage: typeof waitForOutboundMessage;
findManagedDreamingCronJob: typeof findManagedDreamingCronJob;
forceMemoryIndex: typeof forceMemoryIndex;
runAgentPrompt: typeof runAgentPrompt;
qaChannelPlugin: typeof qaChannelPlugin;
@@ -248,6 +251,7 @@ describe("qa suite runtime flow", () => {
expect(call.deps.runScenario).toBe(runScenario);
expect(call.deps.waitForQaChannelReady).toBe(waitForQaChannelReady);
expect(call.deps.waitForOutboundMessage).toBe(waitForOutboundMessage);
expect(call.deps.findManagedDreamingCronJob).toBe(findManagedDreamingCronJob);
expect(call.deps.forceMemoryIndex).toBe(forceMemoryIndex);
expect(call.deps.runAgentPrompt).toBe(runAgentPrompt);
expect(call.deps.qaChannelPlugin).toBe(qaChannelPlugin);

View File

@@ -33,6 +33,7 @@ import {
extractMediaPathFromText,
findSkill,
forceMemoryIndex,
findManagedDreamingCronJob,
handleQaAction,
listCronJobs,
readDoctorMemoryStatus,
@@ -170,6 +171,7 @@ function createQaSuiteScenarioDeps(params: QaSuiteScenarioDepsParams) {
startAgentRun,
waitForAgentRun,
listCronJobs,
findManagedDreamingCronJob,
waitForCronRunCompletion,
readDoctorMemoryStatus,
forceMemoryIndex,

View File

@@ -38,6 +38,7 @@ import {
collectQaSuitePluginIds,
mapQaSuiteWithConcurrency,
normalizeQaSuiteConcurrency,
resolveQaSuiteWorkerStartStaggerMs,
resolveQaSuiteOutputDir,
scenarioRequiresControlUi,
selectQaSuiteScenarios,
@@ -440,9 +441,54 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise<QaSuiteResu
startedAt: startedAt.toISOString(),
scenarios: [...liveScenarioOutcomes],
});
const completedScenarioResults: Array<QaSuiteScenarioResult | undefined> = Array.from({
length: selectedCatalogScenarios.length,
});
let artifactWriteQueue = Promise.resolve();
const writePartialArtifacts = () => {
const partialScenarios = completedScenarioResults.filter(
(scenario): scenario is QaSuiteScenarioResult => scenario !== undefined,
);
if (partialScenarios.length === 0) {
return;
}
artifactWriteQueue = artifactWriteQueue
.then(async () => {
const partialFinishedAt = new Date();
const { report, reportPath } = await writeQaSuiteArtifacts({
outputDir,
startedAt,
finishedAt: partialFinishedAt,
scenarios: partialScenarios,
transport,
providerMode,
primaryModel,
alternateModel,
fastMode,
concurrency,
scenarioIds:
params?.scenarioIds && params.scenarioIds.length > 0
? selectedCatalogScenarios.map((scenario) => scenario.id)
: undefined,
});
lab.setLatestReport({
outputPath: reportPath,
markdown: report,
generatedAt: partialFinishedAt.toISOString(),
} satisfies QaLabLatestReport);
})
.catch((error) => {
writeQaSuiteProgress(
progressEnabled,
`partial artifact write failed: ${sanitizeQaSuiteProgressValue(formatErrorMessage(error))}`,
);
});
};
try {
updateScenarioRun();
const workerStartStaggerMs = resolveQaSuiteWorkerStartStaggerMs(concurrency);
writeQaSuiteProgress(progressEnabled, `scenario start stagger=${workerStartStaggerMs}ms`);
const scenarios: QaSuiteScenarioResult[] = await mapQaSuiteWithConcurrency(
selectedCatalogScenarios,
concurrency,
@@ -507,6 +553,8 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise<QaSuiteResu
progressEnabled,
`scenario ${scenarioResult.status} (${index + 1}/${selectedCatalogScenarios.length}): ${scenarioIdForLog}`,
);
completedScenarioResults[index] = scenarioResult;
writePartialArtifacts();
return scenarioResult;
} catch (error) {
const details = formatErrorMessage(error);
@@ -536,10 +584,14 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise<QaSuiteResu
progressEnabled,
`scenario fail (${index + 1}/${selectedCatalogScenarios.length}): ${scenarioIdForLog}`,
);
completedScenarioResults[index] = scenarioResult;
writePartialArtifacts();
return scenarioResult;
}
},
{ startStaggerMs: workerStartStaggerMs },
);
await artifactWriteQueue;
const finishedAt = new Date();
const failedCount = scenarios.filter((scenario) => scenario.status === "fail").length;
lab.setScenarioRun({

View File

@@ -2,6 +2,8 @@ import { createServer } from "node:net";
import { runExec } from "openclaw/plugin-sdk/process-runtime";
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
const DEFAULT_DOCKER_COMMAND_TIMEOUT_MS = 120_000;
export type RunCommand = (
command: string,
args: string[],
@@ -86,7 +88,11 @@ function trimCommandOutput(output: string) {
export async function execCommand(command: string, args: string[], cwd: string) {
try {
return await runExec(command, args, { cwd, maxBuffer: 10 * 1024 * 1024 });
return await runExec(command, args, {
cwd,
maxBuffer: 10 * 1024 * 1024,
timeoutMs: DEFAULT_DOCKER_COMMAND_TIMEOUT_MS,
});
} catch (error) {
const failedProcess = error as Error & { stdout?: string; stderr?: string };
const renderedStdout = trimCommandOutput(failedProcess.stdout ?? "");

View File

@@ -74,6 +74,38 @@ function buildMatrixQaSummaryInput(
}
describe("matrix live qa runtime", () => {
it("prints Matrix QA progress by default for non-interactive runs", () => {
const previous = process.env.OPENCLAW_QA_MATRIX_PROGRESS;
delete process.env.OPENCLAW_QA_MATRIX_PROGRESS;
try {
expect(liveTesting.shouldWriteMatrixQaProgress()).toBe(true);
process.env.OPENCLAW_QA_MATRIX_PROGRESS = "0";
expect(liveTesting.shouldWriteMatrixQaProgress()).toBe(false);
} finally {
if (previous === undefined) {
delete process.env.OPENCLAW_QA_MATRIX_PROGRESS;
} else {
process.env.OPENCLAW_QA_MATRIX_PROGRESS = previous;
}
}
});
it("normalizes the Matrix QA hard timeout env", () => {
const previous = process.env.OPENCLAW_QA_MATRIX_TIMEOUT_MS;
try {
process.env.OPENCLAW_QA_MATRIX_TIMEOUT_MS = "12345";
expect(liveTesting.createMatrixQaRunDeadline().timeoutMs).toBe(12345);
process.env.OPENCLAW_QA_MATRIX_TIMEOUT_MS = "nope";
expect(liveTesting.createMatrixQaRunDeadline().timeoutMs).toBe(30 * 60_000);
} finally {
if (previous === undefined) {
delete process.env.OPENCLAW_QA_MATRIX_TIMEOUT_MS;
} else {
process.env.OPENCLAW_QA_MATRIX_TIMEOUT_MS = previous;
}
}
});
it("injects a temporary Matrix account into the QA gateway config", () => {
const baseCfg: OpenClawConfig = {
plugins: {

View File

@@ -49,6 +49,9 @@ type MatrixQaGatewayChild = {
runtimeEnv?: NodeJS.ProcessEnv;
};
const DEFAULT_MATRIX_QA_RUN_TIMEOUT_MS = 30 * 60_000;
const DEFAULT_MATRIX_QA_CLEANUP_TIMEOUT_MS = 90_000;
type MatrixQaLiveLaneGatewayHarness = {
gateway: MatrixQaGatewayChild;
stop(): Promise<void>;
@@ -150,7 +153,7 @@ function shouldWriteMatrixQaProgress() {
if (override === "1") {
return true;
}
return process.stderr.isTTY;
return true;
}
function formatMatrixQaDurationMs(durationMs: number) {
@@ -164,6 +167,80 @@ function writeMatrixQaProgress(message: string) {
process.stderr.write(`[matrix-qa] ${message}\n`);
}
function parsePositiveMatrixQaEnvMs(name: string, fallback: number) {
const raw = process.env[name];
if (raw === undefined) {
return fallback;
}
const parsed = Number(raw);
if (!Number.isFinite(parsed) || parsed < 1) {
return fallback;
}
return Math.floor(parsed);
}
function createMatrixQaRunDeadline() {
const timeoutMs = parsePositiveMatrixQaEnvMs(
"OPENCLAW_QA_MATRIX_TIMEOUT_MS",
DEFAULT_MATRIX_QA_RUN_TIMEOUT_MS,
);
return {
timeoutMs,
deadlineMs: Date.now() + timeoutMs,
};
}
function remainingMatrixQaRunMs(deadline: { deadlineMs: number }) {
return Math.max(1, deadline.deadlineMs - Date.now());
}
async function withMatrixQaTimeout<T>(
label: string,
timeoutMs: number,
task: () => Promise<T>,
): Promise<T> {
let timeout: NodeJS.Timeout | undefined;
try {
return await Promise.race([
task(),
new Promise<never>((_, reject) => {
timeout = setTimeout(() => {
reject(new Error(`${label} timed out after ${timeoutMs}ms`));
}, timeoutMs);
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
async function withMatrixQaRunDeadline<T>(
deadline: { deadlineMs: number; timeoutMs: number },
label: string,
task: () => Promise<T>,
) {
return await withMatrixQaTimeout(label, remainingMatrixQaRunMs(deadline), task);
}
async function cleanupMatrixQaResource(params: {
action: () => Promise<void>;
label: string;
recovery?: string;
}) {
const timeoutMs = parsePositiveMatrixQaEnvMs(
"OPENCLAW_QA_MATRIX_CLEANUP_TIMEOUT_MS",
DEFAULT_MATRIX_QA_CLEANUP_TIMEOUT_MS,
);
try {
await withMatrixQaTimeout(params.label, timeoutMs, params.action);
} catch (error) {
const recovery = params.recovery ? `\nRecovery: ${params.recovery}` : "";
throw new Error(`${formatErrorMessage(error)}${recovery}`, { cause: error });
}
}
function countMatrixQaStatuses(entries: Array<{ status: "fail" | "pass" | "skip" }>) {
return {
failed: entries.filter((entry) => entry.status === "fail").length,
@@ -441,15 +518,18 @@ export async function runMatrixQaLive(params: {
const startedAtDate = new Date();
const startedAt = startedAtDate.toISOString();
const runStartedAtMs = Date.now();
const runDeadline = createMatrixQaRunDeadline();
writeMatrixQaProgress(
`suite start scenarios=${scenarios.length} provider=${providerMode} output=${outputDir}`,
`suite start scenarios=${scenarios.length} provider=${providerMode} output=${outputDir} timeout=${formatMatrixQaDurationMs(runDeadline.timeoutMs)}`,
);
const { durationMs: harnessBootMs, result: harness } = await measureMatrixQaStep(() =>
startMatrixQaHarness({
outputDir: path.join(outputDir, "matrix-harness"),
repoRoot,
}),
withMatrixQaRunDeadline(runDeadline, "Matrix harness boot", () =>
startMatrixQaHarness({
outputDir: path.join(outputDir, "matrix-harness"),
repoRoot,
}),
),
);
writeMatrixQaProgress(
`harness ready ${formatMatrixQaDurationMs(harnessBootMs)} baseUrl=${harness.baseUrl}`,
@@ -457,18 +537,24 @@ export async function runMatrixQaLive(params: {
const { durationMs: provisioningMs, result: provisioning } = await (async () => {
try {
return await measureMatrixQaStep(() =>
provisionMatrixQaRoom({
baseUrl: harness.baseUrl,
driverLocalpart: `qa-driver-${runSuffix}`,
observerLocalpart: `qa-observer-${runSuffix}`,
registrationToken: harness.registrationToken,
roomName: `OpenClaw Matrix QA ${runSuffix}`,
sutLocalpart: `qa-sut-${runSuffix}`,
topology,
}),
withMatrixQaRunDeadline(runDeadline, "Matrix topology provisioning", () =>
provisionMatrixQaRoom({
baseUrl: harness.baseUrl,
driverLocalpart: `qa-driver-${runSuffix}`,
observerLocalpart: `qa-observer-${runSuffix}`,
registrationToken: harness.registrationToken,
roomName: `OpenClaw Matrix QA ${runSuffix}`,
sutLocalpart: `qa-sut-${runSuffix}`,
topology,
}),
),
);
} catch (error) {
await harness.stop().catch(() => {});
await cleanupMatrixQaResource({
label: "Matrix homeserver cleanup after provisioning failure",
action: () => harness.stop(),
recovery: harness.stopCommand,
}).catch(() => {});
throw error;
}
})();
@@ -530,33 +616,38 @@ export async function runMatrixQaLive(params: {
};
}
if (gatewayHarness) {
await gatewayHarness.stop();
await cleanupMatrixQaResource({
label: "Matrix live gateway cleanup before config switch",
action: () => gatewayHarness!.stop(),
});
gatewayHarness = null;
gatewayHarnessKey = nextKey;
}
writeMatrixQaProgress("gateway boot start");
const { durationMs, result: started } = await measureMatrixQaStep(async () => {
const nextHarness = await startMatrixQaLiveLaneGateway({
repoRoot,
transport: {
requiredPluginIds: [],
createGatewayConfig: () => ({}),
},
transportBaseUrl: "http://127.0.0.1:43123",
providerMode,
primaryModel,
alternateModel,
fastMode: params.fastMode,
controlUiEnabled: false,
mutateConfig: (cfg) =>
buildMatrixQaConfig(cfg, {
...gatewayConfigParams,
overrides,
}),
});
await waitForMatrixChannelReady(nextHarness.gateway, sutAccountId);
return nextHarness;
});
const { durationMs, result: started } = await measureMatrixQaStep(() =>
withMatrixQaRunDeadline(runDeadline, "Matrix gateway boot", async () => {
const nextHarness = await startMatrixQaLiveLaneGateway({
repoRoot,
transport: {
requiredPluginIds: [],
createGatewayConfig: () => ({}),
},
transportBaseUrl: "http://127.0.0.1:43123",
providerMode,
primaryModel,
alternateModel,
fastMode: params.fastMode,
controlUiEnabled: false,
mutateConfig: (cfg) =>
buildMatrixQaConfig(cfg, {
...gatewayConfigParams,
overrides,
}),
});
await waitForMatrixChannelReady(nextHarness.gateway, sutAccountId);
return nextHarness;
}),
);
writeMatrixQaProgress(`gateway boot done ${formatMatrixQaDurationMs(durationMs)}`);
gatewayHarness = started;
gatewayHarnessKey = nextKey;
@@ -580,16 +671,18 @@ export async function runMatrixQaLive(params: {
try {
writeMatrixQaProgress("canary start");
const canaryMeasured = await measureMatrixQaStep(() =>
runMatrixQaCanary({
baseUrl: harness.baseUrl,
driverAccessToken: provisioning.driver.accessToken,
observedEvents,
roomId: provisioning.roomId,
syncState,
syncStreams,
sutUserId: provisioning.sut.userId,
timeoutMs: 45_000,
}),
withMatrixQaRunDeadline(runDeadline, "Matrix canary", () =>
runMatrixQaCanary({
baseUrl: harness.baseUrl,
driverAccessToken: provisioning.driver.accessToken,
observedEvents,
roomId: provisioning.roomId,
syncState,
syncStreams,
sutUserId: provisioning.sut.userId,
timeoutMs: 45_000,
}),
),
);
canaryMs = canaryMeasured.durationMs;
const canary = canaryMeasured.result;
@@ -631,108 +724,110 @@ export async function runMatrixQaLive(params: {
gatewayBootMs = scenarioGateway.durationMs;
scenarioGatewayBootMs += gatewayBootMs;
const measuredScenario = await measureMatrixQaStep(() =>
runMatrixQaScenario(scenario, {
baseUrl: harness.baseUrl,
canary: canaryArtifact,
driverAccessToken: provisioning.driver.accessToken,
driverDeviceId: provisioning.driver.deviceId,
driverPassword: provisioning.driver.password,
driverUserId: provisioning.driver.userId,
interruptTransport: async () => {
writeMatrixQaProgress(`transport interrupt start ${scenario.id}`);
const measuredInterrupt = await measureMatrixQaStep(async () => {
await harness.restartService();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId, {
timeoutMs: 90_000,
withMatrixQaRunDeadline(runDeadline, `Matrix scenario ${scenario.id}`, () =>
runMatrixQaScenario(scenario, {
baseUrl: harness.baseUrl,
canary: canaryArtifact,
driverAccessToken: provisioning.driver.accessToken,
driverDeviceId: provisioning.driver.deviceId,
driverPassword: provisioning.driver.password,
driverUserId: provisioning.driver.userId,
interruptTransport: async () => {
writeMatrixQaProgress(`transport interrupt start ${scenario.id}`);
const measuredInterrupt = await measureMatrixQaStep(async () => {
await harness.restartService();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId, {
timeoutMs: 90_000,
});
});
});
transportInterruptMs += measuredInterrupt.durationMs;
scenarioTransportInterruptMs += measuredInterrupt.durationMs;
writeMatrixQaProgress(
`transport interrupt done ${scenario.id} ${formatMatrixQaDurationMs(measuredInterrupt.durationMs)}`,
);
},
observedEvents,
observerAccessToken: provisioning.observer.accessToken,
observerDeviceId: provisioning.observer.deviceId,
observerPassword: provisioning.observer.password,
observerUserId: provisioning.observer.userId,
gatewayStateDir: scenarioGateway.harness.gateway.runtimeEnv?.OPENCLAW_STATE_DIR,
outputDir,
restartGateway: async () => {
if (!gatewayHarness) {
throw new Error("Matrix restart scenario requires a live gateway");
}
writeMatrixQaProgress(`gateway restart start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await scenarioGateway.harness.gateway.restart();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
restartGatewayAfterStateMutation: async (mutateState) => {
if (!gatewayHarness) {
throw new Error(
"Matrix persisted-state restart scenario requires a live gateway",
transportInterruptMs += measuredInterrupt.durationMs;
scenarioTransportInterruptMs += measuredInterrupt.durationMs;
writeMatrixQaProgress(
`transport interrupt done ${scenario.id} ${formatMatrixQaDurationMs(measuredInterrupt.durationMs)}`,
);
}
const restartAfterStateMutation =
scenarioGateway.harness.gateway.restartAfterStateMutation;
if (!restartAfterStateMutation) {
throw new Error(
"Matrix persisted-state restart scenario requires a hard restart callback",
},
observedEvents,
observerAccessToken: provisioning.observer.accessToken,
observerDeviceId: provisioning.observer.deviceId,
observerPassword: provisioning.observer.password,
observerUserId: provisioning.observer.userId,
gatewayStateDir: scenarioGateway.harness.gateway.runtimeEnv?.OPENCLAW_STATE_DIR,
outputDir,
restartGateway: async () => {
if (!gatewayHarness) {
throw new Error("Matrix restart scenario requires a live gateway");
}
writeMatrixQaProgress(`gateway restart start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await scenarioGateway.harness.gateway.restart();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
}
writeMatrixQaProgress(`gateway hard restart start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await restartAfterStateMutation(mutateState);
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway hard restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
restartGatewayWithQueuedMessage: async (queueMessage) => {
if (!gatewayHarness) {
throw new Error("Matrix restart catchup scenario requires a live gateway");
}
writeMatrixQaProgress(`gateway restart+queue start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await scenarioGateway.harness.gateway.restart();
await sleep(250);
await queueMessage();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway restart+queue done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
roomId: provisioning.roomId,
sutAccountId,
sutAccessToken: provisioning.sut.accessToken,
sutDeviceId: provisioning.sut.deviceId,
sutPassword: provisioning.sut.password,
syncState,
syncStreams,
sutUserId: provisioning.sut.userId,
timeoutMs: scenario.timeoutMs,
topology: provisioning.topology,
patchGatewayConfig: async (patch, opts) => {
await patchMatrixQaGatewayConfig({
gateway: scenarioGateway.harness.gateway,
patch,
restartDelayMs: opts?.restartDelayMs,
});
},
}),
},
restartGatewayAfterStateMutation: async (mutateState) => {
if (!gatewayHarness) {
throw new Error(
"Matrix persisted-state restart scenario requires a live gateway",
);
}
const restartAfterStateMutation =
scenarioGateway.harness.gateway.restartAfterStateMutation;
if (!restartAfterStateMutation) {
throw new Error(
"Matrix persisted-state restart scenario requires a hard restart callback",
);
}
writeMatrixQaProgress(`gateway hard restart start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await restartAfterStateMutation(mutateState);
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway hard restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
restartGatewayWithQueuedMessage: async (queueMessage) => {
if (!gatewayHarness) {
throw new Error("Matrix restart catchup scenario requires a live gateway");
}
writeMatrixQaProgress(`gateway restart+queue start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await scenarioGateway.harness.gateway.restart();
await sleep(250);
await queueMessage();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway restart+queue done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
roomId: provisioning.roomId,
sutAccountId,
sutAccessToken: provisioning.sut.accessToken,
sutDeviceId: provisioning.sut.deviceId,
sutPassword: provisioning.sut.password,
syncState,
syncStreams,
sutUserId: provisioning.sut.userId,
timeoutMs: scenario.timeoutMs,
topology: provisioning.topology,
patchGatewayConfig: async (patch, opts) => {
await patchMatrixQaGatewayConfig({
gateway: scenarioGateway.harness.gateway,
patch,
restartDelayMs: opts?.restartDelayMs,
});
},
}),
),
);
const result = measuredScenario.result;
scenarioTimings[originalIndex] = {
@@ -775,13 +870,20 @@ export async function runMatrixQaLive(params: {
} finally {
if (gatewayHarness) {
try {
await gatewayHarness.stop();
await cleanupMatrixQaResource({
label: "Matrix live gateway cleanup",
action: () => gatewayHarness!.stop(),
});
} catch (error) {
appendLiveLaneIssue(cleanupErrors, "live gateway cleanup", error);
}
}
try {
await harness.stop();
await cleanupMatrixQaResource({
label: "Matrix homeserver cleanup",
action: () => harness.stop(),
recovery: harness.stopCommand,
});
} catch (error) {
appendLiveLaneIssue(cleanupErrors, "Matrix harness cleanup", error);
}
@@ -938,10 +1040,12 @@ export const __testing = {
MATRIX_QA_SCENARIOS,
buildMatrixQaConfig,
buildMatrixQaConfigSnapshot,
createMatrixQaRunDeadline,
findMatrixQaScenarios,
isMatrixAccountReady,
patchMatrixQaGatewayConfig,
resolveMatrixQaModels,
shouldWriteMatrixQaProgress,
summarizeMatrixQaConfigSnapshot,
waitForMatrixChannelReady,
};

View File

@@ -18,6 +18,7 @@ const MATRIX_QA_DEFAULT_SERVER_NAME = "matrix-qa.test";
const MATRIX_QA_DEFAULT_PORT = 28008;
const MATRIX_QA_INTERNAL_PORT = 8008;
const MATRIX_QA_SERVICE = "matrix-qa-homeserver";
const MATRIX_QA_CLEANUP_TIMEOUT_MS = 90_000;
type MatrixQaHarnessManifest = {
image: string;
@@ -54,6 +55,28 @@ async function isMatrixVersionsReachable(baseUrl: string, fetchImpl: FetchLike)
.catch(() => false);
}
async function withMatrixQaHarnessTimeout<T>(
label: string,
timeoutMs: number,
task: Promise<T>,
): Promise<T> {
let timeout: NodeJS.Timeout | undefined;
try {
return await Promise.race([
task,
new Promise<never>((_, reject) => {
timeout = setTimeout(() => {
reject(new Error(`${label} timed out after ${timeoutMs}ms`));
}, timeoutMs);
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
async function waitForReachableMatrixBaseUrl(params: {
composeFile: string;
containerBaseUrl: string | null;
@@ -279,10 +302,14 @@ export async function startMatrixQaHarness(
},
stopCommand: `docker compose -f ${files.composeFile} down --remove-orphans`,
async stop() {
await runCommand(
"docker",
["compose", "-f", files.composeFile, "down", "--remove-orphans"],
repoRoot,
await withMatrixQaHarnessTimeout(
"Matrix homeserver cleanup",
MATRIX_QA_CLEANUP_TIMEOUT_MS,
runCommand(
"docker",
["compose", "-f", files.composeFile, "down", "--remove-orphans"],
repoRoot,
),
);
},
};
@@ -293,6 +320,7 @@ export const __testing = {
MATRIX_QA_DEFAULT_PORT,
MATRIX_QA_DEFAULT_SERVER_NAME,
MATRIX_QA_SERVICE,
MATRIX_QA_CLEANUP_TIMEOUT_MS,
buildVersionsUrl,
isMatrixVersionsReachable,
renderMatrixQaCompose,

View File

@@ -96,7 +96,7 @@ steps:
- ref: env
- set: managed
value:
expr: "jobs.find((job) => job.description?.includes('[managed-by=memory-core.short-term-promotion]') || (job.name === 'Memory Dreaming Promotion' && ((job.payload?.kind === 'systemEvent' && job.payload.text === '__openclaw_memory_core_short_term_promotion_dream__') || (job.payload?.kind === 'agentTurn' && job.payload.message === '__openclaw_memory_core_short_term_promotion_dream__' && job.sessionTarget === 'isolated' && job.payload.lightContext === true && job.delivery?.mode === 'none'))))"
expr: "findManagedDreamingCronJob(jobs)"
- assert:
expr: "Boolean(managed?.id)"
message: managed dreaming cron job missing after enablement