From d6bb36730b507f9fb7f272be14b77cf61e3b61f8 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 12 Apr 2026 16:07:46 +0100 Subject: [PATCH] fix(agents): stabilize subagent lifecycle --- src/agents/pi-embedded-runner/run.ts | 1 + .../run/helpers.resolve-error-context.test.ts | 13 ++++++ src/agents/pi-embedded-runner/run/helpers.ts | 12 +++-- src/agents/run-wait.test.ts | 8 ++++ src/agents/run-wait.ts | 5 ++- src/agents/subagent-announce-origin.ts | 5 +++ .../subagent-announce.format.e2e.test.ts | 9 +++- src/agents/subagent-announce.ts | 4 +- src/agents/subagent-registry-run-manager.ts | 3 ++ ...registry.lifecycle-retry-grace.e2e.test.ts | 44 ++++++++++++++++--- src/utils/delivery-context.ts | 13 +++--- 11 files changed, 100 insertions(+), 17 deletions(-) diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 5a01b929bb7..f5cd5590171 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -750,6 +750,7 @@ export async function runEmbeddedPiAgent( const activeErrorContext = resolveActiveErrorContext({ provider, model: modelId, + assistant: currentAttemptAssistant ?? sessionLastAssistant, }); const resolveReplayInvalidForAttempt = (incompleteTurnText?: string | null) => accumulatedReplayState.replayInvalid || diff --git a/src/agents/pi-embedded-runner/run/helpers.resolve-error-context.test.ts b/src/agents/pi-embedded-runner/run/helpers.resolve-error-context.test.ts index 6e0e6bae31d..437f8b223a7 100644 --- a/src/agents/pi-embedded-runner/run/helpers.resolve-error-context.test.ts +++ b/src/agents/pi-embedded-runner/run/helpers.resolve-error-context.test.ts @@ -9,4 +9,17 @@ describe("resolveActiveErrorContext", () => { }); expect(result).toEqual({ provider: "deepseek", model: "deepseek-chat" }); }); + + it("prefers assistant provider/model when the failing attempt reports them", () => { + const result = resolveActiveErrorContext({ + provider: "openai", + model: "gpt-5.4", + assistant: { + provider: "openai", + model: "gpt-5.4-codex", + }, + }); + + expect(result).toEqual({ provider: "openai", model: "gpt-5.4-codex" }); + }); }); diff --git a/src/agents/pi-embedded-runner/run/helpers.ts b/src/agents/pi-embedded-runner/run/helpers.ts index 7704340b87c..fda43062e2d 100644 --- a/src/agents/pi-embedded-runner/run/helpers.ts +++ b/src/agents/pi-embedded-runner/run/helpers.ts @@ -77,13 +77,19 @@ export function resolveMaxRunRetryIterations(profileCandidateCount: number): num return Math.min(MAX_RUN_RETRY_ITERATIONS, Math.max(MIN_RUN_RETRY_ITERATIONS, scaled)); } -export function resolveActiveErrorContext(params: { provider: string; model: string }): { +export function resolveActiveErrorContext(params: { + provider: string; + model: string; + assistant?: { provider?: string; model?: string }; +}): { provider: string; model: string; } { + const assistantProvider = params.assistant?.provider?.trim(); + const assistantModel = params.assistant?.model?.trim(); return { - provider: params.provider, - model: params.model, + provider: assistantProvider || params.provider, + model: assistantModel || params.model, }; } diff --git a/src/agents/run-wait.test.ts b/src/agents/run-wait.test.ts index bc6bc63270f..eca0385915a 100644 --- a/src/agents/run-wait.test.ts +++ b/src/agents/run-wait.test.ts @@ -151,6 +151,14 @@ describe("waitForAgentRun", () => { }); }); + it("preserves pending agent.wait status", async () => { + callGatewayMock.mockResolvedValue({ status: "pending" }); + + const result = await waitForAgentRun({ runId: "run-pending", timeoutMs: 500 }); + + expect(result).toEqual({ status: "pending" }); + }); + it("preserves timing metadata from agent.wait", async () => { callGatewayMock.mockResolvedValue({ status: "ok", diff --git a/src/agents/run-wait.ts b/src/agents/run-wait.ts index b7f49b75eac..fe4cd1e486c 100644 --- a/src/agents/run-wait.ts +++ b/src/agents/run-wait.ts @@ -18,7 +18,7 @@ export type AssistantReplySnapshot = { }; export type AgentWaitResult = { - status: "ok" | "timeout" | "error"; + status: "ok" | "timeout" | "error" | "pending"; error?: string; startedAt?: number; endedAt?: number; @@ -133,6 +133,9 @@ export async function waitForAgentRun(params: { if (wait?.status === "timeout") { return normalizeAgentWaitResult("timeout", wait); } + if (wait?.status === "pending") { + return normalizeAgentWaitResult("pending", wait); + } if (wait?.status === "error") { return normalizeAgentWaitResult("error", wait); } diff --git a/src/agents/subagent-announce-origin.ts b/src/agents/subagent-announce-origin.ts index b19e5ff4721..83ba2a11d64 100644 --- a/src/agents/subagent-announce-origin.ts +++ b/src/agents/subagent-announce-origin.ts @@ -128,6 +128,11 @@ function shouldStripThreadFromAnnounceEntry( return requesterTarget !== entryTarget; } } + const requesterTarget = normalizeOptionalString(normalizedRequester.to); + const entryTarget = normalizeOptionalString(normalizedEntry?.to); + if (requesterTarget && entryTarget) { + return requesterTarget !== entryTarget; + } return false; } diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 11b8c2da577..e0393ea28e6 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -16,7 +16,8 @@ import * as hookRunnerGlobal from "../plugins/hook-runner-global.js"; import type { HookRunner } from "../plugins/hooks.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; import { createChannelTestPluginBase, createTestRegistry } from "../test-utils/channel-plugins.js"; -import * as piEmbedded from "./pi-embedded.js"; +import * as piEmbedded from "./pi-embedded-runner/runs.js"; +import { __testing as subagentAnnounceDeliveryTesting } from "./subagent-announce-delivery.js"; import * as agentStep from "./tools/agent-step.js"; type AgentCallRequest = { method?: string; params?: Record }; @@ -202,6 +203,7 @@ describe("subagent announce formatting", () => { }); afterAll(() => { + subagentAnnounceDeliveryTesting.setDepsForTest(); clearRuntimeConfigSnapshot(); if (previousFastTestEnv === undefined) { delete process.env.OPENCLAW_TEST_FAST; @@ -243,6 +245,11 @@ describe("subagent announce formatting", () => { } return {}; }); + subagentAnnounceDeliveryTesting.setDepsForTest({ + callGateway: async >( + req: Parameters[0], + ) => (await callGatewaySpy(req)) as T, + }); loadSessionStoreSpy.mockReset().mockImplementation(() => loadSessionStoreFixture()); resolveAgentIdFromSessionKeySpy.mockReset().mockImplementation(() => "main"); resolveStorePathSpy.mockReset().mockImplementation(() => "/tmp/sessions.json"); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 422ca59f5e6..db308f55657 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -42,11 +42,13 @@ import { isAnnounceSkip } from "./tools/sessions-send-tokens.js"; type SubagentAnnounceDeps = { callGateway: typeof callGateway; loadConfig: typeof loadConfig; + loadSubagentRegistryRuntime: typeof loadSubagentRegistryRuntime; }; const defaultSubagentAnnounceDeps: SubagentAnnounceDeps = { callGateway, loadConfig, + loadSubagentRegistryRuntime, }; let subagentAnnounceDeps: SubagentAnnounceDeps = defaultSubagentAnnounceDeps; @@ -266,7 +268,7 @@ export async function runSubagentAnnounceFlow(params: { | Awaited> | undefined; try { - subagentRegistryRuntime = await loadSubagentRegistryRuntime(); + subagentRegistryRuntime = await subagentAnnounceDeps.loadSubagentRegistryRuntime(); if ( requesterDepth >= 1 && subagentRegistryRuntime.shouldIgnorePostCompletionAnnounceForSession( diff --git a/src/agents/subagent-registry-run-manager.ts b/src/agents/subagent-registry-run-manager.ts index d7f27daa7c1..b8f805111ce 100644 --- a/src/agents/subagent-registry-run-manager.ts +++ b/src/agents/subagent-registry-run-manager.ts @@ -83,6 +83,9 @@ export function createSubagentRunManager(params: { if (!entry) { return; } + if (wait.status === "pending") { + return; + } let mutated = false; if (typeof wait.startedAt === "number") { entry.startedAt = wait.startedAt; diff --git a/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts b/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts index bd431a8694e..9df935b084f 100644 --- a/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts +++ b/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts @@ -114,7 +114,11 @@ vi.mock("./subagent-registry.store.js", () => ({ })); describe("subagent registry lifecycle error grace", () => { + let previousFastTestEnv: string | undefined; + beforeEach(async () => { + previousFastTestEnv = process.env.OPENCLAW_TEST_FAST; + process.env.OPENCLAW_TEST_FAST = "1"; vi.useFakeTimers(); callGatewayMock.mockClear(); onAgentEventMock.mockClear(); @@ -158,6 +162,18 @@ describe("subagent registry lifecycle error grace", () => { subagentAnnounceTesting.setDepsForTest({ callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway, loadConfig: loadConfigMock as typeof import("../config/config.js").loadConfig, + loadSubagentRegistryRuntime: async () => ({ + countActiveDescendantRuns: mod.countActiveDescendantRuns, + countPendingDescendantRuns: mod.countPendingDescendantRuns, + countPendingDescendantRunsExcludingRun: mod.countPendingDescendantRunsExcludingRun, + getLatestSubagentRunByChildSessionKey: mod.getLatestSubagentRunByChildSessionKey, + isSubagentSessionRunActive: mod.isSubagentSessionRunActive, + listSubagentRunsForRequester: mod.listSubagentRunsForRequester, + replaceSubagentRunAfterSteer: mod.replaceSubagentRunAfterSteer, + resolveRequesterForChildSession: mod.resolveRequesterForChildSession, + shouldIgnorePostCompletionAnnounceForSession: + mod.shouldIgnorePostCompletionAnnounceForSession, + }), }); subagentAnnounceDeliveryTesting.setDepsForTest({ callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway, @@ -177,6 +193,11 @@ describe("subagent registry lifecycle error grace", () => { mod.__testing.setDepsForTest(); mod.resetSubagentRegistryForTests({ persist: false }); vi.useRealTimers(); + if (previousFastTestEnv === undefined) { + delete process.env.OPENCLAW_TEST_FAST; + } else { + process.env.OPENCLAW_TEST_FAST = previousFastTestEnv; + } }); const flushAsync = async () => { @@ -209,6 +230,20 @@ describe("subagent registry lifecycle error grace", () => { throw new Error(`expected ${expectedCount} agent call(s), got ${getAgentCalls().length}`); }; + const waitForFrozenResultText = async (runId: string, expectedText: string) => { + for (let attempt = 0; attempt < 80; attempt += 1) { + const run = mod + .listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY) + .find((candidate) => candidate.runId === runId); + if (run?.frozenResultText === expectedText) { + return run; + } + await vi.advanceTimersByTimeAsync(1); + await flushAsync(); + } + throw new Error(`run ${runId} frozen result did not refresh`); + }; + function registerCompletionRun(runId: string, childSuffix: string, task: string) { mod.registerSubagentRun({ runId, @@ -394,11 +429,10 @@ describe("subagent registry lifecycle error grace", () => { { phase: "end", endedAt: endedAt + 200 }, { sessionKey: "agent:main:subagent:refresh" }, ); - await flushAsync(); - - const runAfterRefresh = mod - .listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY) - .find((candidate) => candidate.runId === "run-refresh"); + const runAfterRefresh = await waitForFrozenResultText( + "run-refresh", + "All 3 subagents complete. Here's the final summary.", + ); expect(runAfterRefresh?.frozenResultText).toBe( "All 3 subagents complete. Here's the final summary.", ); diff --git a/src/utils/delivery-context.ts b/src/utils/delivery-context.ts index 92aace23879..4c58a39a322 100644 --- a/src/utils/delivery-context.ts +++ b/src/utils/delivery-context.ts @@ -70,12 +70,13 @@ export function resolveConversationDeliveryTarget(params: { const isThreadChild = conversationId && parentConversationId && parentConversationId !== conversationId; if (channel && isThreadChild) { - if ( - channel === "matrix" || - channel === "slack" || - channel === "mattermost" || - channel === "telegram" - ) { + if (channel === "matrix") { + return { + to: `room:${parentConversationId}`, + threadId: conversationId, + }; + } + if (channel === "slack" || channel === "mattermost" || channel === "telegram") { return { to: `channel:${parentConversationId}`, threadId: conversationId,