Files
openclaw/extensions/qa-lab/src/suite-runtime-agent-process.ts
2026-04-24 20:39:33 +01:00

313 lines
8.9 KiB
TypeScript

import { spawn } from "node:child_process";
import { randomUUID } from "node:crypto";
import path from "node:path";
import { resolveQaNodeExecPath } from "./node-exec.js";
import { liveTurnTimeoutMs } from "./suite-runtime-agent-common.js";
import { waitForGatewayHealthy, waitForTransportReady } from "./suite-runtime-gateway.js";
import type { QaDreamingStatus, QaSuiteRuntimeEnv } from "./suite-runtime-types.js";
type QaMemorySearchResult = {
results?: Array<{ snippet?: string; text?: string; path?: string }>;
};
type QaCronJob = {
delivery?: { mode?: string };
description?: string;
id?: string;
name?: string;
payload?: { kind?: string; message?: string; text?: string; lightContext?: boolean };
sessionTarget?: string;
state?: { nextRunAtMs?: number };
};
const ANSI_ESCAPE_PATTERN = new RegExp(String.raw`\x1B\[[0-?]*[ -/]*[@-~]`, "g");
const MANAGED_DREAMING_CRON_MARKER = "[managed-by=memory-core.short-term-promotion]";
const MANAGED_DREAMING_CRON_NAME = "Memory Dreaming Promotion";
const MANAGED_DREAMING_PROMPT = "__openclaw_memory_core_short_term_promotion_dream__";
function stripAnsiCodes(text: string) {
return text.replace(ANSI_ESCAPE_PATTERN, "");
}
function parseQaCliJsonOutput(text: string) {
const cleaned = stripAnsiCodes(text).trim();
if (!cleaned) {
return {};
}
try {
return JSON.parse(cleaned) as unknown;
} catch {
// Some startup repair logs are emitted on stdout before command JSON.
const lines = cleaned.split(/\r?\n/);
for (let index = 0; index < lines.length; index += 1) {
const candidate = lines[index].trim();
if (!candidate.startsWith("{") && !candidate.startsWith("[")) {
continue;
}
try {
return JSON.parse(lines.slice(index).join("\n")) as unknown;
} catch {
// Keep looking for the actual payload start.
}
}
// Keep a line-oriented fallback for compact payloads followed by diagnostics.
for (const line of lines.toReversed()) {
const candidate = line.trim();
if (!candidate.startsWith("{") && !candidate.startsWith("[")) {
continue;
}
try {
return JSON.parse(candidate) as unknown;
} catch {
// Keep looking for the actual payload line.
}
}
throw new Error(`qa cli returned non-JSON stdout: ${cleaned.slice(0, 240)}`);
}
}
async function runQaCli(
env: Pick<
QaSuiteRuntimeEnv,
"gateway" | "repoRoot" | "primaryModel" | "alternateModel" | "providerMode"
>,
args: string[],
opts?: { timeoutMs?: number; json?: boolean },
) {
const stdout: Buffer[] = [];
const stderr: Buffer[] = [];
const distEntryPath = path.join(env.repoRoot, "dist", "index.js");
const nodeExecPath = await resolveQaNodeExecPath();
await new Promise<void>((resolve, reject) => {
const child = spawn(nodeExecPath, [distEntryPath, ...args], {
cwd: env.gateway.tempRoot,
env: env.gateway.runtimeEnv,
stdio: ["ignore", "pipe", "pipe"],
});
const timeout = setTimeout(() => {
child.kill("SIGKILL");
reject(new Error(`qa cli timed out: openclaw ${args.join(" ")}`));
}, opts?.timeoutMs ?? 60_000);
child.stdout.on("data", (chunk) => stdout.push(Buffer.from(chunk)));
child.stderr.on("data", (chunk) => stderr.push(Buffer.from(chunk)));
child.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
child.once("exit", (code) => {
clearTimeout(timeout);
if (code === 0) {
resolve();
return;
}
reject(
new Error(
`qa cli failed (${code ?? "unknown"}): ${Buffer.concat(stderr).toString("utf8").trim()}`,
),
);
});
});
const text = Buffer.concat(stdout).toString("utf8").trim();
if (!opts?.json) {
return text;
}
return parseQaCliJsonOutput(text);
}
async function startAgentRun(
env: Pick<QaSuiteRuntimeEnv, "gateway" | "transport">,
params: {
sessionKey: string;
message: string;
to?: string;
threadId?: string;
provider?: string;
model?: string;
timeoutMs?: number;
attachments?: Array<{
mimeType: string;
fileName: string;
content: string;
}>;
},
) {
const target = params.to ?? "dm:qa-operator";
const delivery = env.transport.buildAgentDelivery({ target });
const started = (await env.gateway.call(
"agent",
{
idempotencyKey: randomUUID(),
agentId: "qa",
sessionKey: params.sessionKey,
message: params.message,
deliver: true,
channel: delivery.channel,
to: target,
replyChannel: delivery.replyChannel,
replyTo: delivery.replyTo,
...(params.threadId ? { threadId: params.threadId } : {}),
...(params.provider ? { provider: params.provider } : {}),
...(params.model ? { model: params.model } : {}),
...(params.attachments ? { attachments: params.attachments } : {}),
},
{
timeoutMs: params.timeoutMs ?? 30_000,
},
)) as { runId?: string; status?: string };
if (!started.runId) {
throw new Error(`agent call did not return a runId: ${JSON.stringify(started)}`);
}
return started;
}
async function waitForAgentRun(
env: Pick<QaSuiteRuntimeEnv, "gateway">,
runId: string,
timeoutMs = 30_000,
) {
return (await env.gateway.call(
"agent.wait",
{
runId,
timeoutMs,
},
{
timeoutMs: timeoutMs + 5_000,
},
)) as { status?: string; error?: string };
}
async function listCronJobs(env: Pick<QaSuiteRuntimeEnv, "gateway">) {
const payload = (await env.gateway.call(
"cron.list",
{
includeDisabled: true,
limit: 200,
sortBy: "name",
sortDir: "asc",
},
{ timeoutMs: 30_000 },
)) as {
jobs?: QaCronJob[];
};
return payload.jobs ?? [];
}
function isManagedDreamingCronJob(job: QaCronJob) {
if (job.description?.includes(MANAGED_DREAMING_CRON_MARKER)) {
return true;
}
if (job.name !== MANAGED_DREAMING_CRON_NAME) {
return false;
}
if (job.payload?.kind === "systemEvent" && job.payload.text === MANAGED_DREAMING_PROMPT) {
return true;
}
return (
job.payload?.kind === "agentTurn" &&
job.payload.message === MANAGED_DREAMING_PROMPT &&
job.payload.lightContext === true &&
job.sessionTarget === "isolated" &&
job.delivery?.mode === "none"
);
}
function findManagedDreamingCronJob(jobs: readonly QaCronJob[]) {
return jobs.find(isManagedDreamingCronJob);
}
async function readDoctorMemoryStatus(env: Pick<QaSuiteRuntimeEnv, "gateway">) {
return (await env.gateway.call("doctor.memory.status", {}, { timeoutMs: 30_000 })) as {
dreaming?: QaDreamingStatus;
};
}
async function waitForMemorySearchMatch(params: {
search: () => Promise<QaMemorySearchResult>;
expectedNeedle: string;
timeoutMs: number;
}) {
const startedAt = Date.now();
while (Date.now() - startedAt < params.timeoutMs) {
const result = await params.search();
const haystack = JSON.stringify(result.results ?? []);
if (haystack.includes(params.expectedNeedle)) {
return result;
}
await new Promise((resolve) => setTimeout(resolve, 500));
}
throw new Error(`memory index missing expected fact after reindex: ${params.expectedNeedle}`);
}
async function forceMemoryIndex(params: {
env: Pick<
QaSuiteRuntimeEnv,
"gateway" | "transport" | "primaryModel" | "alternateModel" | "providerMode" | "repoRoot"
>;
query: string;
expectedNeedle: string;
}) {
await waitForGatewayHealthy(params.env, 60_000);
await waitForTransportReady(params.env, 60_000);
await runQaCli(params.env, ["memory", "index", "--agent", "qa", "--force"], {
timeoutMs: liveTurnTimeoutMs(params.env, 60_000),
});
return await waitForMemorySearchMatch({
expectedNeedle: params.expectedNeedle,
timeoutMs: liveTurnTimeoutMs(params.env, 20_000),
search: async () =>
(await runQaCli(
params.env,
["memory", "search", "--agent", "qa", "--json", "--query", params.query],
{
timeoutMs: liveTurnTimeoutMs(params.env, 60_000),
json: true,
},
)) as QaMemorySearchResult,
});
}
async function runAgentPrompt(
env: Pick<QaSuiteRuntimeEnv, "gateway" | "transport">,
params: {
sessionKey: string;
message: string;
to?: string;
threadId?: string;
provider?: string;
model?: string;
timeoutMs?: number;
attachments?: Array<{
mimeType: string;
fileName: string;
content: string;
}>;
},
) {
const started = await startAgentRun(env, params);
const waited = await waitForAgentRun(env, started.runId!, params.timeoutMs ?? 30_000);
if (waited.status !== "ok") {
throw new Error(
`agent.wait returned ${waited.status ?? "unknown"}: ${waited.error ?? "no error"}`,
);
}
return {
started,
waited,
};
}
export {
forceMemoryIndex,
findManagedDreamingCronJob,
isManagedDreamingCronJob,
listCronJobs,
readDoctorMemoryStatus,
runAgentPrompt,
runQaCli,
startAgentRun,
waitForMemorySearchMatch,
waitForAgentRun,
};