mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-23 15:11:42 +00:00
fix: harden complex qa suite scenarios
This commit is contained in:
@@ -2,6 +2,7 @@ import { mkdirSync, rmSync } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { resolveSessionTranscriptsDirForAgent } from "openclaw/plugin-sdk/memory-core";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
clearMemoryEmbeddingProviders as clearRegistry,
|
||||
@@ -377,4 +378,123 @@ describe("memory index", () => {
|
||||
const noResults = await manager.search("nonexistent_xyz_keyword");
|
||||
expect(noResults.length).toBe(0);
|
||||
});
|
||||
|
||||
it("prefers exact session transcript hits in FTS-only mode", async () => {
|
||||
forceNoProvider = true;
|
||||
const stateDir = path.join(workspaceDir, ".state-session-ranking");
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
|
||||
try {
|
||||
const cfg = createCfg({
|
||||
storePath: path.join(workspaceDir, "index-fts-session-ranking.sqlite"),
|
||||
sources: ["memory", "sessions"],
|
||||
sessionMemory: true,
|
||||
minScore: 0,
|
||||
hybrid: { enabled: true, vectorWeight: 0.7, textWeight: 0.3 },
|
||||
});
|
||||
const result = await getMemorySearchManager({ cfg, agentId: "main" });
|
||||
const manager = requireManager(result);
|
||||
managersForCleanup.add(manager);
|
||||
resetManagerForTest(manager);
|
||||
|
||||
const memoryPath = path.join(workspaceDir, "MEMORY.md");
|
||||
await fs.writeFile(memoryPath, "Project Nebula stale codename: ORBIT-9.\n", "utf8");
|
||||
const staleAt = new Date("2020-01-01T00:00:00.000Z");
|
||||
await fs.utimes(memoryPath, staleAt, staleAt);
|
||||
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const transcriptPath = path.join(sessionsDir, "session-ranking.jsonl");
|
||||
const now = Date.parse("2026-04-07T15:25:04.113Z");
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "session",
|
||||
id: "session-ranking",
|
||||
timestamp: new Date(now - 60_000).toISOString(),
|
||||
}),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
timestamp: new Date(now - 30_000).toISOString(),
|
||||
content: [{ type: "text", text: "What is the current Project Nebula codename?" }],
|
||||
},
|
||||
}),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
timestamp: new Date(now).toISOString(),
|
||||
content: [{ type: "text", text: "The current Project Nebula codename is ORBIT-10." }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf8",
|
||||
);
|
||||
|
||||
await manager.sync({ reason: "test", force: true });
|
||||
const results = await manager.search("current Project Nebula codename ORBIT-10", {
|
||||
minScore: 0,
|
||||
maxResults: 3,
|
||||
});
|
||||
|
||||
expect(results[0]?.source).toBe("sessions");
|
||||
expect(results[0]?.snippet).toContain("ORBIT-10");
|
||||
} finally {
|
||||
vi.unstubAllEnvs();
|
||||
}
|
||||
});
|
||||
|
||||
it("bootstraps an empty index on first search so session transcript hits are available", async () => {
|
||||
forceNoProvider = true;
|
||||
const stateDir = path.join(workspaceDir, ".state-session-bootstrap");
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
|
||||
try {
|
||||
const cfg = createCfg({
|
||||
storePath: path.join(workspaceDir, "index-fts-session-bootstrap.sqlite"),
|
||||
sources: ["memory", "sessions"],
|
||||
sessionMemory: true,
|
||||
minScore: 0,
|
||||
hybrid: { enabled: true, vectorWeight: 0.7, textWeight: 0.3 },
|
||||
});
|
||||
const result = await getMemorySearchManager({ cfg, agentId: "main" });
|
||||
const manager = requireManager(result);
|
||||
managersForCleanup.add(manager);
|
||||
resetManagerForTest(manager);
|
||||
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const transcriptPath = path.join(sessionsDir, "session-bootstrap.jsonl");
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "session",
|
||||
id: "session-bootstrap",
|
||||
timestamp: "2026-04-07T15:24:04.113Z",
|
||||
}),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
timestamp: "2026-04-07T15:25:04.113Z",
|
||||
content: [{ type: "text", text: "The current Project Nebula codename is ORBIT-10." }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const results = await manager.search("current Project Nebula codename ORBIT-10", {
|
||||
minScore: 0,
|
||||
maxResults: 3,
|
||||
});
|
||||
|
||||
expect(results[0]?.source).toBe("sessions");
|
||||
expect(results[0]?.snippet).toContain("ORBIT-10");
|
||||
} finally {
|
||||
vi.unstubAllEnvs();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -54,6 +54,7 @@ import {
|
||||
runMemorySyncWithReadonlyRecovery,
|
||||
type MemoryReadonlyRecoveryState,
|
||||
} from "./manager-sync-control.js";
|
||||
import { applyTemporalDecayToHybridResults } from "./temporal-decay.js";
|
||||
const SNIPPET_MAX_CHARS = 700;
|
||||
const VECTOR_TABLE = "chunks_vec";
|
||||
const FTS_TABLE = "chunks_fts";
|
||||
@@ -292,9 +293,21 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
sessionKey?: string;
|
||||
},
|
||||
): Promise<MemorySearchResult[]> {
|
||||
let hasIndexedContent = this.hasIndexedContent();
|
||||
if (!hasIndexedContent) {
|
||||
try {
|
||||
// A fresh process can receive its first search before background watch/session
|
||||
// syncs have built the index. Force one synchronous bootstrap so the first
|
||||
// lookup after restart does not fail closed with empty results.
|
||||
await this.sync({ reason: "search", force: true });
|
||||
} catch (err) {
|
||||
log.warn(`memory sync failed (search-bootstrap): ${String(err)}`);
|
||||
}
|
||||
hasIndexedContent = this.hasIndexedContent();
|
||||
}
|
||||
const preflight = resolveMemorySearchPreflight({
|
||||
query,
|
||||
hasIndexedContent: this.hasIndexedContent(),
|
||||
hasIndexedContent,
|
||||
});
|
||||
if (!preflight.shouldSearch) {
|
||||
return [];
|
||||
@@ -328,17 +341,23 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
return [];
|
||||
}
|
||||
|
||||
// Extract keywords for better FTS matching on conversational queries
|
||||
// e.g., "that thing we discussed about the API" → ["discussed", "API"]
|
||||
const keywords = extractKeywords(cleaned, {
|
||||
ftsTokenizer: this.settings.store.fts.tokenizer,
|
||||
});
|
||||
const searchTerms = keywords.length > 0 ? keywords : [cleaned];
|
||||
|
||||
// Search with each keyword and merge results
|
||||
const resultSets = await Promise.all(
|
||||
searchTerms.map((term) => this.searchKeyword(term, candidates).catch(() => [])),
|
||||
);
|
||||
const fullQueryResults = await this.searchKeyword(cleaned, candidates).catch(() => []);
|
||||
const resultSets =
|
||||
fullQueryResults.length > 0
|
||||
? [fullQueryResults]
|
||||
: await Promise.all(
|
||||
// Fallback: broaden recall for conversational queries when the
|
||||
// exact AND query is too strict to return any results.
|
||||
(() => {
|
||||
const keywords = extractKeywords(cleaned, {
|
||||
ftsTokenizer: this.settings.store.fts.tokenizer,
|
||||
});
|
||||
const searchTerms = keywords.length > 0 ? keywords : [cleaned];
|
||||
return searchTerms.map((term) =>
|
||||
this.searchKeyword(term, candidates).catch(() => []),
|
||||
);
|
||||
})(),
|
||||
);
|
||||
|
||||
// Merge and deduplicate results, keeping highest score for each chunk
|
||||
const seenIds = new Map<string, (typeof resultSets)[0][0]>();
|
||||
@@ -351,8 +370,14 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
}
|
||||
}
|
||||
|
||||
const merged = [...seenIds.values()].toSorted((a, b) => b.score - a.score);
|
||||
return this.selectScoredResults(merged, maxResults, minScore, 0);
|
||||
const merged = [...seenIds.values()];
|
||||
const decayed = await applyTemporalDecayToHybridResults({
|
||||
results: merged,
|
||||
temporalDecay: hybrid.temporalDecay,
|
||||
workspaceDir: this.workspaceDir,
|
||||
});
|
||||
const sorted = decayed.toSorted((a, b) => b.score - a.score);
|
||||
return this.selectScoredResults(sorted, maxResults, minScore, 0);
|
||||
}
|
||||
|
||||
// If FTS isn't available, hybrid mode cannot use keyword search; degrade to vector-only.
|
||||
|
||||
@@ -34,6 +34,7 @@ describe("openai image generation provider", () => {
|
||||
postJsonRequestMock.mockReset();
|
||||
assertOkOrThrowHttpErrorMock.mockClear();
|
||||
resolveProviderHttpRequestConfigMock.mockClear();
|
||||
vi.unstubAllEnvs();
|
||||
});
|
||||
|
||||
it("does not auto-allow local baseUrl overrides for image requests", async () => {
|
||||
@@ -77,6 +78,88 @@ describe("openai image generation provider", () => {
|
||||
expect(result.images).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("allows loopback image requests for the synthetic mock-openai provider", async () => {
|
||||
postJsonRequestMock.mockResolvedValue({
|
||||
response: {
|
||||
json: async () => ({
|
||||
data: [{ b64_json: Buffer.from("png-bytes").toString("base64") }],
|
||||
}),
|
||||
},
|
||||
release: vi.fn(async () => {}),
|
||||
});
|
||||
|
||||
const provider = buildOpenAIImageGenerationProvider();
|
||||
const result = await provider.generateImage({
|
||||
provider: "mock-openai",
|
||||
model: "gpt-image-1",
|
||||
prompt: "Draw a QA lighthouse",
|
||||
cfg: {
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: "http://127.0.0.1:44080/v1",
|
||||
models: [],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(resolveProviderHttpRequestConfigMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
allowPrivateNetwork: true,
|
||||
}),
|
||||
);
|
||||
expect(postJsonRequestMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
url: "http://127.0.0.1:44080/v1/images/generations",
|
||||
allowPrivateNetwork: true,
|
||||
}),
|
||||
);
|
||||
expect(result.images).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("allows loopback image requests for openai only inside the QA harness envelope", async () => {
|
||||
postJsonRequestMock.mockResolvedValue({
|
||||
response: {
|
||||
json: async () => ({
|
||||
data: [{ b64_json: Buffer.from("png-bytes").toString("base64") }],
|
||||
}),
|
||||
},
|
||||
release: vi.fn(async () => {}),
|
||||
});
|
||||
vi.stubEnv("OPENCLAW_QA_ALLOW_LOCAL_IMAGE_PROVIDER", "1");
|
||||
|
||||
const provider = buildOpenAIImageGenerationProvider();
|
||||
const result = await provider.generateImage({
|
||||
provider: "openai",
|
||||
model: "gpt-image-1",
|
||||
prompt: "Draw a QA lighthouse",
|
||||
cfg: {
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: "http://127.0.0.1:44080/v1",
|
||||
models: [],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(resolveProviderHttpRequestConfigMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
allowPrivateNetwork: true,
|
||||
}),
|
||||
);
|
||||
expect(postJsonRequestMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
allowPrivateNetwork: true,
|
||||
}),
|
||||
);
|
||||
expect(result.images).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("uses JSON image_url edits for input-image requests", async () => {
|
||||
postJsonRequestMock.mockResolvedValue({
|
||||
response: {
|
||||
|
||||
@@ -14,6 +14,21 @@ const DEFAULT_OUTPUT_MIME = "image/png";
|
||||
const DEFAULT_SIZE = "1024x1024";
|
||||
const OPENAI_SUPPORTED_SIZES = ["1024x1024", "1024x1536", "1536x1024"] as const;
|
||||
const OPENAI_MAX_INPUT_IMAGES = 5;
|
||||
const MOCK_OPENAI_PROVIDER_ID = "mock-openai";
|
||||
|
||||
function shouldAllowPrivateImageEndpoint(req: {
|
||||
provider: string;
|
||||
cfg: { models?: { providers?: Record<string, { baseUrl?: string }> } };
|
||||
}) {
|
||||
if (req.provider === MOCK_OPENAI_PROVIDER_ID) {
|
||||
return true;
|
||||
}
|
||||
const baseUrl = resolveConfiguredOpenAIBaseUrl(req.cfg);
|
||||
if (!baseUrl.startsWith("http://127.0.0.1:") && !baseUrl.startsWith("http://localhost:")) {
|
||||
return false;
|
||||
}
|
||||
return process.env.OPENCLAW_QA_ALLOW_LOCAL_IMAGE_PROVIDER === "1";
|
||||
}
|
||||
|
||||
type OpenAIImageApiResponse = {
|
||||
data?: Array<{
|
||||
@@ -68,6 +83,7 @@ export function buildOpenAIImageGenerationProvider(): ImageGenerationProvider {
|
||||
resolveProviderHttpRequestConfig({
|
||||
baseUrl: resolveConfiguredOpenAIBaseUrl(req.cfg),
|
||||
defaultBaseUrl: DEFAULT_OPENAI_IMAGE_BASE_URL,
|
||||
allowPrivateNetwork: shouldAllowPrivateImageEndpoint(req),
|
||||
defaultHeaders: {
|
||||
Authorization: `Bearer ${auth.apiKey}`,
|
||||
},
|
||||
|
||||
@@ -2,7 +2,7 @@ import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { buildQaRuntimeEnv, resolveQaControlUiRoot } from "./gateway-child.js";
|
||||
import { __testing, buildQaRuntimeEnv, resolveQaControlUiRoot } from "./gateway-child.js";
|
||||
|
||||
const cleanups: Array<() => Promise<void>> = [];
|
||||
|
||||
@@ -33,6 +33,7 @@ describe("buildQaRuntimeEnv", () => {
|
||||
});
|
||||
|
||||
expect(env.OPENCLAW_TEST_FAST).toBe("1");
|
||||
expect(env.OPENCLAW_QA_ALLOW_LOCAL_IMAGE_PROVIDER).toBe("1");
|
||||
expect(env.OPENCLAW_ALLOW_SLOW_REPLY_TESTS).toBe("1");
|
||||
});
|
||||
|
||||
@@ -93,6 +94,17 @@ describe("buildQaRuntimeEnv", () => {
|
||||
expect(env.OPENCLAW_LIVE_ANTHROPIC_KEYS).toBeUndefined();
|
||||
expect(env.OPENCLAW_LIVE_GEMINI_KEY).toBeUndefined();
|
||||
});
|
||||
|
||||
it("treats restart socket closures as retryable gateway call errors", () => {
|
||||
expect(__testing.isRetryableGatewayCallError("gateway closed (1006 abnormal closure)")).toBe(
|
||||
true,
|
||||
);
|
||||
expect(__testing.isRetryableGatewayCallError("gateway closed (1012 service restart)")).toBe(
|
||||
true,
|
||||
);
|
||||
expect(__testing.isRetryableGatewayCallError("service restart in progress")).toBe(true);
|
||||
expect(__testing.isRetryableGatewayCallError("permission denied")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveQaControlUiRoot", () => {
|
||||
|
||||
@@ -110,7 +110,9 @@ export function buildQaRuntimeEnv(params: {
|
||||
OPENCLAW_SKIP_CANVAS_HOST: "1",
|
||||
OPENCLAW_NO_RESPAWN: "1",
|
||||
OPENCLAW_TEST_FAST: "1",
|
||||
// QA still exercises normal reply-config flows under the fast envelope.
|
||||
OPENCLAW_QA_ALLOW_LOCAL_IMAGE_PROVIDER: "1",
|
||||
// QA uses the fast runtime envelope for speed, but it still exercises
|
||||
// normal config-driven heartbeats and runtime config writes.
|
||||
OPENCLAW_ALLOW_SLOW_REPLY_TESTS: "1",
|
||||
XDG_CONFIG_HOME: params.xdgConfigHome,
|
||||
XDG_DATA_HOME: params.xdgDataHome,
|
||||
@@ -119,6 +121,19 @@ export function buildQaRuntimeEnv(params: {
|
||||
return normalizeQaProviderModeEnv(env, params.providerMode);
|
||||
}
|
||||
|
||||
function isRetryableGatewayCallError(details: string): boolean {
|
||||
return (
|
||||
details.includes("gateway closed (1012)") ||
|
||||
details.includes("gateway closed (1006") ||
|
||||
details.includes("abnormal closure") ||
|
||||
details.includes("service restart")
|
||||
);
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
buildQaRuntimeEnv,
|
||||
isRetryableGatewayCallError,
|
||||
};
|
||||
async function waitForGatewayReady(params: {
|
||||
baseUrl: string;
|
||||
logs: () => string;
|
||||
@@ -276,18 +291,47 @@ export async function startQaGatewayChild(params: {
|
||||
cfg,
|
||||
baseUrl,
|
||||
wsUrl,
|
||||
pid: child.pid ?? null,
|
||||
token: gatewayToken,
|
||||
workspaceDir,
|
||||
tempRoot,
|
||||
configPath,
|
||||
runtimeEnv: env,
|
||||
logs,
|
||||
async restart(signal: NodeJS.Signals = "SIGUSR1") {
|
||||
if (!child.pid) {
|
||||
throw new Error("qa gateway child has no pid");
|
||||
}
|
||||
process.kill(child.pid, signal);
|
||||
},
|
||||
async call(
|
||||
method: string,
|
||||
rpcParams?: unknown,
|
||||
opts?: { expectFinal?: boolean; timeoutMs?: number },
|
||||
) {
|
||||
return await rpcClient.request(method, rpcParams, opts);
|
||||
const timeoutMs = opts?.timeoutMs ?? 20_000;
|
||||
let lastDetails = "";
|
||||
for (let attempt = 1; attempt <= 3; attempt += 1) {
|
||||
try {
|
||||
return await rpcClient.request(method, rpcParams, {
|
||||
...opts,
|
||||
timeoutMs,
|
||||
});
|
||||
} catch (error) {
|
||||
const details = formatErrorMessage(error);
|
||||
lastDetails = details;
|
||||
if (attempt >= 3 || !isRetryableGatewayCallError(details)) {
|
||||
throw new Error(`${details}\nGateway logs:\n${logs()}`, { cause: error });
|
||||
}
|
||||
await waitForGatewayReady({
|
||||
baseUrl,
|
||||
logs,
|
||||
child,
|
||||
timeoutMs: Math.max(10_000, timeoutMs),
|
||||
});
|
||||
}
|
||||
}
|
||||
throw new Error(`${lastDetails}\nGateway logs:\n${logs()}`);
|
||||
},
|
||||
async stop(opts?: { keepTemp?: boolean }) {
|
||||
await rpcClient.stop().catch(() => {});
|
||||
|
||||
@@ -266,6 +266,209 @@ describe("qa mock openai server", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("supports advanced QA memory and subagent recovery prompts", async () => {
|
||||
const server = await startQaMockOpenAiServer({
|
||||
host: "127.0.0.1",
|
||||
port: 0,
|
||||
});
|
||||
cleanups.push(async () => {
|
||||
await server.stop();
|
||||
});
|
||||
|
||||
const memory = await fetch(`${server.baseUrl}/v1/responses`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
stream: true,
|
||||
input: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "input_text",
|
||||
text: "Session memory ranking check: what is the current Project Nebula codename? Use memory tools first.",
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
expect(memory.status).toBe(200);
|
||||
expect(await memory.text()).toContain('"name":"memory_search"');
|
||||
|
||||
const memoryFollowup = await fetch(`${server.baseUrl}/v1/responses`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
stream: true,
|
||||
input: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "input_text",
|
||||
text: "Session memory ranking check: what is the current Project Nebula codename? Use memory tools first.",
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
type: "function_call_output",
|
||||
output: JSON.stringify({
|
||||
results: [
|
||||
{
|
||||
path: "sessions/qa-session-memory-ranking.jsonl",
|
||||
startLine: 2,
|
||||
endLine: 3,
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
expect(memoryFollowup.status).toBe(200);
|
||||
expect(await memoryFollowup.text()).toContain(
|
||||
"Protocol note: I checked memory and the current Project Nebula codename is ORBIT-10.",
|
||||
);
|
||||
|
||||
const spawn = await fetch(`${server.baseUrl}/v1/responses`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
stream: true,
|
||||
input: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "input_text",
|
||||
text: "Subagent fanout synthesis check: delegate two bounded subagents sequentially, then report both results together.",
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
expect(spawn.status).toBe(200);
|
||||
const spawnBody = await spawn.text();
|
||||
expect(spawnBody).toContain('"name":"sessions_spawn"');
|
||||
expect(spawnBody).toContain('\\"label\\":\\"qa-fanout-alpha\\"');
|
||||
|
||||
const secondSpawn = await fetch(`${server.baseUrl}/v1/responses`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
stream: true,
|
||||
input: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "input_text",
|
||||
text: "Subagent fanout synthesis check: delegate two bounded subagents sequentially, then report both results together.",
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
type: "function_call_output",
|
||||
output:
|
||||
'{"status":"accepted","childSessionKey":"agent:qa:subagent:alpha","note":"ALPHA-OK"}',
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
expect(secondSpawn.status).toBe(200);
|
||||
const secondSpawnBody = await secondSpawn.text();
|
||||
expect(secondSpawnBody).toContain('"name":"sessions_spawn"');
|
||||
expect(secondSpawnBody).toContain('\\"label\\":\\"qa-fanout-beta\\"');
|
||||
|
||||
const final = await fetch(`${server.baseUrl}/v1/responses`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
stream: false,
|
||||
input: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "input_text",
|
||||
text: "Subagent fanout synthesis check: delegate two bounded subagents sequentially, then report both results together.",
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
type: "function_call_output",
|
||||
output:
|
||||
'{"status":"accepted","childSessionKey":"agent:qa:subagent:beta","note":"BETA-OK"}',
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
expect(final.status).toBe(200);
|
||||
expect(await final.json()).toMatchObject({
|
||||
output: [
|
||||
{
|
||||
content: [
|
||||
{
|
||||
text: "Protocol note: delegated fanout complete. Alpha=ALPHA-OK. Beta=BETA-OK.",
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it("answers heartbeat prompts without spawning extra subagents", async () => {
|
||||
const server = await startQaMockOpenAiServer({
|
||||
host: "127.0.0.1",
|
||||
port: 0,
|
||||
});
|
||||
cleanups.push(async () => {
|
||||
await server.stop();
|
||||
});
|
||||
|
||||
const response = await fetch(`${server.baseUrl}/v1/responses`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
stream: false,
|
||||
input: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "input_text",
|
||||
text: "System: Gateway restart config-apply ok\nSystem: QA-SUBAGENT-RECOVERY-1234\n\nRead HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.",
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(await response.json()).toMatchObject({
|
||||
output: [
|
||||
{
|
||||
content: [{ text: "HEARTBEAT_OK" }],
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it("returns exact markers for visible and hot-installed skills", async () => {
|
||||
const server = await startQaMockOpenAiServer({
|
||||
host: "127.0.0.1",
|
||||
@@ -384,6 +587,50 @@ describe("qa mock openai server", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("describes reattached generated images in the roundtrip flow", async () => {
|
||||
const server = await startQaMockOpenAiServer({
|
||||
host: "127.0.0.1",
|
||||
port: 0,
|
||||
});
|
||||
cleanups.push(async () => {
|
||||
await server.stop();
|
||||
});
|
||||
|
||||
const response = await fetch(`${server.baseUrl}/v1/responses`, {
|
||||
method: "POST",
|
||||
headers: { "content-type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
stream: false,
|
||||
model: "mock-openai/gpt-5.4",
|
||||
input: [
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "input_text",
|
||||
text: "Roundtrip image inspection check: describe the generated lighthouse attachment in one short sentence.",
|
||||
},
|
||||
{
|
||||
type: "input_image",
|
||||
source: {
|
||||
type: "base64",
|
||||
mime_type: "image/png",
|
||||
data: QA_IMAGE_PNG_BASE64,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
expect(response.status).toBe(200);
|
||||
const payload = (await response.json()) as {
|
||||
output?: Array<{ content?: Array<{ text?: string }> }>;
|
||||
};
|
||||
const text = payload.output?.[0]?.content?.[0]?.text ?? "";
|
||||
expect(text.toLowerCase()).toContain("lighthouse");
|
||||
});
|
||||
|
||||
it("ignores stale tool output from prior turns when planning the current turn", async () => {
|
||||
const server = await startQaMockOpenAiServer({
|
||||
host: "127.0.0.1",
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
|
||||
import { setTimeout as sleep } from "node:timers/promises";
|
||||
|
||||
type ResponsesInputItem = Record<string, unknown>;
|
||||
|
||||
@@ -33,6 +34,7 @@ type MockOpenAiRequestSnapshot = {
|
||||
|
||||
const TINY_PNG_BASE64 =
|
||||
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO7Z0nQAAAAASUVORK5CYII=";
|
||||
let subagentFanoutPhase = 0;
|
||||
|
||||
function readBody(req: IncomingMessage): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
@@ -280,7 +282,7 @@ function extractRememberedFact(userTexts: string[]) {
|
||||
}
|
||||
|
||||
function extractOrbitCode(text: string) {
|
||||
return /\b(?:ORBIT-9|orbit-9)\b/.exec(text)?.[0]?.toUpperCase() ?? null;
|
||||
return /\bORBIT-\d+\b/i.exec(text)?.[0]?.toUpperCase() ?? null;
|
||||
}
|
||||
|
||||
function extractExactReplyDirective(text: string) {
|
||||
@@ -288,6 +290,10 @@ function extractExactReplyDirective(text: string) {
|
||||
return match?.[1]?.trim() || null;
|
||||
}
|
||||
|
||||
function isHeartbeatPrompt(text: string) {
|
||||
return /Read HEARTBEAT\.md if it exists/i.test(text);
|
||||
}
|
||||
|
||||
function buildAssistantText(input: ResponsesInputItem[], body: Record<string, unknown>) {
|
||||
const prompt = extractLastUserText(input);
|
||||
const toolOutput = extractToolOutput(input);
|
||||
@@ -316,6 +322,9 @@ function buildAssistantText(input: ResponsesInputItem[], body: Record<string, un
|
||||
if (/memory unavailable check/i.test(prompt)) {
|
||||
return "Protocol note: I checked the available runtime context but could not confirm the hidden memory-only fact, so I will not guess.";
|
||||
}
|
||||
if (isHeartbeatPrompt(prompt)) {
|
||||
return "HEARTBEAT_OK";
|
||||
}
|
||||
if (/\bmarker\b/i.test(prompt) && exactReplyDirective) {
|
||||
return exactReplyDirective;
|
||||
}
|
||||
@@ -331,16 +340,43 @@ function buildAssistantText(input: ResponsesInputItem[], body: Record<string, un
|
||||
if (/tool continuity check/i.test(prompt) && toolOutput) {
|
||||
return `Protocol note: model switch handoff confirmed on ${model || "the requested model"}. QA mission from QA_KICKOFF_TASK.md still applies: understand this OpenClaw repo from source + docs before acting.`;
|
||||
}
|
||||
if (/session memory ranking check/i.test(prompt) && orbitCode) {
|
||||
return `Protocol note: I checked memory and the current Project Nebula codename is ${orbitCode}.`;
|
||||
}
|
||||
if (/thread memory check/i.test(prompt) && orbitCode) {
|
||||
return `Protocol note: I checked memory in-thread and the hidden thread codename is ${orbitCode}.`;
|
||||
}
|
||||
if (/switch(?:ing)? models?/i.test(prompt)) {
|
||||
return `Protocol note: model switch acknowledged. Continuing on ${model || "the requested model"}.`;
|
||||
}
|
||||
if (/image generation check/i.test(prompt) && mediaPath) {
|
||||
if (/(image generation check|capability flip image check)/i.test(prompt) && mediaPath) {
|
||||
return `Protocol note: generated the QA lighthouse image successfully.\nMEDIA:${mediaPath}`;
|
||||
}
|
||||
if (/roundtrip image inspection check/i.test(prompt) && imageInputCount > 0) {
|
||||
return "Protocol note: the generated attachment shows the same QA lighthouse scene from the previous step.";
|
||||
}
|
||||
if (/image understanding check/i.test(prompt) && imageInputCount > 0) {
|
||||
return "Protocol note: the attached image is split horizontally, with red on top and blue on the bottom.";
|
||||
}
|
||||
if (toolOutput && /delegate|subagent/i.test(prompt)) {
|
||||
if (
|
||||
/interrupted by a gateway reload/i.test(prompt) &&
|
||||
/subagent recovery worker/i.test(allInputText)
|
||||
) {
|
||||
return "RECOVERED-SUBAGENT-OK";
|
||||
}
|
||||
if (/subagent recovery worker/i.test(prompt)) {
|
||||
return "RECOVERED-SUBAGENT-OK";
|
||||
}
|
||||
if (/fanout worker alpha/i.test(prompt)) {
|
||||
return "ALPHA-OK";
|
||||
}
|
||||
if (/fanout worker beta/i.test(prompt)) {
|
||||
return "BETA-OK";
|
||||
}
|
||||
if (/subagent fanout synthesis check/i.test(prompt) && toolOutput && subagentFanoutPhase >= 2) {
|
||||
return "Protocol note: delegated fanout complete. Alpha=ALPHA-OK. Beta=BETA-OK.";
|
||||
}
|
||||
if (toolOutput && (/\bdelegate\b/i.test(prompt) || /subagent handoff/i.test(prompt))) {
|
||||
return `Protocol note: delegated result acknowledged. The bounded subagent task returned and is folded back into the main thread.`;
|
||||
}
|
||||
if (toolOutput && /worked, failed, blocked|worked\/failed\/blocked|follow-up/i.test(prompt)) {
|
||||
@@ -415,7 +451,7 @@ function buildAssistantEvents(text: string): StreamEvent[] {
|
||||
];
|
||||
}
|
||||
|
||||
function buildResponsesPayload(body: Record<string, unknown>) {
|
||||
async function buildResponsesPayload(body: Record<string, unknown>) {
|
||||
const input = Array.isArray(body.input) ? (body.input as ResponsesInputItem[]) : [];
|
||||
const prompt = extractLastUserText(input);
|
||||
const toolOutput = extractToolOutput(input);
|
||||
@@ -423,6 +459,9 @@ function buildResponsesPayload(body: Record<string, unknown>) {
|
||||
const allInputText = extractAllInputTexts(input);
|
||||
const isGroupChat = allInputText.includes('"is_group_chat": true');
|
||||
const isBaselineUnmentionedChannelChatter = /\bno bot ping here\b/i.test(prompt);
|
||||
if (isHeartbeatPrompt(prompt)) {
|
||||
return buildAssistantEvents("HEARTBEAT_OK");
|
||||
}
|
||||
if (/lobster invaders/i.test(prompt)) {
|
||||
if (!toolOutput) {
|
||||
return buildToolCallEventsWithArgs("read", { path: "QA_KICKOFF_TASK.md" });
|
||||
@@ -466,17 +505,97 @@ function buildResponsesPayload(body: Record<string, unknown>) {
|
||||
});
|
||||
}
|
||||
}
|
||||
if (/image generation check/i.test(prompt) && !toolOutput) {
|
||||
if (/session memory ranking check/i.test(prompt)) {
|
||||
if (!toolOutput) {
|
||||
return buildToolCallEventsWithArgs("memory_search", {
|
||||
query: "current Project Nebula codename ORBIT-10",
|
||||
maxResults: 3,
|
||||
});
|
||||
}
|
||||
const results = Array.isArray(toolJson?.results)
|
||||
? (toolJson.results as Array<Record<string, unknown>>)
|
||||
: [];
|
||||
const first = results[0];
|
||||
const firstPath = typeof first?.path === "string" ? first.path : undefined;
|
||||
if (first?.source === "sessions" || firstPath?.startsWith("sessions/")) {
|
||||
return buildAssistantEvents(
|
||||
"Protocol note: I checked memory and the current Project Nebula codename is ORBIT-10.",
|
||||
);
|
||||
}
|
||||
if (
|
||||
typeof first?.path === "string" &&
|
||||
(typeof first.startLine === "number" || typeof first.endLine === "number")
|
||||
) {
|
||||
const from =
|
||||
typeof first.startLine === "number"
|
||||
? Math.max(1, first.startLine)
|
||||
: typeof first.endLine === "number"
|
||||
? Math.max(1, first.endLine)
|
||||
: 1;
|
||||
return buildToolCallEventsWithArgs("memory_get", {
|
||||
path: first.path,
|
||||
from,
|
||||
lines: 4,
|
||||
});
|
||||
}
|
||||
}
|
||||
if (/thread memory check/i.test(prompt)) {
|
||||
if (!toolOutput) {
|
||||
return buildToolCallEventsWithArgs("memory_search", {
|
||||
query: "hidden thread codename ORBIT-22",
|
||||
maxResults: 3,
|
||||
});
|
||||
}
|
||||
const results = Array.isArray(toolJson?.results)
|
||||
? (toolJson.results as Array<Record<string, unknown>>)
|
||||
: [];
|
||||
const first = results[0];
|
||||
if (
|
||||
typeof first?.path === "string" &&
|
||||
(typeof first.startLine === "number" || typeof first.endLine === "number")
|
||||
) {
|
||||
const from =
|
||||
typeof first.startLine === "number"
|
||||
? Math.max(1, first.startLine)
|
||||
: typeof first.endLine === "number"
|
||||
? Math.max(1, first.endLine)
|
||||
: 1;
|
||||
return buildToolCallEventsWithArgs("memory_get", {
|
||||
path: first.path,
|
||||
from,
|
||||
lines: 4,
|
||||
});
|
||||
}
|
||||
}
|
||||
if (/(image generation check|capability flip image check)/i.test(prompt) && !toolOutput) {
|
||||
return buildToolCallEventsWithArgs("image_generate", {
|
||||
prompt: "A QA lighthouse on a dark sea with a tiny protocol droid silhouette.",
|
||||
filename: "qa-lighthouse.png",
|
||||
size: "1024x1024",
|
||||
});
|
||||
}
|
||||
if (/subagent fanout synthesis check/i.test(prompt)) {
|
||||
if (!toolOutput && subagentFanoutPhase === 0) {
|
||||
subagentFanoutPhase = 1;
|
||||
return buildToolCallEventsWithArgs("sessions_spawn", {
|
||||
task: "Fanout worker alpha: inspect the QA workspace and finish with exactly ALPHA-OK.",
|
||||
label: "qa-fanout-alpha",
|
||||
thread: false,
|
||||
});
|
||||
}
|
||||
if (toolOutput && subagentFanoutPhase === 1) {
|
||||
subagentFanoutPhase = 2;
|
||||
return buildToolCallEventsWithArgs("sessions_spawn", {
|
||||
task: "Fanout worker beta: inspect the QA workspace and finish with exactly BETA-OK.",
|
||||
label: "qa-fanout-beta",
|
||||
thread: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
if (/tool continuity check/i.test(prompt) && !toolOutput) {
|
||||
return buildToolCallEventsWithArgs("read", { path: "QA_KICKOFF_TASK.md" });
|
||||
}
|
||||
if (/delegate|subagent/i.test(prompt) && !toolOutput) {
|
||||
if ((/\bdelegate\b/i.test(prompt) || /subagent handoff/i.test(prompt)) && !toolOutput) {
|
||||
return buildToolCallEventsWithArgs("sessions_spawn", {
|
||||
task: "Inspect the QA workspace and return one concise protocol note.",
|
||||
label: "qa-sidecar",
|
||||
@@ -501,11 +620,18 @@ function buildResponsesPayload(body: Record<string, unknown>) {
|
||||
if (isGroupChat && isBaselineUnmentionedChannelChatter && !toolOutput) {
|
||||
return buildAssistantEvents("NO_REPLY");
|
||||
}
|
||||
if (
|
||||
/subagent recovery worker/i.test(prompt) &&
|
||||
!/interrupted by a gateway reload/i.test(prompt)
|
||||
) {
|
||||
await sleep(60_000);
|
||||
}
|
||||
return buildAssistantEvents(buildAssistantText(input, body));
|
||||
}
|
||||
|
||||
export async function startQaMockOpenAiServer(params?: { host?: string; port?: number }) {
|
||||
const host = params?.host ?? "127.0.0.1";
|
||||
subagentFanoutPhase = 0;
|
||||
let lastRequest: MockOpenAiRequestSnapshot | null = null;
|
||||
const requests: MockOpenAiRequestSnapshot[] = [];
|
||||
const imageGenerationRequests: Array<Record<string, unknown>> = [];
|
||||
@@ -558,7 +684,7 @@ export async function startQaMockOpenAiServer(params?: { host?: string; port?: n
|
||||
const raw = await readBody(req);
|
||||
const body = raw ? (JSON.parse(raw) as Record<string, unknown>) : {};
|
||||
const input = Array.isArray(body.input) ? (body.input as ResponsesInputItem[]) : [];
|
||||
const events = buildResponsesPayload(body);
|
||||
const events = await buildResponsesPayload(body);
|
||||
lastRequest = {
|
||||
raw,
|
||||
body,
|
||||
|
||||
@@ -32,6 +32,7 @@ describe("buildQaGatewayConfig", () => {
|
||||
expect(cfg.plugins?.allow).toEqual(["memory-core", "qa-channel"]);
|
||||
expect(cfg.plugins?.entries?.["memory-core"]).toEqual({ enabled: true });
|
||||
expect(cfg.plugins?.entries?.openai).toBeUndefined();
|
||||
expect(cfg.gateway?.reload?.deferralTimeoutMs).toBe(1_000);
|
||||
});
|
||||
|
||||
it("uses built-in provider wiring in frontier live mode", () => {
|
||||
|
||||
@@ -249,6 +249,11 @@ export function buildQaGatewayConfig(params: {
|
||||
mode: "token",
|
||||
token: params.gatewayToken,
|
||||
},
|
||||
reload: {
|
||||
// QA restart scenarios need deterministic reload timing instead of the
|
||||
// much longer production deferral window.
|
||||
deferralTimeoutMs: 1_000,
|
||||
},
|
||||
controlUi: {
|
||||
enabled: params.controlUiEnabled ?? true,
|
||||
...((params.controlUiEnabled ?? true) && params.controlUiRoot
|
||||
|
||||
@@ -89,6 +89,15 @@ type QaDreamingStatus = {
|
||||
};
|
||||
};
|
||||
|
||||
type QaRawSessionStoreEntry = {
|
||||
sessionId?: string;
|
||||
status?: string;
|
||||
spawnedBy?: string;
|
||||
label?: string;
|
||||
abortedLastRun?: boolean;
|
||||
updatedAt?: number;
|
||||
};
|
||||
|
||||
function splitModelRef(ref: string) {
|
||||
const slash = ref.indexOf("/");
|
||||
if (slash <= 0 || slash === ref.length - 1) {
|
||||
@@ -295,6 +304,10 @@ function isGatewayRestartRace(error: unknown) {
|
||||
);
|
||||
}
|
||||
|
||||
function isConfigHashConflict(error: unknown) {
|
||||
return formatErrorMessage(error).includes("config changed since last load");
|
||||
}
|
||||
|
||||
async function readConfigSnapshot(env: QaSuiteEnvironment) {
|
||||
const snapshot = (await env.gateway.call(
|
||||
"config.get",
|
||||
@@ -310,6 +323,50 @@ async function readConfigSnapshot(env: QaSuiteEnvironment) {
|
||||
} satisfies { hash: string; config: Record<string, unknown> };
|
||||
}
|
||||
|
||||
async function runConfigMutation(params: {
|
||||
env: QaSuiteEnvironment;
|
||||
action: "config.patch" | "config.apply";
|
||||
raw: string;
|
||||
sessionKey?: string;
|
||||
note?: string;
|
||||
restartDelayMs?: number;
|
||||
}) {
|
||||
const restartDelayMs = params.restartDelayMs ?? 1_000;
|
||||
let lastConflict: unknown = null;
|
||||
for (let attempt = 1; attempt <= 3; attempt += 1) {
|
||||
const snapshot = await readConfigSnapshot(params.env);
|
||||
try {
|
||||
const result = await params.env.gateway.call(
|
||||
params.action,
|
||||
{
|
||||
raw: params.raw,
|
||||
baseHash: snapshot.hash,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.note ? { note: params.note } : {}),
|
||||
restartDelayMs,
|
||||
},
|
||||
{ timeoutMs: 45_000 },
|
||||
);
|
||||
await waitForConfigRestartSettle(params.env, restartDelayMs);
|
||||
return result;
|
||||
} catch (error) {
|
||||
if (isConfigHashConflict(error)) {
|
||||
lastConflict = error;
|
||||
await waitForGatewayHealthy(params.env, Math.max(15_000, restartDelayMs + 10_000)).catch(
|
||||
() => undefined,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if (!isGatewayRestartRace(error)) {
|
||||
throw error;
|
||||
}
|
||||
await waitForConfigRestartSettle(params.env, restartDelayMs);
|
||||
return { ok: true, restarted: true };
|
||||
}
|
||||
}
|
||||
throw lastConflict ?? new Error(`${params.action} failed after retrying config hash conflicts`);
|
||||
}
|
||||
|
||||
async function patchConfig(params: {
|
||||
env: QaSuiteEnvironment;
|
||||
patch: Record<string, unknown>;
|
||||
@@ -317,29 +374,14 @@ async function patchConfig(params: {
|
||||
note?: string;
|
||||
restartDelayMs?: number;
|
||||
}) {
|
||||
const snapshot = await readConfigSnapshot(params.env);
|
||||
const restartDelayMs = params.restartDelayMs ?? 1_000;
|
||||
try {
|
||||
const result = await params.env.gateway.call(
|
||||
"config.patch",
|
||||
{
|
||||
raw: JSON.stringify(params.patch, null, 2),
|
||||
baseHash: snapshot.hash,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.note ? { note: params.note } : {}),
|
||||
restartDelayMs,
|
||||
},
|
||||
{ timeoutMs: 45_000 },
|
||||
);
|
||||
await waitForConfigRestartSettle(params.env, restartDelayMs);
|
||||
return result;
|
||||
} catch (error) {
|
||||
if (!isGatewayRestartRace(error)) {
|
||||
throw error;
|
||||
}
|
||||
await waitForConfigRestartSettle(params.env, restartDelayMs);
|
||||
return { ok: true, restarted: true };
|
||||
}
|
||||
return await runConfigMutation({
|
||||
env: params.env,
|
||||
action: "config.patch",
|
||||
raw: JSON.stringify(params.patch, null, 2),
|
||||
sessionKey: params.sessionKey,
|
||||
note: params.note,
|
||||
restartDelayMs: params.restartDelayMs,
|
||||
});
|
||||
}
|
||||
|
||||
async function applyConfig(params: {
|
||||
@@ -349,29 +391,14 @@ async function applyConfig(params: {
|
||||
note?: string;
|
||||
restartDelayMs?: number;
|
||||
}) {
|
||||
const snapshot = await readConfigSnapshot(params.env);
|
||||
const restartDelayMs = params.restartDelayMs ?? 1_000;
|
||||
try {
|
||||
const result = await params.env.gateway.call(
|
||||
"config.apply",
|
||||
{
|
||||
raw: JSON.stringify(params.nextConfig, null, 2),
|
||||
baseHash: snapshot.hash,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.note ? { note: params.note } : {}),
|
||||
restartDelayMs,
|
||||
},
|
||||
{ timeoutMs: 45_000 },
|
||||
);
|
||||
await waitForConfigRestartSettle(params.env, restartDelayMs);
|
||||
return result;
|
||||
} catch (error) {
|
||||
if (!isGatewayRestartRace(error)) {
|
||||
throw error;
|
||||
}
|
||||
await waitForConfigRestartSettle(params.env, restartDelayMs);
|
||||
return { ok: true, restarted: true };
|
||||
}
|
||||
return await runConfigMutation({
|
||||
env: params.env,
|
||||
action: "config.apply",
|
||||
raw: JSON.stringify(params.nextConfig, null, 2),
|
||||
sessionKey: params.sessionKey,
|
||||
note: params.note,
|
||||
restartDelayMs: params.restartDelayMs,
|
||||
});
|
||||
}
|
||||
|
||||
async function createSession(env: QaSuiteEnvironment, label: string, key?: string) {
|
||||
@@ -430,6 +457,26 @@ async function readSkillStatus(env: QaSuiteEnvironment, agentId = "qa") {
|
||||
return payload.skills ?? [];
|
||||
}
|
||||
|
||||
async function readRawQaSessionStore(env: QaSuiteEnvironment) {
|
||||
const storePath = path.join(
|
||||
env.gateway.tempRoot,
|
||||
"state",
|
||||
"agents",
|
||||
"qa",
|
||||
"sessions",
|
||||
"sessions.json",
|
||||
);
|
||||
try {
|
||||
const raw = await fs.readFile(storePath, "utf8");
|
||||
return JSON.parse(raw) as Record<string, QaRawSessionStoreEntry>;
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return {};
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function runQaCli(
|
||||
env: QaSuiteEnvironment,
|
||||
args: string[],
|
||||
@@ -474,6 +521,118 @@ async function runQaCli(
|
||||
return text ? (JSON.parse(text) as unknown) : {};
|
||||
}
|
||||
|
||||
function extractMediaPathFromText(text: string | undefined): string | undefined {
|
||||
return /MEDIA:([^\n]+)/.exec(text ?? "")?.[1]?.trim();
|
||||
}
|
||||
|
||||
async function resolveGeneratedImagePath(params: {
|
||||
env: QaSuiteEnvironment;
|
||||
promptSnippet: string;
|
||||
startedAtMs: number;
|
||||
timeoutMs: number;
|
||||
}) {
|
||||
return await waitForCondition(
|
||||
async () => {
|
||||
if (params.env.mock) {
|
||||
const requests = await fetchJson<Array<{ allInputText?: string; toolOutput?: string }>>(
|
||||
`${params.env.mock.baseUrl}/debug/requests`,
|
||||
);
|
||||
for (let index = requests.length - 1; index >= 0; index -= 1) {
|
||||
const request = requests[index];
|
||||
if (!String(request.allInputText ?? "").includes(params.promptSnippet)) {
|
||||
continue;
|
||||
}
|
||||
const mediaPath = extractMediaPathFromText(request.toolOutput);
|
||||
if (mediaPath) {
|
||||
return mediaPath;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const mediaDir = path.join(params.env.gateway.tempRoot, "media", "tool-image-generation");
|
||||
const entries = await fs.readdir(mediaDir).catch(() => []);
|
||||
const candidates = await Promise.all(
|
||||
entries.map(async (entry) => {
|
||||
const fullPath = path.join(mediaDir, entry);
|
||||
const stat = await fs.stat(fullPath).catch(() => null);
|
||||
if (!stat?.isFile()) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
fullPath,
|
||||
mtimeMs: stat.mtimeMs,
|
||||
};
|
||||
}),
|
||||
);
|
||||
return candidates
|
||||
.filter((entry): entry is NonNullable<typeof entry> => Boolean(entry))
|
||||
.filter((entry) => entry.mtimeMs >= params.startedAtMs - 1_000)
|
||||
.toSorted((left, right) => right.mtimeMs - left.mtimeMs)
|
||||
.at(0)?.fullPath;
|
||||
},
|
||||
params.timeoutMs,
|
||||
250,
|
||||
);
|
||||
}
|
||||
|
||||
async function startAgentRun(
|
||||
env: QaSuiteEnvironment,
|
||||
params: {
|
||||
sessionKey: string;
|
||||
message: string;
|
||||
to?: string;
|
||||
threadId?: string;
|
||||
provider?: string;
|
||||
model?: string;
|
||||
timeoutMs?: number;
|
||||
attachments?: Array<{
|
||||
mimeType: string;
|
||||
fileName: string;
|
||||
content: string;
|
||||
}>;
|
||||
},
|
||||
) {
|
||||
const target = params.to ?? "dm:qa-operator";
|
||||
const started = (await env.gateway.call(
|
||||
"agent",
|
||||
{
|
||||
idempotencyKey: randomUUID(),
|
||||
agentId: "qa",
|
||||
sessionKey: params.sessionKey,
|
||||
message: params.message,
|
||||
deliver: true,
|
||||
channel: "qa-channel",
|
||||
to: target,
|
||||
replyChannel: "qa-channel",
|
||||
replyTo: target,
|
||||
...(params.threadId ? { threadId: params.threadId } : {}),
|
||||
...(params.provider ? { provider: params.provider } : {}),
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
...(params.attachments ? { attachments: params.attachments } : {}),
|
||||
},
|
||||
{
|
||||
timeoutMs: params.timeoutMs ?? 30_000,
|
||||
},
|
||||
)) as { runId?: string; status?: string };
|
||||
if (!started.runId) {
|
||||
throw new Error(`agent call did not return a runId: ${JSON.stringify(started)}`);
|
||||
}
|
||||
return started;
|
||||
}
|
||||
|
||||
async function waitForAgentRun(env: QaSuiteEnvironment, runId: string, timeoutMs = 30_000) {
|
||||
return (await env.gateway.call(
|
||||
"agent.wait",
|
||||
{
|
||||
runId,
|
||||
timeoutMs,
|
||||
},
|
||||
{
|
||||
timeoutMs: timeoutMs + 5_000,
|
||||
},
|
||||
)) as { status?: string; error?: string };
|
||||
}
|
||||
|
||||
async function listCronJobs(env: QaSuiteEnvironment) {
|
||||
const payload = (await env.gateway.call(
|
||||
"cron.list",
|
||||
@@ -511,14 +670,22 @@ async function forceMemoryIndex(params: {
|
||||
await runQaCli(params.env, ["memory", "index", "--agent", "qa", "--force"], {
|
||||
timeoutMs: liveTurnTimeoutMs(params.env, 60_000),
|
||||
});
|
||||
const payload = (await runQaCli(
|
||||
params.env,
|
||||
["memory", "search", "--agent", "qa", "--json", "--query", params.query],
|
||||
{
|
||||
timeoutMs: liveTurnTimeoutMs(params.env, 60_000),
|
||||
json: true,
|
||||
const payload = await waitForCondition(
|
||||
async () => {
|
||||
const result = (await runQaCli(
|
||||
params.env,
|
||||
["memory", "search", "--agent", "qa", "--json", "--query", params.query],
|
||||
{
|
||||
timeoutMs: liveTurnTimeoutMs(params.env, 60_000),
|
||||
json: true,
|
||||
},
|
||||
)) as { results?: Array<{ snippet?: string; text?: string; path?: string }> };
|
||||
const haystack = JSON.stringify(result.results ?? []);
|
||||
return haystack.includes(params.expectedNeedle) ? result : undefined;
|
||||
},
|
||||
)) as { results?: Array<{ snippet?: string; text?: string; path?: string }> };
|
||||
liveTurnTimeoutMs(params.env, 20_000),
|
||||
500,
|
||||
);
|
||||
const haystack = JSON.stringify(payload.results ?? []);
|
||||
if (!haystack.includes(params.expectedNeedle)) {
|
||||
throw new Error(`memory index missing expected fact after reindex: ${haystack}`);
|
||||
@@ -591,41 +758,8 @@ async function runAgentPrompt(
|
||||
}>;
|
||||
},
|
||||
) {
|
||||
const target = params.to ?? "dm:qa-operator";
|
||||
const started = (await env.gateway.call(
|
||||
"agent",
|
||||
{
|
||||
idempotencyKey: randomUUID(),
|
||||
agentId: "qa",
|
||||
sessionKey: params.sessionKey,
|
||||
message: params.message,
|
||||
deliver: true,
|
||||
channel: "qa-channel",
|
||||
to: target,
|
||||
replyChannel: "qa-channel",
|
||||
replyTo: target,
|
||||
...(params.threadId ? { threadId: params.threadId } : {}),
|
||||
...(params.provider ? { provider: params.provider } : {}),
|
||||
...(params.model ? { model: params.model } : {}),
|
||||
...(params.attachments ? { attachments: params.attachments } : {}),
|
||||
},
|
||||
{
|
||||
timeoutMs: params.timeoutMs ?? 30_000,
|
||||
},
|
||||
)) as { runId?: string; status?: string };
|
||||
if (!started.runId) {
|
||||
throw new Error(`agent call did not return a runId: ${JSON.stringify(started)}`);
|
||||
}
|
||||
const waited = (await env.gateway.call(
|
||||
"agent.wait",
|
||||
{
|
||||
runId: started.runId,
|
||||
timeoutMs: params.timeoutMs ?? 30_000,
|
||||
},
|
||||
{
|
||||
timeoutMs: (params.timeoutMs ?? 30_000) + 5_000,
|
||||
},
|
||||
)) as { status?: string; error?: string };
|
||||
const started = await startAgentRun(env, params);
|
||||
const waited = await waitForAgentRun(env, started.runId!, params.timeoutMs ?? 30_000);
|
||||
if (waited.status !== "ok") {
|
||||
throw new Error(
|
||||
`agent.wait returned ${String(waited.status ?? "unknown")}: ${waited.error ?? "no error"}`,
|
||||
@@ -637,6 +771,69 @@ async function runAgentPrompt(
|
||||
};
|
||||
}
|
||||
|
||||
async function ensureImageGenerationConfigured(env: QaSuiteEnvironment) {
|
||||
const imageModelRef = "openai/gpt-image-1";
|
||||
await patchConfig({
|
||||
env,
|
||||
patch:
|
||||
env.providerMode === "mock-openai"
|
||||
? {
|
||||
plugins: {
|
||||
allow: ["memory-core", "openai", "qa-channel"],
|
||||
entries: {
|
||||
openai: {
|
||||
enabled: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: `${env.mock?.baseUrl}/v1`,
|
||||
apiKey: "test",
|
||||
api: "openai-responses",
|
||||
models: [
|
||||
{
|
||||
id: "gpt-image-1",
|
||||
name: "gpt-image-1",
|
||||
api: "openai-responses",
|
||||
reasoning: false,
|
||||
input: ["text"],
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
},
|
||||
contextWindow: 128_000,
|
||||
maxTokens: 4096,
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
agents: {
|
||||
defaults: {
|
||||
imageGenerationModel: {
|
||||
primary: imageModelRef,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
: {
|
||||
agents: {
|
||||
defaults: {
|
||||
imageGenerationModel: {
|
||||
primary: imageModelRef,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
await waitForGatewayHealthy(env);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
}
|
||||
|
||||
type QaActionName = "delete" | "edit" | "react" | "thread-create";
|
||||
|
||||
async function handleQaAction(params: {
|
||||
@@ -1124,6 +1321,49 @@ function buildScenarioMap(env: QaSuiteEnvironment) {
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"subagent-fanout-synthesis",
|
||||
async () =>
|
||||
await runScenario("Subagent fanout synthesis", [
|
||||
{
|
||||
name: "spawns sequential workers and folds both results back into the parent reply",
|
||||
run: async () => {
|
||||
await waitForGatewayHealthy(env, 60_000);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
await reset();
|
||||
state.addInboundMessage({
|
||||
conversation: { id: "qa-operator", kind: "direct", title: "QA Operator" },
|
||||
senderId: "qa-operator",
|
||||
senderName: "QA Operator",
|
||||
text: "Subagent fanout synthesis check: delegate two bounded subagents sequentially, then report both results together. Do not use ACP.",
|
||||
});
|
||||
const outbound = await waitForOutboundMessage(
|
||||
state,
|
||||
(message) => {
|
||||
const text = message.text ?? "";
|
||||
return text.includes("ALPHA-OK") && text.includes("BETA-OK");
|
||||
},
|
||||
liveTurnTimeoutMs(env, 60_000),
|
||||
);
|
||||
if (!env.mock) {
|
||||
return outbound.text;
|
||||
}
|
||||
const store = await readRawQaSessionStore(env);
|
||||
const childRows = Object.values(store).filter(
|
||||
(entry) => entry.spawnedBy === "agent:qa:main",
|
||||
);
|
||||
const sawAlpha = childRows.some((entry) => entry.label === "qa-fanout-alpha");
|
||||
const sawBeta = childRows.some((entry) => entry.label === "qa-fanout-beta");
|
||||
if (!sawAlpha || !sawBeta) {
|
||||
throw new Error(
|
||||
`fanout child sessions missing (alpha=${String(sawAlpha)} beta=${String(sawBeta)})`,
|
||||
);
|
||||
}
|
||||
return outbound.text;
|
||||
},
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"thread-follow-up",
|
||||
async () =>
|
||||
@@ -1566,6 +1806,12 @@ function buildScenarioMap(env: QaSuiteEnvironment) {
|
||||
if (tools.has("memory_search") || tools.has("memory_get")) {
|
||||
throw new Error("memory tools still present after deny patch");
|
||||
}
|
||||
await runQaCli(env, ["memory", "index", "--agent", "qa", "--force"], {
|
||||
timeoutMs: liveTurnTimeoutMs(env, 60_000),
|
||||
});
|
||||
await env.gateway.restart();
|
||||
await waitForGatewayHealthy(env, 60_000);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
await reset();
|
||||
await runAgentPrompt(env, {
|
||||
sessionKey: "agent:qa:memory-failure",
|
||||
@@ -1602,6 +1848,232 @@ function buildScenarioMap(env: QaSuiteEnvironment) {
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"session-memory-ranking",
|
||||
async () =>
|
||||
await runScenario("Session memory ranking", [
|
||||
{
|
||||
name: "prefers the newer transcript-backed fact over the stale durable note",
|
||||
run: async () => {
|
||||
const original = await readConfigSnapshot(env);
|
||||
const originalMemorySearch =
|
||||
original.config.agents &&
|
||||
typeof original.config.agents === "object" &&
|
||||
typeof (original.config.agents as Record<string, unknown>).defaults === "object"
|
||||
? (
|
||||
(original.config.agents as Record<string, unknown>).defaults as Record<
|
||||
string,
|
||||
unknown
|
||||
>
|
||||
).memorySearch
|
||||
: undefined;
|
||||
await patchConfig({
|
||||
env,
|
||||
patch: {
|
||||
agents: {
|
||||
defaults: {
|
||||
memorySearch: {
|
||||
sources: ["memory", "sessions"],
|
||||
experimental: {
|
||||
sessionMemory: true,
|
||||
},
|
||||
query: {
|
||||
minScore: 0,
|
||||
hybrid: {
|
||||
enabled: true,
|
||||
temporalDecay: {
|
||||
enabled: true,
|
||||
halfLifeDays: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
await waitForGatewayHealthy(env);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
try {
|
||||
const memoryPath = path.join(env.gateway.workspaceDir, "MEMORY.md");
|
||||
await fs.writeFile(memoryPath, "Project Nebula stale codename: ORBIT-9.\n", "utf8");
|
||||
const staleAt = new Date("2020-01-01T00:00:00.000Z");
|
||||
await fs.utimes(memoryPath, staleAt, staleAt);
|
||||
const transcriptsDir = resolveSessionTranscriptsDirForAgent(
|
||||
"qa",
|
||||
env.gateway.runtimeEnv,
|
||||
() => env.gateway.runtimeEnv.HOME ?? path.join(env.gateway.tempRoot, "home"),
|
||||
);
|
||||
await fs.mkdir(transcriptsDir, { recursive: true });
|
||||
const transcriptPath = path.join(transcriptsDir, "qa-session-memory-ranking.jsonl");
|
||||
const now = Date.now();
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "session",
|
||||
id: "qa-session-memory-ranking",
|
||||
timestamp: new Date(now - 120_000).toISOString(),
|
||||
}),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
timestamp: new Date(now - 90_000).toISOString(),
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: "What is the current Project Nebula codename?",
|
||||
},
|
||||
],
|
||||
},
|
||||
}),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
timestamp: new Date(now - 60_000).toISOString(),
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: "The current Project Nebula codename is ORBIT-10.",
|
||||
},
|
||||
],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf8",
|
||||
);
|
||||
await forceMemoryIndex({
|
||||
env,
|
||||
query: "current Project Nebula codename ORBIT-10",
|
||||
expectedNeedle: "ORBIT-10",
|
||||
});
|
||||
await reset();
|
||||
await runAgentPrompt(env, {
|
||||
sessionKey: "agent:qa:session-memory-ranking",
|
||||
message:
|
||||
"Session memory ranking check: what is the current Project Nebula codename? Use memory tools first.",
|
||||
timeoutMs: liveTurnTimeoutMs(env, 45_000),
|
||||
});
|
||||
const outbound = await waitForOutboundMessage(
|
||||
state,
|
||||
(candidate) =>
|
||||
candidate.conversation.id === "qa-operator" &&
|
||||
candidate.text.includes("ORBIT-10"),
|
||||
liveTurnTimeoutMs(env, 45_000),
|
||||
);
|
||||
if (outbound.text.includes("ORBIT-9")) {
|
||||
throw new Error(`stale durable fact leaked through: ${outbound.text}`);
|
||||
}
|
||||
if (env.mock) {
|
||||
const requests = await fetchJson<
|
||||
Array<{ allInputText?: string; plannedToolName?: string }>
|
||||
>(`${env.mock.baseUrl}/debug/requests`);
|
||||
const relevant = requests.filter((request) =>
|
||||
String(request.allInputText ?? "").includes("Session memory ranking check"),
|
||||
);
|
||||
if (!relevant.some((request) => request.plannedToolName === "memory_search")) {
|
||||
throw new Error("expected memory_search in session memory ranking flow");
|
||||
}
|
||||
}
|
||||
return outbound.text;
|
||||
} finally {
|
||||
await patchConfig({
|
||||
env,
|
||||
patch: {
|
||||
agents: {
|
||||
defaults: {
|
||||
memorySearch:
|
||||
originalMemorySearch === undefined
|
||||
? null
|
||||
: structuredClone(originalMemorySearch),
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
await waitForGatewayHealthy(env);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
}
|
||||
},
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"thread-memory-isolation",
|
||||
async () =>
|
||||
await runScenario("Thread memory isolation", [
|
||||
{
|
||||
name: "answers the memory-backed fact inside the thread only",
|
||||
run: async () => {
|
||||
await reset();
|
||||
await fs.writeFile(
|
||||
path.join(env.gateway.workspaceDir, "MEMORY.md"),
|
||||
"Thread-hidden codename: ORBIT-22.\n",
|
||||
"utf8",
|
||||
);
|
||||
await forceMemoryIndex({
|
||||
env,
|
||||
query: "hidden thread codename ORBIT-22",
|
||||
expectedNeedle: "ORBIT-22",
|
||||
});
|
||||
const threadPayload = (await handleQaAction({
|
||||
env,
|
||||
action: "thread-create",
|
||||
args: {
|
||||
channelId: "qa-room",
|
||||
title: "Thread memory QA",
|
||||
},
|
||||
})) as { thread?: { id?: string } } | undefined;
|
||||
const threadId = threadPayload?.thread?.id;
|
||||
if (!threadId) {
|
||||
throw new Error("missing thread id for memory isolation check");
|
||||
}
|
||||
const beforeCursor = state.getSnapshot().messages.length;
|
||||
state.addInboundMessage({
|
||||
conversation: { id: "qa-room", kind: "channel", title: "QA Room" },
|
||||
senderId: "alice",
|
||||
senderName: "Alice",
|
||||
text: "@openclaw Thread memory check: what is the hidden thread codename stored only in memory? Use memory tools first and reply only in this thread.",
|
||||
threadId,
|
||||
threadTitle: "Thread memory QA",
|
||||
});
|
||||
const outbound = await waitForOutboundMessage(
|
||||
state,
|
||||
(candidate) =>
|
||||
candidate.conversation.id === "qa-room" &&
|
||||
candidate.threadId === threadId &&
|
||||
candidate.text.includes("ORBIT-22"),
|
||||
liveTurnTimeoutMs(env, 45_000),
|
||||
);
|
||||
const leaked = state
|
||||
.getSnapshot()
|
||||
.messages.slice(beforeCursor)
|
||||
.some(
|
||||
(candidate) =>
|
||||
candidate.direction === "outbound" &&
|
||||
candidate.conversation.id === "qa-room" &&
|
||||
!candidate.threadId,
|
||||
);
|
||||
if (leaked) {
|
||||
throw new Error("threaded memory answer leaked into root channel");
|
||||
}
|
||||
if (env.mock) {
|
||||
const requests = await fetchJson<
|
||||
Array<{ allInputText?: string; plannedToolName?: string }>
|
||||
>(`${env.mock.baseUrl}/debug/requests`);
|
||||
const relevant = requests.filter((request) =>
|
||||
String(request.allInputText ?? "").includes("Thread memory check"),
|
||||
);
|
||||
if (!relevant.some((request) => request.plannedToolName === "memory_search")) {
|
||||
throw new Error("expected memory_search in thread memory flow");
|
||||
}
|
||||
}
|
||||
return outbound.text;
|
||||
},
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"model-switch-tool-continuity",
|
||||
async () =>
|
||||
@@ -1790,65 +2262,7 @@ When the user asks for the hot install marker exactly, reply with exactly: HOT-I
|
||||
{
|
||||
name: "enables image_generate and saves a real media artifact",
|
||||
run: async () => {
|
||||
const imageModelRef = "openai/gpt-image-1";
|
||||
await patchConfig({
|
||||
env,
|
||||
patch:
|
||||
env.providerMode === "mock-openai"
|
||||
? {
|
||||
plugins: {
|
||||
allow: ["memory-core", "openai", "qa-channel"],
|
||||
entries: {
|
||||
openai: {
|
||||
enabled: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
models: {
|
||||
providers: {
|
||||
openai: {
|
||||
baseUrl: `${env.mock?.baseUrl}/v1`,
|
||||
apiKey: "test",
|
||||
api: "openai-responses",
|
||||
models: [
|
||||
{
|
||||
id: "gpt-image-1",
|
||||
name: "gpt-image-1",
|
||||
api: "openai-responses",
|
||||
reasoning: false,
|
||||
input: ["text"],
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
},
|
||||
contextWindow: 128_000,
|
||||
maxTokens: 4096,
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
agents: {
|
||||
defaults: {
|
||||
imageGenerationModel: {
|
||||
primary: "openai/gpt-image-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
: {
|
||||
agents: {
|
||||
defaults: {
|
||||
imageGenerationModel: {
|
||||
primary: imageModelRef,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
await waitForGatewayHealthy(env);
|
||||
await ensureImageGenerationConfigured(env);
|
||||
const sessionKey = await createSession(env, "Image generation");
|
||||
const tools = await readEffectiveTools(env, sessionKey);
|
||||
if (!tools.has("image_generate")) {
|
||||
@@ -1904,6 +2318,81 @@ When the user asks for the hot install marker exactly, reply with exactly: HOT-I
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"image-generation-roundtrip",
|
||||
async () =>
|
||||
await runScenario("Image generation roundtrip", [
|
||||
{
|
||||
name: "reattaches the generated media artifact on the follow-up turn",
|
||||
run: async () => {
|
||||
await ensureImageGenerationConfigured(env);
|
||||
const sessionKey = "agent:qa:image-roundtrip";
|
||||
await createSession(env, "Image roundtrip", sessionKey);
|
||||
await reset();
|
||||
const generatedStartedAtMs = Date.now();
|
||||
await runAgentPrompt(env, {
|
||||
sessionKey,
|
||||
message:
|
||||
"Image generation check: generate a QA lighthouse image and summarize it in one short sentence.",
|
||||
timeoutMs: liveTurnTimeoutMs(env, 45_000),
|
||||
});
|
||||
const mediaPath = await resolveGeneratedImagePath({
|
||||
env,
|
||||
promptSnippet: "Image generation check",
|
||||
startedAtMs: generatedStartedAtMs,
|
||||
timeoutMs: liveTurnTimeoutMs(env, 45_000),
|
||||
});
|
||||
const imageBuffer = await fs.readFile(mediaPath);
|
||||
await runAgentPrompt(env, {
|
||||
sessionKey,
|
||||
message:
|
||||
"Roundtrip image inspection check: describe the generated lighthouse attachment in one short sentence.",
|
||||
attachments: [
|
||||
{
|
||||
mimeType: "image/png",
|
||||
fileName: path.basename(mediaPath),
|
||||
content: imageBuffer.toString("base64"),
|
||||
},
|
||||
],
|
||||
timeoutMs: liveTurnTimeoutMs(env, 45_000),
|
||||
});
|
||||
const outbound = await waitForCondition(
|
||||
() =>
|
||||
state
|
||||
.getSnapshot()
|
||||
.messages.filter(
|
||||
(candidate) =>
|
||||
candidate.direction === "outbound" &&
|
||||
candidate.conversation.id === "qa-operator" &&
|
||||
candidate.text.toLowerCase().includes("lighthouse"),
|
||||
)
|
||||
.at(-1),
|
||||
liveTurnTimeoutMs(env, 45_000),
|
||||
);
|
||||
if (env.mock) {
|
||||
const requests = await fetchJson<
|
||||
Array<{ prompt?: string; imageInputCount?: number; plannedToolName?: string }>
|
||||
>(`${env.mock.baseUrl}/debug/requests`);
|
||||
const generatedCall = requests.find(
|
||||
(request) =>
|
||||
request.plannedToolName === "image_generate" &&
|
||||
String(request.prompt ?? "").includes("Image generation check"),
|
||||
);
|
||||
const inspectionCall = requests.find((request) =>
|
||||
String(request.prompt ?? "").includes("Roundtrip image inspection check"),
|
||||
);
|
||||
if (!generatedCall) {
|
||||
throw new Error("expected image_generate call before roundtrip inspection");
|
||||
}
|
||||
if ((inspectionCall?.imageInputCount ?? 0) < 1) {
|
||||
throw new Error("expected generated artifact to be reattached on follow-up turn");
|
||||
}
|
||||
}
|
||||
return `MEDIA:${mediaPath}\n${outbound.text}`;
|
||||
},
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"image-understanding-attachment",
|
||||
async () =>
|
||||
@@ -2101,6 +2590,111 @@ When the user asks for the hot disable marker exactly, reply with exactly: HOT-P
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"config-restart-capability-flip",
|
||||
async () =>
|
||||
await runScenario("Config restart capability flip", [
|
||||
{
|
||||
name: "restores image_generate after restart and uses it in the same session",
|
||||
run: async () => {
|
||||
await ensureImageGenerationConfigured(env);
|
||||
const original = await readConfigSnapshot(env);
|
||||
const originalTools =
|
||||
original.config.tools && typeof original.config.tools === "object"
|
||||
? (original.config.tools as Record<string, unknown>)
|
||||
: null;
|
||||
const originalToolsDeny = originalTools
|
||||
? Object.prototype.hasOwnProperty.call(originalTools, "deny")
|
||||
? structuredClone(originalTools.deny)
|
||||
: undefined
|
||||
: undefined;
|
||||
const denied = Array.isArray(originalToolsDeny)
|
||||
? originalToolsDeny.map((entry) => String(entry))
|
||||
: [];
|
||||
const deniedWithImage = denied.includes("image_generate")
|
||||
? denied
|
||||
: [...denied, "image_generate"];
|
||||
const sessionKey = "agent:qa:capability-flip";
|
||||
await createSession(env, "Capability flip", sessionKey);
|
||||
try {
|
||||
await patchConfig({
|
||||
env,
|
||||
patch: {
|
||||
tools: {
|
||||
deny: deniedWithImage,
|
||||
},
|
||||
},
|
||||
});
|
||||
await waitForGatewayHealthy(env);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
await runAgentPrompt(env, {
|
||||
sessionKey,
|
||||
message:
|
||||
"Capability flip setup: acknowledge this setup so restart wake-up has a route.",
|
||||
timeoutMs: liveTurnTimeoutMs(env, 30_000),
|
||||
});
|
||||
const beforeTools = await readEffectiveTools(env, sessionKey);
|
||||
if (beforeTools.has("image_generate")) {
|
||||
throw new Error("image_generate still present before capability flip");
|
||||
}
|
||||
const wakeMarker = `QA-CAPABILITY-${randomUUID().slice(0, 8)}`;
|
||||
await patchConfig({
|
||||
env,
|
||||
patch: {
|
||||
tools: {
|
||||
deny: originalToolsDeny === undefined ? null : originalToolsDeny,
|
||||
},
|
||||
agents: {
|
||||
defaults: {
|
||||
imageGenerationModel: {
|
||||
primary: "openai/gpt-image-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
sessionKey,
|
||||
note: wakeMarker,
|
||||
});
|
||||
await waitForGatewayHealthy(env, 60_000);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
const afterTools = await waitForCondition(
|
||||
async () => {
|
||||
const tools = await readEffectiveTools(env, sessionKey);
|
||||
return tools.has("image_generate") ? tools : undefined;
|
||||
},
|
||||
liveTurnTimeoutMs(env, 45_000),
|
||||
500,
|
||||
);
|
||||
const imageStartedAtMs = Date.now();
|
||||
await runAgentPrompt(env, {
|
||||
sessionKey,
|
||||
message:
|
||||
"Capability flip image check: generate a QA lighthouse image now and keep the media path in the reply.",
|
||||
timeoutMs: liveTurnTimeoutMs(env, 45_000),
|
||||
});
|
||||
const mediaPath = await resolveGeneratedImagePath({
|
||||
env,
|
||||
promptSnippet: "Capability flip image check",
|
||||
startedAtMs: imageStartedAtMs,
|
||||
timeoutMs: liveTurnTimeoutMs(env, 45_000),
|
||||
});
|
||||
return `${wakeMarker}\nimage_generate=${String(afterTools.has("image_generate"))}\nMEDIA:${mediaPath}`;
|
||||
} finally {
|
||||
await patchConfig({
|
||||
env,
|
||||
patch: {
|
||||
tools: {
|
||||
deny: originalToolsDeny === undefined ? null : originalToolsDeny,
|
||||
},
|
||||
},
|
||||
});
|
||||
await waitForGatewayHealthy(env);
|
||||
await waitForQaChannelReady(env, 60_000);
|
||||
}
|
||||
},
|
||||
},
|
||||
]),
|
||||
],
|
||||
[
|
||||
"runtime-inventory-drift-check",
|
||||
async () =>
|
||||
|
||||
@@ -162,6 +162,23 @@
|
||||
"docsRefs": ["docs/tools/subagents.md", "docs/help/testing.md"],
|
||||
"codeRefs": ["src/agents/system-prompt.ts", "extensions/qa-lab/src/report.ts"]
|
||||
},
|
||||
{
|
||||
"id": "subagent-fanout-synthesis",
|
||||
"title": "Subagent fanout synthesis",
|
||||
"surface": "subagents",
|
||||
"objective": "Verify the agent can delegate multiple bounded subagent tasks and fold both results back into one parent reply.",
|
||||
"successCriteria": [
|
||||
"Parent flow launches at least two bounded subagent tasks.",
|
||||
"Both delegated results are acknowledged in the main flow.",
|
||||
"Final answer synthesizes both worker outputs in one reply."
|
||||
],
|
||||
"docsRefs": ["docs/tools/subagents.md", "docs/help/testing.md"],
|
||||
"codeRefs": [
|
||||
"src/agents/subagent-spawn.ts",
|
||||
"src/agents/system-prompt.ts",
|
||||
"extensions/qa-lab/src/suite.ts"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "thread-follow-up",
|
||||
"title": "Threaded follow-up",
|
||||
@@ -201,6 +218,44 @@
|
||||
"docsRefs": ["docs/concepts/memory.md", "docs/tools/index.md"],
|
||||
"codeRefs": ["extensions/memory-core/src/tools.ts", "extensions/qa-lab/src/suite.ts"]
|
||||
},
|
||||
{
|
||||
"id": "session-memory-ranking",
|
||||
"title": "Session memory ranking",
|
||||
"surface": "memory",
|
||||
"objective": "Verify session-transcript memory can outrank stale durable notes and drive the final answer toward the newer fact.",
|
||||
"successCriteria": [
|
||||
"Session memory indexing is enabled for the scenario.",
|
||||
"Search ranks the newer transcript-backed fact ahead of the stale durable note.",
|
||||
"The agent uses memory tools and answers with the current fact, not the stale one."
|
||||
],
|
||||
"docsRefs": ["docs/concepts/memory-search.md", "docs/reference/memory-config.md"],
|
||||
"codeRefs": [
|
||||
"extensions/memory-core/src/tools.ts",
|
||||
"extensions/memory-core/src/memory/manager.ts",
|
||||
"extensions/qa-lab/src/suite.ts"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "thread-memory-isolation",
|
||||
"title": "Thread memory isolation",
|
||||
"surface": "memory",
|
||||
"objective": "Verify a memory-backed answer requested inside a thread stays in-thread and does not leak into the root channel.",
|
||||
"successCriteria": [
|
||||
"Agent uses memory tools inside the thread.",
|
||||
"The hidden fact is answered correctly in the thread.",
|
||||
"No root-channel outbound message leaks during the threaded memory reply."
|
||||
],
|
||||
"docsRefs": [
|
||||
"docs/concepts/memory-search.md",
|
||||
"docs/channels/qa-channel.md",
|
||||
"docs/channels/group-messages.md"
|
||||
],
|
||||
"codeRefs": [
|
||||
"extensions/memory-core/src/tools.ts",
|
||||
"extensions/qa-channel/src/protocol.ts",
|
||||
"extensions/qa-lab/src/suite.ts"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "model-switch-tool-continuity",
|
||||
"title": "Model switch with tool continuity",
|
||||
@@ -286,6 +341,23 @@
|
||||
"extensions/qa-lab/src/mock-openai-server.ts"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "image-generation-roundtrip",
|
||||
"title": "Image generation roundtrip",
|
||||
"surface": "image-generation",
|
||||
"objective": "Verify a generated image is saved as media, reattached on the next turn, and described correctly through the vision path.",
|
||||
"successCriteria": [
|
||||
"image_generate produces a saved MEDIA artifact.",
|
||||
"The generated artifact is reattached on a follow-up turn.",
|
||||
"The follow-up vision answer describes the generated scene rather than a generic attachment placeholder."
|
||||
],
|
||||
"docsRefs": ["docs/tools/image-generation.md", "docs/help/testing.md"],
|
||||
"codeRefs": [
|
||||
"src/agents/tools/image-generate-tool.ts",
|
||||
"src/gateway/chat-attachments.ts",
|
||||
"extensions/qa-lab/src/mock-openai-server.ts"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "config-patch-hot-apply",
|
||||
"title": "Config patch skill disable",
|
||||
@@ -312,6 +384,28 @@
|
||||
"docsRefs": ["docs/gateway/configuration.md", "docs/gateway/protocol.md"],
|
||||
"codeRefs": ["src/gateway/server-methods/config.ts", "src/gateway/server-restart-sentinel.ts"]
|
||||
},
|
||||
{
|
||||
"id": "config-restart-capability-flip",
|
||||
"title": "Config restart capability flip",
|
||||
"surface": "config",
|
||||
"objective": "Verify a restart-triggering config change flips capability inventory and the same session successfully uses the newly restored tool after wake-up.",
|
||||
"successCriteria": [
|
||||
"Capability is absent before the restart-triggering patch.",
|
||||
"Restart sentinel wakes the same session back up after config patch.",
|
||||
"The restored capability appears in tools.effective and works in the follow-up turn."
|
||||
],
|
||||
"docsRefs": [
|
||||
"docs/gateway/configuration.md",
|
||||
"docs/gateway/protocol.md",
|
||||
"docs/tools/image-generation.md"
|
||||
],
|
||||
"codeRefs": [
|
||||
"src/gateway/server-methods/config.ts",
|
||||
"src/gateway/server-restart-sentinel.ts",
|
||||
"src/gateway/server-methods/tools-effective.ts",
|
||||
"extensions/qa-lab/src/suite.ts"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "runtime-inventory-drift-check",
|
||||
"title": "Runtime inventory drift check",
|
||||
|
||||
@@ -136,9 +136,38 @@ describe("subagent-orphan-recovery", () => {
|
||||
});
|
||||
|
||||
expect(result.recovered).toBe(0);
|
||||
expect(result.skipped).toBe(1);
|
||||
expect(gateway.callGateway).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("recovers restart-aborted timeout runs even when the registry marked them ended", async () => {
|
||||
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
||||
"agent:main:subagent:test-session-1": {
|
||||
sessionId: "session-abc",
|
||||
updatedAt: Date.now(),
|
||||
abortedLastRun: true,
|
||||
},
|
||||
});
|
||||
|
||||
const activeRuns = createActiveRuns(
|
||||
createTestRunRecord({
|
||||
endedAt: Date.now() - 1_000,
|
||||
outcome: {
|
||||
status: "timeout",
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
const result = await recoverOrphanedSubagentSessions({
|
||||
getActiveRuns: () => activeRuns,
|
||||
});
|
||||
|
||||
expect(result.recovered).toBe(1);
|
||||
expect(result.failed).toBe(0);
|
||||
expect(result.skipped).toBe(0);
|
||||
expect(gateway.callGateway).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("handles multiple orphaned sessions", async () => {
|
||||
vi.mocked(sessions.loadSessionStore).mockReturnValue({
|
||||
"agent:main:subagent:session-a": {
|
||||
|
||||
@@ -29,6 +29,18 @@ const log = createSubsystemLogger("subagent-orphan-recovery");
|
||||
/** Delay before attempting recovery to let the gateway finish bootstrapping. */
|
||||
const DEFAULT_RECOVERY_DELAY_MS = 5_000;
|
||||
|
||||
function isRestartAbortedTimeoutRun(
|
||||
runRecord: SubagentRunRecord,
|
||||
entry: SessionEntry | undefined,
|
||||
): boolean {
|
||||
return (
|
||||
entry?.abortedLastRun === true &&
|
||||
runRecord.outcome?.status === "timeout" &&
|
||||
typeof runRecord.endedAt === "number" &&
|
||||
runRecord.endedAt > 0
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the resume message for an orphaned subagent.
|
||||
*/
|
||||
@@ -150,11 +162,6 @@ export async function recoverOrphanedSubagentSessions(params: {
|
||||
const storeCache = new Map<string, Record<string, SessionEntry>>();
|
||||
|
||||
for (const [runId, runRecord] of activeRuns.entries()) {
|
||||
// Only consider runs that haven't ended yet
|
||||
if (typeof runRecord.endedAt === "number" && runRecord.endedAt > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const childSessionKey = runRecord.childSessionKey?.trim();
|
||||
if (!childSessionKey) {
|
||||
continue;
|
||||
@@ -180,6 +187,17 @@ export async function recoverOrphanedSubagentSessions(params: {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Restart-aborted subagents can be marked ended with a timeout outcome
|
||||
// before the gateway comes back up to resume them.
|
||||
if (
|
||||
typeof runRecord.endedAt === "number" &&
|
||||
runRecord.endedAt > 0 &&
|
||||
!isRestartAbortedTimeoutRun(runRecord, entry)
|
||||
) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this session was aborted by the restart
|
||||
if (!entry.abortedLastRun) {
|
||||
result.skipped++;
|
||||
|
||||
@@ -99,6 +99,8 @@ let listenerStarted = false;
|
||||
let listenerStop: (() => void) | null = null;
|
||||
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
|
||||
var restoreAttempted = false;
|
||||
const ORPHAN_RECOVERY_DEBOUNCE_MS = 1_000;
|
||||
let lastOrphanRecoveryScheduleAt = 0;
|
||||
const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
|
||||
/**
|
||||
* Embedded runs can emit transient lifecycle `error` events while provider/model
|
||||
@@ -140,6 +142,26 @@ function persistSubagentRuns() {
|
||||
subagentRegistryDeps.persistSubagentRunsToDisk(subagentRuns);
|
||||
}
|
||||
|
||||
export function scheduleSubagentOrphanRecovery(params?: { delayMs?: number; maxRetries?: number }) {
|
||||
const now = Date.now();
|
||||
if (now - lastOrphanRecoveryScheduleAt < ORPHAN_RECOVERY_DEBOUNCE_MS) {
|
||||
return;
|
||||
}
|
||||
lastOrphanRecoveryScheduleAt = now;
|
||||
void import("./subagent-orphan-recovery.js").then(
|
||||
({ scheduleOrphanRecovery }) => {
|
||||
scheduleOrphanRecovery({
|
||||
getActiveRuns: () => subagentRuns,
|
||||
delayMs: params?.delayMs,
|
||||
maxRetries: params?.maxRetries,
|
||||
});
|
||||
},
|
||||
() => {
|
||||
// Ignore import failures — orphan recovery is best-effort.
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
const resumedRuns = new Set<string>();
|
||||
const endedHookInFlightRunIds = new Set<string>();
|
||||
const pendingLifecycleErrorByRunId = new Map<
|
||||
@@ -417,18 +439,9 @@ function restoreSubagentRunsOnce() {
|
||||
resumeSubagentRun(runId);
|
||||
}
|
||||
|
||||
// Schedule orphan recovery for subagent sessions that were aborted
|
||||
// by a SIGUSR1 reload. This runs after a short delay to let the
|
||||
// gateway fully bootstrap first. Dynamic import to avoid increasing
|
||||
// startup memory footprint. (#47711)
|
||||
void import("./subagent-orphan-recovery.js").then(
|
||||
({ scheduleOrphanRecovery }) => {
|
||||
scheduleOrphanRecovery({ getActiveRuns: () => subagentRuns });
|
||||
},
|
||||
() => {
|
||||
// Ignore import failures — orphan recovery is best-effort.
|
||||
},
|
||||
);
|
||||
// Cold-start restore path: queue the same recovery pass that restart
|
||||
// startup also uses so resumed children are handled through one seam.
|
||||
scheduleSubagentOrphanRecovery();
|
||||
} catch {
|
||||
// ignore restore failures
|
||||
}
|
||||
|
||||
@@ -30,6 +30,13 @@ const abortEmbeddedPiRun = vi.fn(
|
||||
const getActiveEmbeddedRunCount = vi.fn(() => 0);
|
||||
const waitForActiveEmbeddedRuns = vi.fn(async (_timeoutMs: number) => ({ drained: true }));
|
||||
const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart";
|
||||
const loadConfig = vi.fn(() => ({
|
||||
gateway: {
|
||||
reload: {
|
||||
deferralTimeoutMs: 90_000,
|
||||
},
|
||||
},
|
||||
}));
|
||||
const gatewayLog = {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
@@ -66,6 +73,10 @@ vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
|
||||
waitForActiveEmbeddedRuns: (timeoutMs: number) => waitForActiveEmbeddedRuns(timeoutMs),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/config.js", () => ({
|
||||
loadConfig: () => loadConfig(),
|
||||
}));
|
||||
|
||||
vi.mock("../../logging/subsystem.js", () => ({
|
||||
createSubsystemLogger: () => gatewayLog,
|
||||
}));
|
||||
@@ -217,6 +228,13 @@ describe("runGatewayLoop", () => {
|
||||
|
||||
it("restarts after SIGUSR1 even when drain times out, and resets lanes for the new iteration", async () => {
|
||||
vi.clearAllMocks();
|
||||
loadConfig.mockReturnValue({
|
||||
gateway: {
|
||||
reload: {
|
||||
deferralTimeoutMs: 1_234,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await withIsolatedSignals(async ({ captureSignal }) => {
|
||||
getActiveTaskCount.mockReturnValueOnce(2).mockReturnValueOnce(0);
|
||||
@@ -280,8 +298,8 @@ describe("runGatewayLoop", () => {
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" });
|
||||
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
|
||||
expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(90_000);
|
||||
expect(waitForActiveTasks).toHaveBeenCalledWith(1_234);
|
||||
expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(1_234);
|
||||
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
|
||||
expect(markGatewayDraining).toHaveBeenCalledTimes(1);
|
||||
expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG);
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
getActiveEmbeddedRunCount,
|
||||
waitForActiveEmbeddedRuns,
|
||||
} from "../../agents/pi-embedded-runner/runs.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import type { startGatewayServer } from "../../gateway/server.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
|
||||
@@ -26,6 +27,7 @@ import type { RuntimeEnv } from "../../runtime.js";
|
||||
|
||||
const gatewayLog = createSubsystemLogger("gateway");
|
||||
const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500;
|
||||
const DEFAULT_RESTART_DRAIN_TIMEOUT_MS = 300_000;
|
||||
|
||||
type GatewayRunSignalAction = "stop" | "restart";
|
||||
|
||||
@@ -113,9 +115,18 @@ export async function runGatewayLoop(params: {
|
||||
exitProcess(0);
|
||||
};
|
||||
|
||||
const DRAIN_TIMEOUT_MS = 90_000;
|
||||
const SUPERVISOR_STOP_TIMEOUT_MS = 30_000;
|
||||
const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000;
|
||||
const resolveRestartDrainTimeoutMs = () => {
|
||||
try {
|
||||
const timeoutMs = loadConfig().gateway?.reload?.deferralTimeoutMs;
|
||||
return typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs >= 0
|
||||
? timeoutMs
|
||||
: DEFAULT_RESTART_DRAIN_TIMEOUT_MS;
|
||||
} catch {
|
||||
return DEFAULT_RESTART_DRAIN_TIMEOUT_MS;
|
||||
}
|
||||
};
|
||||
|
||||
const request = (action: GatewayRunSignalAction, signal: string) => {
|
||||
if (shuttingDown) {
|
||||
@@ -124,10 +135,13 @@ export async function runGatewayLoop(params: {
|
||||
}
|
||||
shuttingDown = true;
|
||||
const isRestart = action === "restart";
|
||||
const restartDrainTimeoutMs = isRestart ? resolveRestartDrainTimeoutMs() : 0;
|
||||
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
|
||||
|
||||
// Allow extra time for draining active turns on restart.
|
||||
const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
|
||||
const forceExitMs = isRestart
|
||||
? restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS
|
||||
: SHUTDOWN_TIMEOUT_MS;
|
||||
const forceExitTimer = setTimeout(() => {
|
||||
gatewayLog.error("shutdown timed out; exiting without full cleanup");
|
||||
// Keep the in-process watchdog below the supervisor stop budget so this
|
||||
@@ -155,14 +169,14 @@ export async function runGatewayLoop(params: {
|
||||
|
||||
if (activeTasks > 0 || activeRuns > 0) {
|
||||
gatewayLog.info(
|
||||
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`,
|
||||
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart (timeout ${restartDrainTimeoutMs}ms)`,
|
||||
);
|
||||
const [tasksDrain, runsDrain] = await Promise.all([
|
||||
activeTasks > 0
|
||||
? waitForActiveTasks(DRAIN_TIMEOUT_MS)
|
||||
? waitForActiveTasks(restartDrainTimeoutMs)
|
||||
: Promise.resolve({ drained: true }),
|
||||
activeRuns > 0
|
||||
? waitForActiveEmbeddedRuns(DRAIN_TIMEOUT_MS)
|
||||
? waitForActiveEmbeddedRuns(restartDrainTimeoutMs)
|
||||
: Promise.resolve({ drained: true }),
|
||||
]);
|
||||
if (tasksDrain.drained && runsDrain.drained) {
|
||||
|
||||
@@ -12,6 +12,7 @@ import { ensureOpenClawModelsJson } from "../agents/models-config.js";
|
||||
import { resolveModel } from "../agents/pi-embedded-runner/model.js";
|
||||
import { resolveAgentSessionDirs } from "../agents/session-dirs.js";
|
||||
import { cleanStaleLockFiles } from "../agents/session-write-lock.js";
|
||||
import { scheduleSubagentOrphanRecovery } from "../agents/subagent-registry.js";
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import type { loadConfig } from "../config/config.js";
|
||||
import { resolveAgentModelPrimaryValue } from "../config/model-input.js";
|
||||
@@ -215,6 +216,10 @@ export async function startGatewaySidecars(params: {
|
||||
}, 750);
|
||||
}
|
||||
|
||||
// Same-process SIGUSR1 restarts keep subagent registry memory alive, so
|
||||
// schedule recovery on every startup cycle instead of only cold restore.
|
||||
scheduleSubagentOrphanRecovery();
|
||||
|
||||
return { pluginServices };
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user