fix(agents): stabilize subagent lifecycle

This commit is contained in:
Peter Steinberger
2026-04-12 16:07:46 +01:00
parent 9f09001014
commit d6bb36730b
11 changed files with 100 additions and 17 deletions

View File

@@ -750,6 +750,7 @@ export async function runEmbeddedPiAgent(
const activeErrorContext = resolveActiveErrorContext({
provider,
model: modelId,
assistant: currentAttemptAssistant ?? sessionLastAssistant,
});
const resolveReplayInvalidForAttempt = (incompleteTurnText?: string | null) =>
accumulatedReplayState.replayInvalid ||

View File

@@ -9,4 +9,17 @@ describe("resolveActiveErrorContext", () => {
});
expect(result).toEqual({ provider: "deepseek", model: "deepseek-chat" });
});
it("prefers assistant provider/model when the failing attempt reports them", () => {
const result = resolveActiveErrorContext({
provider: "openai",
model: "gpt-5.4",
assistant: {
provider: "openai",
model: "gpt-5.4-codex",
},
});
expect(result).toEqual({ provider: "openai", model: "gpt-5.4-codex" });
});
});

View File

@@ -77,13 +77,19 @@ export function resolveMaxRunRetryIterations(profileCandidateCount: number): num
return Math.min(MAX_RUN_RETRY_ITERATIONS, Math.max(MIN_RUN_RETRY_ITERATIONS, scaled));
}
export function resolveActiveErrorContext(params: { provider: string; model: string }): {
export function resolveActiveErrorContext(params: {
provider: string;
model: string;
assistant?: { provider?: string; model?: string };
}): {
provider: string;
model: string;
} {
const assistantProvider = params.assistant?.provider?.trim();
const assistantModel = params.assistant?.model?.trim();
return {
provider: params.provider,
model: params.model,
provider: assistantProvider || params.provider,
model: assistantModel || params.model,
};
}

View File

@@ -151,6 +151,14 @@ describe("waitForAgentRun", () => {
});
});
it("preserves pending agent.wait status", async () => {
callGatewayMock.mockResolvedValue({ status: "pending" });
const result = await waitForAgentRun({ runId: "run-pending", timeoutMs: 500 });
expect(result).toEqual({ status: "pending" });
});
it("preserves timing metadata from agent.wait", async () => {
callGatewayMock.mockResolvedValue({
status: "ok",

View File

@@ -18,7 +18,7 @@ export type AssistantReplySnapshot = {
};
export type AgentWaitResult = {
status: "ok" | "timeout" | "error";
status: "ok" | "timeout" | "error" | "pending";
error?: string;
startedAt?: number;
endedAt?: number;
@@ -133,6 +133,9 @@ export async function waitForAgentRun(params: {
if (wait?.status === "timeout") {
return normalizeAgentWaitResult("timeout", wait);
}
if (wait?.status === "pending") {
return normalizeAgentWaitResult("pending", wait);
}
if (wait?.status === "error") {
return normalizeAgentWaitResult("error", wait);
}

View File

@@ -128,6 +128,11 @@ function shouldStripThreadFromAnnounceEntry(
return requesterTarget !== entryTarget;
}
}
const requesterTarget = normalizeOptionalString(normalizedRequester.to);
const entryTarget = normalizeOptionalString(normalizedEntry?.to);
if (requesterTarget && entryTarget) {
return requesterTarget !== entryTarget;
}
return false;
}

View File

@@ -16,7 +16,8 @@ import * as hookRunnerGlobal from "../plugins/hook-runner-global.js";
import type { HookRunner } from "../plugins/hooks.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createChannelTestPluginBase, createTestRegistry } from "../test-utils/channel-plugins.js";
import * as piEmbedded from "./pi-embedded.js";
import * as piEmbedded from "./pi-embedded-runner/runs.js";
import { __testing as subagentAnnounceDeliveryTesting } from "./subagent-announce-delivery.js";
import * as agentStep from "./tools/agent-step.js";
type AgentCallRequest = { method?: string; params?: Record<string, unknown> };
@@ -202,6 +203,7 @@ describe("subagent announce formatting", () => {
});
afterAll(() => {
subagentAnnounceDeliveryTesting.setDepsForTest();
clearRuntimeConfigSnapshot();
if (previousFastTestEnv === undefined) {
delete process.env.OPENCLAW_TEST_FAST;
@@ -243,6 +245,11 @@ describe("subagent announce formatting", () => {
}
return {};
});
subagentAnnounceDeliveryTesting.setDepsForTest({
callGateway: async <T = Record<string, unknown>>(
req: Parameters<typeof gatewayCall.callGateway>[0],
) => (await callGatewaySpy(req)) as T,
});
loadSessionStoreSpy.mockReset().mockImplementation(() => loadSessionStoreFixture());
resolveAgentIdFromSessionKeySpy.mockReset().mockImplementation(() => "main");
resolveStorePathSpy.mockReset().mockImplementation(() => "/tmp/sessions.json");

View File

@@ -42,11 +42,13 @@ import { isAnnounceSkip } from "./tools/sessions-send-tokens.js";
type SubagentAnnounceDeps = {
callGateway: typeof callGateway;
loadConfig: typeof loadConfig;
loadSubagentRegistryRuntime: typeof loadSubagentRegistryRuntime;
};
const defaultSubagentAnnounceDeps: SubagentAnnounceDeps = {
callGateway,
loadConfig,
loadSubagentRegistryRuntime,
};
let subagentAnnounceDeps: SubagentAnnounceDeps = defaultSubagentAnnounceDeps;
@@ -266,7 +268,7 @@ export async function runSubagentAnnounceFlow(params: {
| Awaited<ReturnType<typeof loadSubagentRegistryRuntime>>
| undefined;
try {
subagentRegistryRuntime = await loadSubagentRegistryRuntime();
subagentRegistryRuntime = await subagentAnnounceDeps.loadSubagentRegistryRuntime();
if (
requesterDepth >= 1 &&
subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession(

View File

@@ -83,6 +83,9 @@ export function createSubagentRunManager(params: {
if (!entry) {
return;
}
if (wait.status === "pending") {
return;
}
let mutated = false;
if (typeof wait.startedAt === "number") {
entry.startedAt = wait.startedAt;

View File

@@ -114,7 +114,11 @@ vi.mock("./subagent-registry.store.js", () => ({
}));
describe("subagent registry lifecycle error grace", () => {
let previousFastTestEnv: string | undefined;
beforeEach(async () => {
previousFastTestEnv = process.env.OPENCLAW_TEST_FAST;
process.env.OPENCLAW_TEST_FAST = "1";
vi.useFakeTimers();
callGatewayMock.mockClear();
onAgentEventMock.mockClear();
@@ -158,6 +162,18 @@ describe("subagent registry lifecycle error grace", () => {
subagentAnnounceTesting.setDepsForTest({
callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway,
loadConfig: loadConfigMock as typeof import("../config/config.js").loadConfig,
loadSubagentRegistryRuntime: async () => ({
countActiveDescendantRuns: mod.countActiveDescendantRuns,
countPendingDescendantRuns: mod.countPendingDescendantRuns,
countPendingDescendantRunsExcludingRun: mod.countPendingDescendantRunsExcludingRun,
getLatestSubagentRunByChildSessionKey: mod.getLatestSubagentRunByChildSessionKey,
isSubagentSessionRunActive: mod.isSubagentSessionRunActive,
listSubagentRunsForRequester: mod.listSubagentRunsForRequester,
replaceSubagentRunAfterSteer: mod.replaceSubagentRunAfterSteer,
resolveRequesterForChildSession: mod.resolveRequesterForChildSession,
shouldIgnorePostCompletionAnnounceForSession:
mod.shouldIgnorePostCompletionAnnounceForSession,
}),
});
subagentAnnounceDeliveryTesting.setDepsForTest({
callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway,
@@ -177,6 +193,11 @@ describe("subagent registry lifecycle error grace", () => {
mod.__testing.setDepsForTest();
mod.resetSubagentRegistryForTests({ persist: false });
vi.useRealTimers();
if (previousFastTestEnv === undefined) {
delete process.env.OPENCLAW_TEST_FAST;
} else {
process.env.OPENCLAW_TEST_FAST = previousFastTestEnv;
}
});
const flushAsync = async () => {
@@ -209,6 +230,20 @@ describe("subagent registry lifecycle error grace", () => {
throw new Error(`expected ${expectedCount} agent call(s), got ${getAgentCalls().length}`);
};
const waitForFrozenResultText = async (runId: string, expectedText: string) => {
for (let attempt = 0; attempt < 80; attempt += 1) {
const run = mod
.listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY)
.find((candidate) => candidate.runId === runId);
if (run?.frozenResultText === expectedText) {
return run;
}
await vi.advanceTimersByTimeAsync(1);
await flushAsync();
}
throw new Error(`run ${runId} frozen result did not refresh`);
};
function registerCompletionRun(runId: string, childSuffix: string, task: string) {
mod.registerSubagentRun({
runId,
@@ -394,11 +429,10 @@ describe("subagent registry lifecycle error grace", () => {
{ phase: "end", endedAt: endedAt + 200 },
{ sessionKey: "agent:main:subagent:refresh" },
);
await flushAsync();
const runAfterRefresh = mod
.listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY)
.find((candidate) => candidate.runId === "run-refresh");
const runAfterRefresh = await waitForFrozenResultText(
"run-refresh",
"All 3 subagents complete. Here's the final summary.",
);
expect(runAfterRefresh?.frozenResultText).toBe(
"All 3 subagents complete. Here's the final summary.",
);

View File

@@ -70,12 +70,13 @@ export function resolveConversationDeliveryTarget(params: {
const isThreadChild =
conversationId && parentConversationId && parentConversationId !== conversationId;
if (channel && isThreadChild) {
if (
channel === "matrix" ||
channel === "slack" ||
channel === "mattermost" ||
channel === "telegram"
) {
if (channel === "matrix") {
return {
to: `room:${parentConversationId}`,
threadId: conversationId,
};
}
if (channel === "slack" || channel === "mattermost" || channel === "telegram") {
return {
to: `channel:${parentConversationId}`,
threadId: conversationId,